Alby's blog

世上没有巧合,只有巧合的假象。

0%

一种有界队列(Bounded Buffer)的实现

一、概述

在有 CPUGPU 参与的一种运算中,比如深度学习推理,CPU 需要预处理数据,然后交给 GPU 处理,最后 CPU 对 GPU 的运算结果进行后处理
在整个过程中都是 FIFO,即数据 ABC 按顺序输入,也需要按 A’B’C’ 顺序输出。
如果采用同步阻塞的方式,在 CPU 预处理时 GPU 处于空闲状态,GPU 运算时 CPU 后处理处于空闲状态并且也不能进行后续数据的预处理。这样影响整体的吞吐。
期望是 GPU 运算时,CPU 可以同时进行数据预处理和后处理。这是典型的单生产者单消费者模式。

在两个线程之间传递数据时,为确保线程安全,可以在一个线程每次 mallocnew 申请内存,在另一个线程 freedelete。为了避免频繁的内存分配和释放,需要使用到内存池。

本文描述采用有界队列实现内存池,适用场景和限制:

  1. 需要把内存使用控制在一定范围内。
  2. 整个过程不允许丢弃数据。
  3. 单生产者和单消费者。即不会(也不允许)同时生产,不会(也不允许)同时消费。如果确实要多线程生产或多线程消费,本代码并不适用。
  4. 生产和消费之间线程安全。

二、实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// File: bounded_buffer.h
#pragma once

#include <memory> // for std::unique_ptr
#include <mutex>
#include <string>
#include <vector>

namespace tubumu {

/*
* @Description: BoundedBuffer。
* 注意,可以使用两个固定的线程分别调用 Produce 方法和 Consume 方法,但是多个线程调用 Produce 方法或多个线程调用 Consume 方法不是线程安全的。
*/
class BoundedBuffer {
public:
BoundedBuffer(std::string name, size_t buffers_capacity, size_t buffer_size_max);
BoundedBuffer(const BoundedBuffer&) = delete;
BoundedBuffer& operator=(const BoundedBuffer&) = delete;
BoundedBuffer(BoundedBuffer&&) = delete;
BoundedBuffer& operator=(BoundedBuffer&&) = delete;

public:
/**
* @description: 生产。非线程安全,两个及以上线程调用 Produce 可能会导致脏写。
* @param {function<void(void*)>} func
* @return {void}
*/
void Produce(const std::function<void(std::unique_ptr<uint8_t[]>&)>& func);

/**
* @description: 消费。非线程安全,两个及以上线程调用 Consume 可能会导致读取到同一份数据。
* @param {function<void(void*)>} func
* @return {void}
*/
void Consume(const std::function<void(std::unique_ptr<uint8_t[]>&)>& func);

private:
const std::string _name;

// 内存池
std::vector<std::unique_ptr<uint8_t[]>> _buffers;
// 内存池容量
size_t _buffers_capacity;
// 内存块最大长度
size_t _buffer_size_max;
// 保护内存池
std::mutex _buffers_mtx;
// 内存池是否有可用的 slot (非满则可以写数据)
std::condition_variable _buffers_not_full_cond;
// 内存池是否非空 (非空则可以读数据)
std::condition_variable _buffers_not_empty_cond;
// 内存池将会读取的位置
// 虽然初始值是 0,但是在尚未写入时 _buffers_not_empty_cond 会判断 _buffers_read_position 和 _buffers_write_position 不相等才会继续。
size_t _buffers_read_position;
// 内存池当前可写入的位置
size_t _buffers_write_position;
};

} // namespace tubumu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// File: bounded_buffer.cpp
#include "bounded_buffer.h"
#include <cassert>
#include <memory>
#include <mutex>

namespace tubumu {

BoundedBuffer::BoundedBuffer(std::string name, size_t buffers_capacity, size_t buffer_size_max)
: _name(std::move(name)), _buffers_capacity(buffers_capacity), _buffer_size_max(buffer_size_max),
_buffers_read_position(0), _buffers_write_position(0), _buffers_mtx(), _buffers_not_full_cond(),
_buffers_not_empty_cond(), _buffers(buffers_capacity) {
assert(buffers_capacity > 1);
assert(buffer_size_max > 0);
// 初始化智能指针向量
for (auto& buffer : _buffers) {
buffer = std::make_unique<uint8_t[]>(buffer_size_max);
}
}

void BoundedBuffer::Produce(const std::function<void(std::unique_ptr<uint8_t[]>&)>& func) {
std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
// 等待可写 slot。要确保本次写入后,下次有写入位置,所以 +1。
_buffers_not_full_cond.wait(buffers_lock, [&] {
return (_buffers_write_position + 1) % _buffers_capacity != _buffers_read_position;
});
// 有可写 slot 马上释放。因为 func 可能是耗时操作,防止过久阻塞 Consume 造成有可读 slot 而无法读。
buffers_lock.unlock();
auto& buffer = _buffers[_buffers_write_position];
func(buffer);
// 更改写 slot
// 避免在极端情况下 _buffers_not_empty_cond.wait 丢失通知,所以上锁,即使 _buffers_write_position 为 std::atomic<size_t> 也不行。
buffers_lock.lock();
_buffers_write_position = (_buffers_write_position + 1) % _buffers_capacity;
buffers_lock.unlock();
_buffers_not_empty_cond.notify_one();
}

void BoundedBuffer::Consume(const std::function<void(std::unique_ptr<uint8_t[]>&)>& func) {
std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
// 等待读 slot
_buffers_not_empty_cond.wait(
buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });
// 有可读 slot 马上释放。因为 func 可能是耗时操作,防止过久阻塞 Produce 造成有可写 slot 而无法写。
buffers_lock.unlock();
auto& buffer = _buffers[_buffers_read_position];
func(buffer);
// 更改读 slot
// 避免在极端情况下 _buffers_not_full_cond.wait 丢失通知,所以上锁,即使 _buffers_read_position 为 std::atomic<size_t> 也不行。
buffers_lock.lock();
_buffers_read_position = (_buffers_read_position + 1) % _buffers_capacity;
buffers_lock.unlock();
_buffers_not_full_cond.notify_one();
}

} // namespace tubumu

三、测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// File: test_bounded_queue.cpp
#include "bounded_buffer.h"
#include <iostream>
#include <thread>

int main(int argc, const char** argv)
{
// Buffer 中每块数据最大长度 sizeof(size_t)。实际应用中,更大长度的内存才有意义。
std::unique_ptr<tubumu::BoundedBuffer> boundedBuffer = std::make_unique<tubumu::BoundedBuffer>("Test", 4, sizeof(size_t));

std::thread producer_thread(
[&]
{
for (size_t i = 0; i < 1000; i++)
{
boundedBuffer->Produce([=](std::unique_ptr<uint8_t[]>& buffer){
std::memcpy(buffer.get(), &i, sizeof(size_t));
std::cout << "Produce: " << i << std::endl;
// 模拟生产耗时 20ms 左右。
std::this_thread::sleep_for(std::chrono::milliseconds(20));
});
}
});

std::thread consumer_thread(
[&]
{
for (size_t i = 0; i < 1000; i++)
{
boundedBuffer->Consume([=](std::unique_ptr<uint8_t[]>& buffer){
size_t value;
std::memcpy(&value, buffer.get(), sizeof(size_t));
std::cout << "Consume: " << value << std::endl;
// 模拟消费耗时 20ms 左右。
std::this_thread::sleep_for(std::chrono::milliseconds(20));
});
}
});

producer_thread.join();
consumer_thread.join();

return 0;
}

运行:

1
2
$ time ./test_bounded_queue
./test_bounded_queue 0.05s user 0.05s system 0% cpu 24.314 total

理所应当地,粗略测试耗时 24s 左右比串行 40s 左右快——这不是重点,重点是达到了内存复用的目的。

四、说明

1、的确是需要 mutex 和 condition_variable 吗?

是的。比如在生产时,发现“无法获取到”可写的 slot,又不允许丢弃数据,为了不让生产者线程轮询则只能等待。

2、为什么 Produce 和 Consume 里 wait 返回后马上解锁?

比如生产时,生产的过程可能耗时。确保“能获取到”生产 slot 后立即解锁,以便消费者线程调用 Consume 时如果阻塞在 std::unique_lock<std::mutex> buffers_lock(_buffers_mtx); 能够取得锁,从而得以消费在本次生产之前已经生产好的 slot ——如果队列完全没有可读数据当然就“转为”阻塞在 _buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });。如果是阻塞在 wait,则会在本次生产好后通过 _buffers_not_empty_cond.notify_one(); 唤醒消费者线程。