C++高性能并发队列:concurrentqueue库详解
项目概述
concurrentqueue是一个C++11实现的高性能多生产者多消费者(MPMC)无锁队列库,由Cameron Desrochers开发。该库设计用于高并发场景,提供了线程安全的数据结构,特别适合需要高效线程间通信的应用程序。
核心特性
1. 无锁设计
- 采用无锁算法,避免线程阻塞
- 支持多生产者多消费者并发操作
- 基于C++11原子操作实现
2. 高性能
- 极低的入队出队开销
- 批量操作支持,减少同步开销
- 内存预分配,减少动态分配次数
3. 灵活配置
- 可配置的初始容量和增长策略
- 支持显式和隐式生产者令牌
- 提供阻塞和非阻塞两种操作模式
安装与使用
安装方式
text
# 直接包含头文件即可使用 git clone https://github.com/cameron314/concurrentqueue.git
基本使用示例
text
#include "concurrentqueue.h"
#include <iostream>
#include <thread>
#include <vector>
// 基本队列使用
void basic_example() {
moodycamel::ConcurrentQueue<int> queue;
// 生产者线程
std::thread producer([&queue]() {
for (int i = 0; i < 10; ++i) {
queue.enqueue(i);
std::cout << "生产: " << i << std::endl;
}
});
// 消费者线程
std::thread consumer([&queue]() {
int value;
for (int i = 0; i < 10; ++i) {
if (queue.try_dequeue(value)) {
std::cout << "消费: " << value << std::endl;
}
}
});
producer.join();
consumer.join();
}
高级功能示例
1. 批量操作
text
void batch_operations_example() {
moodycamel::ConcurrentQueue<int> queue;
// 批量入队
int items[] = {1, 2, 3, 4, 5};
queue.enqueue_bulk(items, 5);
// 批量出队
int results[5];
size_t count = queue.try_dequeue_bulk(results, 5);
std::cout << "批量消费了 " << count << " 个元素" << std::endl;
}
2. 使用生产者令牌
text
void producer_token_example() {
moodycamel::ConcurrentQueue<int> queue;
// 创建生产者令牌
moodycamel::ProducerToken token(queue);
// 使用令牌入队(性能更优)
for (int i = 0; i < 100; ++i) {
queue.enqueue(token, i);
}
// 使用令牌批量入队
int bulk_items[10];
for (int i = 0; i < 10; ++i) {
bulk_items[i] = i * 10;
}
queue.enqueue_bulk(token, bulk_items, 10);
}
3. 阻塞队列
text
#include "blockingconcurrentqueue.h"
void blocking_queue_example() {
moodycamel::BlockingConcurrentQueue<int> queue;
std::thread consumer([&queue]() {
int value;
// 阻塞等待直到有数据可用
queue.wait_dequeue(value);
std::cout << "收到数据: " << value << std::endl;
});
std::this_thread::sleep_for(std::chrono::seconds(1));
// 生产者
queue.enqueue(42);
consumer.join();
}
实际应用场景
1. 线程池任务队列
text
class ThreadPool {
private:
moodycamel::BlockingConcurrentQueue<std::function<void()>> tasks;
std::vector<std::thread> workers;
bool stop = false;
public:
ThreadPool(size_t num_threads) {
for (size_t i = 0; i < num_threads; ++i) {
workers.emplace_back([this] {
while (!stop) {
std::function<void()> task;
if (tasks.wait_dequeue(task)) {
task();
}
}
});
}
}
template<typename F>
void enqueue(F&& f) {
tasks.enqueue(std::forward<F>(f));
}
~ThreadPool() {
stop = true;
for (auto& worker : workers) {
if (worker.joinable()) worker.join();
}
}
};
2. 高性能日志系统
text
class Logger {
private:
moodycamel::ConcurrentQueue<std::string> log_queue;
std::thread log_thread;
std::ofstream log_file;
public:
Logger(const std::string& filename) : log_file(filename) {
log_thread = std::thread([this]() {
std::string log_entry;
while (true) {
if (log_queue.try_dequeue(log_entry)) {
log_file << log_entry << std::endl;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
});
}
void log(const std::string& message) {
auto timestamp = std::chrono::system_clock::now();
log_queue.enqueue(std::to_string(
std::chrono::duration_cast<std::chrono::milliseconds>(
timestamp.time_since_epoch()).count()) + ": " + message);
}
};
性能优化建议
- 批量操作优先:尽量使用
enqueue_bulk和try_dequeue_bulk减少锁竞争 - 使用生产者令牌:对于频繁生产数据的线程,使用
ProducerToken提高性能 - 合理设置容量:根据实际需求预分配队列容量,避免频繁扩容
- 选择合适的队列类型:根据是否需要阻塞操作选择
ConcurrentQueue或BlockingConcurrentQueue
与其他队列的对比
| 特性 | concurrentqueue | std::queue + mutex | boost::lockfree::queue |
|---|---|---|---|
| 线程安全 | ✅ MPMC | ✅ (需加锁) | ✅ SPMC/MPSC |
| 无锁设计 | ✅ | ❌ | ✅ |
| 批量操作 | ✅ | ❌ | ✅ |
| 阻塞支持 | ✅ (Blocking版本) | ❌ | ❌ |
| 内存预分配 | ✅ | ❌ | ✅ |
总结
concurrentqueue库是一个功能强大、性能优异的并发队列实现,特别适合高并发场景下的线程间通信。其无锁设计、批量操作支持和灵活的生产者令牌机制使其在性能上具有明显优势。无论是构建高性能服务器、实时数据处理系统还是复杂的多线程应用,concurrentqueue都是一个值得考虑的优秀选择。
项目持续维护,文档完善,社区活跃,是C++并发编程中不可或缺的工具之一。
concurrentqueue_20260205062014.zip
类型:压缩文件|已下载:0|下载方式:免费下载
立即下载




还没有评论,来说两句吧...