SyncTaskQueue.h
#pragma once
#include <list>
#include <mutex>
#include <condition_variable>
#include <iostream>
template <typename TASK>
class SyncTaskQueue//队列内部实现加锁,保证操作同步
{ //这个队列是被线程池使用,因此具体实例在线程池中定义
public:
SyncTaskQueue(int max_size) :max_size_(max_size) {
}
~SyncTaskQueue() {//何时析构 如果有个操作在一个线程中阻塞, 对象无法析构
Stop();
std::cout << "SyncTaskQueue destruction" << std::endl;
}
void Stop() {//退出循环
stop_ = true;
not_empty_cond_.notify_one();
not_full_cond_.notify_one();
std::cout << "SyncTaskQueue Stop" << std::endl;
}
bool IsFull() {
std::lock_guard<std::mutex> locker;
return list_task_.size() == max_size_;
}
bool IsEmpty() {
std::lock_guard<std::mutex> locker;
return list_task_.size() == 0;
}
void Push(TASK &&data) {
std::unique_lock<std::mutex> locker(mutex_);
while (Full() && !stop_) {//避免多次获取互斥锁
std::cout << "task queue is full, wait" << std::endl;//阻塞
//满的时候等待, 阻塞等待消费
not_full_cond_.wait_for(locker, std::chrono::milliseconds(500));
}
if (stop_) {
return;
}
if (!Full()) {
list_task_.push_back(std::forward<TASK>(data));//为什么需要std::forward,保证右值,移动拷贝?
not_empty_cond_.notify_one();//not empty cond signal
}
}
void Pop(TASK& data){//没有用返回值的形式
std::unique_lock<std::mutex> locker(mutex_);
while (Empty() && !stop_) {
std::cout << "task queue is empty, wait" << std::endl;//阻塞
not_empty_cond_.wait_for(locker, std::chrono::milliseconds(500));
}
if (!Empty()) {
data = list_task_.front();
list_task_.pop_front();//list pop操作分为2步
not_full_cond_.notify_one();//not full cond signal
}
}
private:
bool Full() {
return list_task_.size() == max_size_;
}
bool Empty() {
return list_task_.size() == 0;
}
std::mutex mutex_;
int max_size_;
std::atomic<bool> stop_ = false;
std::condition_variable not_full_cond_;//没有满的时候激发
std::condition_variable not_empty_cond_;
std::list<TASK> list_task_;
};
ThreadPool.h
#pragma once
#include <list>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>
#include "SyncTaskQueue.h"
#include <mutex>
#include <condition_variable>
#include <iostream>
class ThreadPool {
public:
using Task = std::function<void()>;//使用别名
ThreadPool();
~ThreadPool();
void Stop();
void AddTask(Task &&task);
private:
void Start(int num_thread);
void RunThread();
void StopThread();
private:
//多个线程对象容器,方便管理
std::list<std::shared_ptr<std::thread>> thread_group_;
int thread_num_;//线程数
SyncTaskQueue<Task> queue_;//任务队列
std::atomic<bool> stop_ = false;//需要包含头文件atomic
std::once_flag flag_;
};
ThreadPool.cpp
#include "ThreadPool.h"
ThreadPool::ThreadPool():queue_(10)
{//构造函数 Start私有化 保证也只能执行一次
thread_num_ = std::thread::hardware_concurrency();
Start(thread_num_);
}
ThreadPool::~ThreadPool()
{
Stop();
}
void ThreadPool::Stop() {//保证stop 只有一次
std::call_once(flag_, [this] {StopThread(); });
}
void ThreadPool::AddTask(Task &&task) {
queue_.Push(std::forward<Task>(task));
}
void ThreadPool::Start(int num_thread) {
thread_num_ = num_thread;
std::cout << "thread pool start" << std::endl;
for (int i = 0; i < thread_num_; i++) {
thread_group_.push_back(std::make_shared<std::thread>(&ThreadPool::RunThread, this));//创建线程的过程中将线程函数传进去
std::cout << "thread " << thread_group_.back()->get_id() << " create " << std::endl;
}
#if 0
for (auto thread : thread_group_) {
thread->get_id();
}
#endif
}
//所有的子线程都会从任务队列里面去取任务执行
void ThreadPool::RunThread() {//线程从任务队列中取任务
//多个线程里面只有队列任务共享的,stop_数据是共享的
while (!stop_) {//如果没有停止,则一直在while循环
Task task_object;
queue_.Pop(task_object);//如果没有数据,会自动阻塞
if (stop_) {//如果时停止,则直接return
return;
}
task_object();//取出任务执行,本线程不结束,继续从队列里面取任务执行
std::cout << "thread id " << std::this_thread::get_id() << " exec one task" << std::endl;
}
}
void ThreadPool::StopThread() {
stop_ = true;
queue_.Stop();//任务队列可能阻塞,需要先停止
//等待线程池中的所有线程执行结束
for (auto it = thread_group_.begin(); it != thread_group_.end(); it++) {
(*it)->join();
}
thread_group_.clear();//线程对象列表清除
std::cout << "thread pool stop" << std::endl;
}
void test_ThreadPool()
{
ThreadPool thread_pool;
std::thread thread1([&thread_pool] {
auto id = std::this_thread::get_id();
std::cout << "thread id " << id << " add task " << std::endl;
thread_pool.AddTask([id]() {
std::cout << "thread id " << id << " exec task " <<std::endl;
});
});
std::thread thread2([&thread_pool] {
auto id = std::this_thread::get_id();
std::cout << "thread id " << id << " add task " << std::endl;
thread_pool.AddTask([id]() {
std::cout << "thread id " << id << " exec task " << std::endl;
});
});
std::thread thread3([&thread_pool] {
auto id = std::this_thread::get_id();
std::cout << "thread id " << id << "add task" << std::endl;
thread_pool.AddTask([id]() {
std::cout << "thread id " << id << "exec task" << std::endl;
});
});
//过完2s再结束
std::this_thread::sleep_for(std::chrono::seconds(2));
thread_pool.Stop();//线程池结束
thread1.join();
thread2.join();
thread3.join();//同步线程函数执行完
}
原文:https://www.cnblogs.com/welen/p/14551276.html