下例实现了一个并行版的 std::accumulate。代码将整个求和任务按元素数量拆分成若干块，分别交给各个线程处理，最后再把各块的部分和相加。其中为每个线程设置了最小任务量（min_per_thread），是为了避免在数据量较小时创建过多的线程——线程的创建与调度开销可能超过并行带来的收益。
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iterator>
#include <numeric>
#include <thread>
#include <vector>

/// Function object that sums [first, last) into `result`.
/// `result` is used as the starting value of the sum and is overwritten
/// with the total.
template <typename Iterator, typename T>
struct accumulate_block {
    void operator()(Iterator first, Iterator last, T& result) {
        result = std::accumulate(first, last, result);  // sum this block
    }
};

/// RAII helper: joins every still-joinable thread in the referenced vector
/// when it goes out of scope. Without it, an exception thrown while workers
/// are running would destroy joinable std::thread objects, which calls
/// std::terminate.
class thread_join_guard {
public:
    explicit thread_join_guard(std::vector<std::thread>& threads) : threads_(threads) {}
    thread_join_guard(const thread_join_guard&) = delete;
    thread_join_guard& operator=(const thread_join_guard&) = delete;
    ~thread_join_guard() {
        for (std::thread& t : threads_) {
            if (t.joinable()) {
                t.join();
            }
        }
    }

private:
    std::vector<std::thread>& threads_;
};

/// Parallel version of std::accumulate: splits [first, last) into contiguous
/// blocks, sums each block on its own thread, then adds the partial sums
/// (in block order) to `init`.
///
/// @param first, last  Range to sum. It is measured with std::distance and
///                      then walked again block by block, so at least forward
///                      iterators are required.
/// @param init          Initial value; returned unchanged for an empty range.
/// @return init + sum of all elements.
///
/// Notes:
///  - T must be default-constructible: each block starts from T{}.
///  - Because the additions are regrouped per block, results for
///    non-associative types (e.g. floating point) may differ slightly from
///    the sequential std::accumulate.
///  - An exception thrown *inside* a worker thread is not propagated; it
///    terminates the program (std::thread semantics).
template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
    const unsigned long length = static_cast<unsigned long>(std::distance(first, last));
    if (length == 0) {
        return init;
    }

    // Minimum elements per thread; avoids spawning threads whose start-up
    // cost would outweigh the work they do.
    const unsigned long min_per_thread = 25;
    const unsigned long max_threads = (length + min_per_thread - 1) / min_per_thread;

    // hardware_concurrency() may return 0 when it cannot be determined; fall back to 2.
    const unsigned long hardware_threads = std::thread::hardware_concurrency();
    const unsigned long num_threads =
        std::min(hardware_threads != 0 ? hardware_threads : 2UL, max_threads);

    const unsigned long block_size = length / num_threads;

    std::vector<T> results(num_threads);
    // The calling thread processes the last block itself, so only
    // num_threads - 1 worker threads are spawned.
    std::vector<std::thread> threads;
    threads.reserve(num_threads - 1);

    {
        // Joins all workers at the end of this scope, including on exceptions,
        // so `results` is fully written before it is read below.
        thread_join_guard joiner(threads);

        Iterator block_start = first;
        for (unsigned long i = 0; i + 1 < num_threads; ++i) {
            Iterator block_end = block_start;
            std::advance(block_end, block_size);
            threads.emplace_back(accumulate_block<Iterator, T>(), block_start, block_end,
                                 std::ref(results[i]));
            block_start = block_end;
        }

        // The last block also absorbs the remainder (length % num_threads elements).
        accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
    }

    return std::accumulate(results.begin(), results.end(), init);
}

int main() {
    const unsigned value_count = 5000000;

    std::vector<unsigned> valueSet;
    valueSet.reserve(value_count);
    for (unsigned i = 0; i < value_count; ++i) {
        valueSet.push_back(static_cast<unsigned>(std::rand() % 26));
    }

    unsigned long long init = 0;
    unsigned long long result = parallel_accumulate(valueSet.begin(), valueSet.end(), init);
    (void)result;  // computed only to exercise parallel_accumulate
    return 0;
}
原文：https://www.cnblogs.com/tgcf/p/14812618.html