本文作者:icy

C++高性能并发队列:concurrentqueue库详解

icy 昨天 7 抢沙发
C++高性能并发队列:concurrentqueue库详解摘要: C++高性能并发队列:concurrentqueue库详解 项目概述 concurrentqueue是一个C++11实现的高性能多生产者多消费者(MPMC)无锁队列库,由Camer...

C++高性能并发队列:concurrentqueue库详解

C++高性能并发队列:concurrentqueue库详解

项目概述

concurrentqueue是一个C++11实现的高性能多生产者多消费者(MPMC)无锁队列库,由Cameron Desrochers开发。该库设计用于高并发场景,提供了线程安全的数据结构,特别适合需要高效线程间通信的应用程序。

核心特性

1. 无锁设计

  • 采用无锁算法,避免线程阻塞
  • 支持多生产者多消费者并发操作
  • 基于C++11原子操作实现

2. 高性能

  • 极低的入队出队开销
  • 批量操作支持,减少同步开销
  • 内存预分配,减少动态分配次数

3. 灵活配置

  • 可配置的初始容量和增长策略
  • 支持显式和隐式生产者令牌
  • 提供阻塞和非阻塞两种操作模式

安装与使用

安装方式

text
# 直接包含头文件即可使用
git clone https://github.com/cameron314/concurrentqueue.git

基本使用示例

text
#include "concurrentqueue.h"
#include <iostream>
#include <thread>
#include <vector>

// 基本队列使用
void basic_example() {
    moodycamel::ConcurrentQueue<int> queue;
    
    // 生产者线程
    std::thread producer([&queue]() {
        for (int i = 0; i < 10; ++i) {
            queue.enqueue(i);
            std::cout << "生产: " << i << std::endl;
        }
    });
    
    // 消费者线程
    std::thread consumer([&queue]() {
        int value;
        for (int i = 0; i < 10; ++i) {
            if (queue.try_dequeue(value)) {
                std::cout << "消费: " << value << std::endl;
            }
        }
    });
    
    producer.join();
    consumer.join();
}

高级功能示例

1. 批量操作

text
void batch_operations_example() {
    moodycamel::ConcurrentQueue<int> queue;
    
    // 批量入队
    int items[] = {1, 2, 3, 4, 5};
    queue.enqueue_bulk(items, 5);
    
    // 批量出队
    int results[5];
    size_t count = queue.try_dequeue_bulk(results, 5);
    
    std::cout << "批量消费了 " << count << " 个元素" << std::endl;
}

2. 使用生产者令牌

text
void producer_token_example() {
    moodycamel::ConcurrentQueue<int> queue;
    
    // 创建生产者令牌
    moodycamel::ProducerToken token(queue);
    
    // 使用令牌入队(性能更优)
    for (int i = 0; i < 100; ++i) {
        queue.enqueue(token, i);
    }
    
    // 使用令牌批量入队
    int bulk_items[10];
    for (int i = 0; i < 10; ++i) {
        bulk_items[i] = i * 10;
    }
    queue.enqueue_bulk(token, bulk_items, 10);
}

3. 阻塞队列

text
#include "blockingconcurrentqueue.h"

void blocking_queue_example() {
    moodycamel::BlockingConcurrentQueue<int> queue;
    
    std::thread consumer([&queue]() {
        int value;
        // 阻塞等待直到有数据可用
        queue.wait_dequeue(value);
        std::cout << "收到数据: " << value << std::endl;
    });
    
    std::this_thread::sleep_for(std::chrono::seconds(1));
    
    // 生产者
    queue.enqueue(42);
    
    consumer.join();
}

实际应用场景

1. 线程池任务队列

text
class ThreadPool {
private:
    moodycamel::BlockingConcurrentQueue<std::function<void()>> tasks;
    std::vector<std::thread> workers;
    bool stop = false;
    
public:
    ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([this] {
                while (!stop) {
                    std::function<void()> task;
                    if (tasks.wait_dequeue(task)) {
                        task();
                    }
                }
            });
        }
    }
    
    template<typename F>
    void enqueue(F&& f) {
        tasks.enqueue(std::forward<F>(f));
    }
    
    ~ThreadPool() {
        stop = true;
        for (auto& worker : workers) {
            if (worker.joinable()) worker.join();
        }
    }
};

2. 高性能日志系统

text
class Logger {
private:
    moodycamel::ConcurrentQueue<std::string> log_queue;
    std::thread log_thread;
    std::ofstream log_file;
    
public:
    Logger(const std::string& filename) : log_file(filename) {
        log_thread = std::thread([this]() {
            std::string log_entry;
            while (true) {
                if (log_queue.try_dequeue(log_entry)) {
                    log_file << log_entry << std::endl;
                } else {
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                }
            }
        });
    }
    
    void log(const std::string& message) {
        auto timestamp = std::chrono::system_clock::now();
        log_queue.enqueue(std::to_string(
            std::chrono::duration_cast<std::chrono::milliseconds>(
                timestamp.time_since_epoch()).count()) + ": " + message);
    }
};

性能优化建议

  1. 批量操作优先:尽量使用enqueue_bulktry_dequeue_bulk减少锁竞争
  2. 使用生产者令牌:对于频繁生产数据的线程,使用ProducerToken提高性能
  3. 合理设置容量:根据实际需求预分配队列容量,避免频繁扩容
  4. 选择合适的队列类型:根据是否需要阻塞操作选择ConcurrentQueueBlockingConcurrentQueue

与其他队列的对比

特性 concurrentqueue std::queue + mutex boost::lockfree::queue
线程安全 ✅ MPMC ✅ (需加锁) ✅ SPMC/MPSC
无锁设计
批量操作
阻塞支持 ✅ (Blocking版本)
内存预分配

总结

concurrentqueue库是一个功能强大、性能优异的并发队列实现,特别适合高并发场景下的线程间通信。其无锁设计、批量操作支持和灵活的生产者令牌机制使其在性能上具有明显优势。无论是构建高性能服务器、实时数据处理系统还是复杂的多线程应用,concurrentqueue都是一个值得考虑的优秀选择。

项目持续维护,文档完善,社区活跃,是C++并发编程中不可或缺的工具之一。

concurrentqueue_20260205062014.zip
类型:压缩文件|已下载:0|下载方式:免费下载
立即下载
文章版权及转载声明

作者:icy本文地址:https://www.zelig.cn/2026/03/377.html发布于 昨天
文章转载或复制请以超链接形式并注明出处软角落-SoftNook

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

阅读
分享

发表评论

快捷回复:

验证码

评论列表 (暂无评论,7人围观)参与讨论

还没有评论,来说两句吧...