一、概述
在有 CPU
和 GPU
参与的一种运算中,比如深度学习推理,CPU 需要预处理数据,然后交给 GPU 处理,最后 CPU 对 GPU 的运算结果进行后处理。
在整个过程中都是 FIFO
,即数据 ABC 按顺序输入,也需要按 A’B’C’ 顺序输出。
如果采用同步阻塞的方式,在 CPU 预处理时 GPU 处于空闲状态,GPU 运算时 CPU 后处理处于空闲状态并且也不能进行后续数据的预处理。这样影响整体的吞吐。
期望是 GPU 运算时,CPU 可以同时进行数据预处理和后处理。这是典型的单生产者单消费者模式。
在两个线程之间传递数据时,为确保线程安全,可以在一个线程每次 malloc
或 new
申请内存,在另一个线程 free
或 delete
。为了避免频繁的内存分配和释放,需要使用到内存池。
本文描述采用有界队列实现内存池,适用场景和限制:
- 需要把内存使用控制在一定范围内。
- 整个过程不允许丢弃数据。
- 单生产者和单消费者。即不会(也不允许)同时生产,不会(也不允许)同时消费。如果确实要多线程生产或多线程消费,本代码并不适用。
- 生产和消费之间线程安全。
二、实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| #pragma once
#include <memory> #include <mutex> #include <string> #include <vector>
namespace tubumu {
class BoundedBuffer { public: BoundedBuffer(std::string name, size_t buffers_capacity, size_t buffer_size_max); BoundedBuffer(const BoundedBuffer&) = delete; BoundedBuffer& operator=(const BoundedBuffer&) = delete; BoundedBuffer(BoundedBuffer&&) = delete; BoundedBuffer& operator=(BoundedBuffer&&) = delete;
public:
void Produce(const std::function<void(std::unique_ptr<uint8_t[]>&)>& func);
void Consume(const std::function<void(std::unique_ptr<uint8_t[]>&)>& func);
private: const std::string _name;
std::vector<std::unique_ptr<uint8_t[]>> _buffers; size_t _buffers_capacity; size_t _buffer_size_max; std::mutex _buffers_mtx; std::condition_variable _buffers_not_full_cond; std::condition_variable _buffers_not_empty_cond; size_t _buffers_read_position; size_t _buffers_write_position; };
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| #include "bounded_buffer.h" #include <cassert> #include <memory> #include <mutex>
namespace tubumu {
BoundedBuffer::BoundedBuffer(std::string name, size_t buffers_capacity, size_t buffer_size_max) : _name(std::move(name)), _buffers_capacity(buffers_capacity), _buffer_size_max(buffer_size_max), _buffers_read_position(0), _buffers_write_position(0), _buffers_mtx(), _buffers_not_full_cond(), _buffers_not_empty_cond(), _buffers(buffers_capacity) { assert(buffers_capacity > 1); assert(buffer_size_max > 0); for (auto& buffer : _buffers) { buffer = std::make_unique<uint8_t[]>(buffer_size_max); } }
void BoundedBuffer::Produce(const std::function<void(std::unique_ptr<uint8_t[]>&)>& func) { std::unique_lock<std::mutex> buffers_lock(_buffers_mtx); _buffers_not_full_cond.wait(buffers_lock, [&] { return (_buffers_write_position + 1) % _buffers_capacity != _buffers_read_position; }); buffers_lock.unlock(); auto& buffer = _buffers[_buffers_write_position]; func(buffer); buffers_lock.lock(); _buffers_write_position = (_buffers_write_position + 1) % _buffers_capacity; buffers_lock.unlock(); _buffers_not_empty_cond.notify_one(); }
void BoundedBuffer::Consume(const std::function<void(std::unique_ptr<uint8_t[]>&)>& func) { std::unique_lock<std::mutex> buffers_lock(_buffers_mtx); _buffers_not_empty_cond.wait( buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; }); buffers_lock.unlock(); auto& buffer = _buffers[_buffers_read_position]; func(buffer); buffers_lock.lock(); _buffers_read_position = (_buffers_read_position + 1) % _buffers_capacity; buffers_lock.unlock(); _buffers_not_full_cond.notify_one(); }
}
|
三、测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| #include "bounded_buffer.h" #include <iostream> #include <thread>
int main(int argc, const char** argv) { std::unique_ptr<tubumu::BoundedBuffer> boundedBuffer = std::make_unique<tubumu::BoundedBuffer>("Test", 4, sizeof(size_t));
std::thread producer_thread( [&] { for (size_t i = 0; i < 1000; i++) { boundedBuffer->Produce([=](std::unique_ptr<uint8_t[]>& buffer){ std::memcpy(buffer.get(), &i, sizeof(size_t)); std::cout << "Produce: " << i << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(20)); }); } });
std::thread consumer_thread( [&] { for (size_t i = 0; i < 1000; i++) { boundedBuffer->Consume([=](std::unique_ptr<uint8_t[]>& buffer){ size_t value; std::memcpy(&value, buffer.get(), sizeof(size_t)); std::cout << "Consume: " << value << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(20)); }); } });
producer_thread.join(); consumer_thread.join();
return 0; }
|
运行:
1 2
| $ time ./test_bounded_queue ./test_bounded_queue 0.05s user 0.05s system 0% cpu 24.314 total
|
理所应当地,粗略测试耗时 24s 左右比串行 40s 左右快——这不是重点,重点是达到了内存复用的目的。
四、说明
1、的确是需要 mutex 和 condition_variable 吗?
是的。比如在生产时,发现“无法获取到”可写的 slot,又不允许丢弃数据,为了不让生产者线程轮询则只能等待。
2、为什么 Produce 和 Consume 里 wait 返回后马上解锁?
比如生产时,生产的过程可能耗时。确保“能获取到”生产 slot 后立即解锁,以便消费者线程调用 Consume 时如果阻塞在 std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
能够取得锁,从而得以消费在本次生产之前已经生产好的 slot ——如果队列完全没有可读数据当然就“转为”阻塞在 _buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });
。如果是阻塞在 wait
,则会在本次生产好后通过 _buffers_not_empty_cond.notify_one();
唤醒消费者线程。