9 #ifndef CUBBYFLOW_PARALLEL_IMPL_H 10 #define CUBBYFLOW_PARALLEL_IMPL_H 15 #if defined(CUBBYFLOW_TASKING_TBB) 16 #include <tbb/parallel_for.h> 17 #include <tbb/parallel_reduce.h> 18 #include <tbb/parallel_sort.h> 20 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD) 39 template <
typename TASK>
42 #if defined(CUBBYFLOW_TASKING_TBB) 43 struct LocalTBBTask :
public tbb::task
47 LocalTBBTask(TASK&& f) : func(std::forward<TASK>(f))
52 tbb::task* execute()
override 59 auto* tbbNode =
new(tbb::task::allocate_root()) LocalTBBTask(std::forward<TASK>(fn));
60 tbb::task::enqueue(*tbbNode);
61 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD) 62 std::thread thread(fn);
64 #else // OpenMP or Serial -> Synchronous! 69 template <
typename TASK>
73 template <
typename TASK>
74 inline auto Async(TASK&& fn) -> std::future<operator_return_t<TASK>>
76 using package_t = std::packaged_task<operator_return_t<TASK>()>;
78 auto task =
new package_t(std::forward<TASK>(fn));
79 auto future = task->get_future();
96 template <
typename RandomIterator,
typename RandomIterator2,
typename CompareFunction>
97 void Merge(RandomIterator a,
size_t size, RandomIterator2 temp, CompareFunction compareFunction)
100 size_t i2 = size / 2;
103 while (i1 < size / 2 && i2 < size)
105 if (compareFunction(a[i1], a[i2]))
119 while (i1 < size / 2)
140 template <
typename RandomIterator,
typename RandomIterator2,
typename CompareFunction>
141 void ParallelMergeSort(RandomIterator a,
size_t size, RandomIterator2 temp,
unsigned int numThreads, CompareFunction compareFunction)
145 std::sort(a, a + size, compareFunction);
147 else if (numThreads > 1)
149 std::vector<std::future<void>> pool;
152 auto launchRange = [compareFunction](RandomIterator begin,
size_t k2, RandomIterator2 temp,
unsigned int numThreads)
159 launchRange(a, size / 2, temp, numThreads / 2);
164 launchRange(a + size / 2, size - size / 2, temp + size / 2, numThreads - numThreads / 2);
176 Merge(a, size, temp, compareFunction);
181 template <
typename RandomIterator,
typename T>
183 const RandomIterator& begin,
const RandomIterator& end,
186 auto diff = end - begin;
192 size_t size =
static_cast<size_t>(diff);
200 template <
typename IndexType,
typename Function>
202 IndexType beginIndex, IndexType endIndex,
205 if (beginIndex > endIndex)
210 #if defined(CUBBYFLOW_TASKING_TBB) 212 tbb::parallel_for(beginIndex, endIndex,
function);
213 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD) 217 (numThreadsHint == 0u ? 8u : numThreadsHint) : 1;
220 IndexType n = endIndex - beginIndex + 1;
221 IndexType slice =
static_cast<IndexType
>(std::round(n / static_cast<double>(numThreads)));
222 slice = std::max(slice, IndexType(1));
225 auto launchRange = [&
function](IndexType k1, IndexType k2)
227 for (IndexType k = k1; k < k2; ++k)
234 std::vector<std::thread> pool;
235 pool.reserve(numThreads);
236 IndexType i1 = beginIndex;
237 IndexType i2 = std::min(beginIndex + slice, endIndex);
239 for (
unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
241 pool.emplace_back(launchRange, i1, i2);
243 i2 = std::min(i2 + slice, endIndex);
248 pool.emplace_back(launchRange, i1, endIndex);
252 for (std::thread& t : pool)
262 #if defined(CUBBYFLOW_TASKING_OPENMP) 263 #pragma omp parallel for 264 #if defined(_MSC_VER) && !defined(__INTEL_COMPILER) 265 for (ssize_t i = beginIndex; i < static_cast<ssize_t>(endIndex); ++i)
267 #else // !MSVC || Intel 268 for (
auto i = beginIndex; i < endIndex; ++i)
270 #endif // MSVC && !Intel 273 #else // CUBBYFLOW_TASKING_SERIAL 274 for (
auto i = beginIndex; i < endIndex; ++i)
278 #endif // CUBBYFLOW_TASKING_OPENMP 282 template <
typename IndexType,
typename Function>
284 IndexType beginIndex, IndexType endIndex,
287 if (beginIndex > endIndex)
292 #if defined(CUBBYFLOW_TASKING_TBB) 295 tbb::parallel_for(tbb::blocked_range<IndexType>(beginIndex, endIndex),
296 [&
function](
const tbb::blocked_range<IndexType>& range)
298 function(range.begin(), range.end());
303 function(beginIndex, endIndex);
309 (numThreadsHint == 0u ? 8u : numThreadsHint) : 1;
312 IndexType n = endIndex - beginIndex + 1;
313 IndexType slice =
static_cast<IndexType
>(std::round(n / static_cast<double>(numThreads)));
314 slice = std::max(slice, IndexType(1));
317 std::vector<std::future<void>> pool;
318 pool.reserve(numThreads);
319 IndexType i1 = beginIndex;
320 IndexType i2 = std::min(beginIndex + slice, endIndex);
322 for (
unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
326 i2 = std::min(i2 + slice, endIndex);
345 template <
typename IndexType,
typename Function>
347 IndexType beginIndexX, IndexType endIndexX,
348 IndexType beginIndexY, IndexType endIndexY,
351 ParallelFor(beginIndexY, endIndexY, [&](IndexType j)
353 for (IndexType i = beginIndexX; i < endIndexX; ++i)
360 template <
typename IndexType,
typename Function>
362 IndexType beginIndexX, IndexType endIndexX,
363 IndexType beginIndexY, IndexType endIndexY,
367 [&](IndexType jBegin, IndexType jEnd)
369 function(beginIndexX, endIndexX, jBegin, jEnd);
373 template <
typename IndexType,
typename Function>
375 IndexType beginIndexX, IndexType endIndexX,
376 IndexType beginIndexY, IndexType endIndexY,
377 IndexType beginIndexZ, IndexType endIndexZ,
380 ParallelFor(beginIndexZ, endIndexZ, [&](IndexType k)
382 for (IndexType j = beginIndexY; j < endIndexY; ++j)
384 for (IndexType i = beginIndexX; i < endIndexX; ++i)
392 template <
typename IndexType,
typename Function>
394 IndexType beginIndexX, IndexType endIndexX,
395 IndexType beginIndexY, IndexType endIndexY,
396 IndexType beginIndexZ, IndexType endIndexZ,
400 [&](IndexType kBegin, IndexType kEnd)
402 function(beginIndexX, endIndexX, beginIndexY, endIndexY, kBegin, kEnd);
406 template <
typename IndexType,
typename Value,
typename Function,
typename Reduce>
408 IndexType beginIndex, IndexType endIndex,
409 const Value& identity,
const Function&
function,
412 if (beginIndex > endIndex)
417 #if defined(CUBBYFLOW_TASKING_TBB) 420 return tbb::parallel_reduce(tbb::blocked_range<IndexType>(beginIndex, endIndex), identity,
421 [&
function](
const tbb::blocked_range<IndexType>& range,
const Value& init)
423 return function(range.begin(), range.end(), init);
429 return function(beginIndex, endIndex, identity);
435 (numThreadsHint == 0u ? 8u : numThreadsHint) : 1;
438 IndexType n = endIndex - beginIndex + 1;
439 IndexType slice =
static_cast<IndexType
>(std::round(n / static_cast<double>(numThreads)));
440 slice = std::max(slice, IndexType(1));
443 std::vector<Value> results(numThreads, identity);
446 auto launchRange = [&](IndexType k1, IndexType k2,
unsigned int tid)
448 results[tid] =
function(k1, k2, identity);
452 std::vector<std::future<void>> pool;
453 pool.reserve(numThreads);
455 IndexType i1 = beginIndex;
456 IndexType i2 = std::min(beginIndex + slice, endIndex);
457 unsigned int threadID = 0;
459 for (; threadID + 1 < numThreads && i1 < endIndex; ++threadID)
461 pool.emplace_back(
Internal::Async([=]() { launchRange(i1, i2, threadID); }));
463 i2 = std::min(i2 + slice, endIndex);
468 pool.emplace_back(
Internal::Async([=]() { launchRange(i1, endIndex, threadID); }));
481 Value finalResult = identity;
482 for (
const Value& val : results)
484 finalResult = reduce(val, finalResult);
491 template<
typename RandomIterator>
493 RandomIterator begin, RandomIterator end,
496 ParallelSort(begin, end, std::less<
typename std::iterator_traits<RandomIterator>::value_type>(), policy);
499 template<
typename RandomIterator,
typename CompareFunction>
501 RandomIterator begin, RandomIterator end,
509 #if defined(CUBBYFLOW_TASKING_TBB) 512 tbb::parallel_sort(begin, end, compareFunction);
516 std::sort(begin, end, compareFunction);
519 size_t size =
static_cast<size_t>(end - begin);
521 using value_type =
typename std::iterator_traits<RandomIterator>::value_type;
522 std::vector<value_type> temp(size);
527 (numThreadsHint == 0u ? 8u : numThreadsHint) : 1;
typename std::result_of< TASK()>::type operator_return_t
Definition: Parallel-Impl.h:70
unsigned int GetMaxNumberOfThreads()
Returns maximum number of threads to use.
void ParallelRangeFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Makes a range-loop from beginIndex to endIndex in parallel.
Definition: Parallel-Impl.h:283
Definition: pybind11Utils.h:24
void ParallelFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Makes a for-loop from beginIndex to endIndex in parallel.
Definition: Parallel-Impl.h:201
void Merge(RandomIterator a, size_t size, RandomIterator2 temp, CompareFunction compareFunction)
Definition: Parallel-Impl.h:97
void ParallelSort(RandomIterator begin, RandomIterator end, ExecutionPolicy policy)
Sorts a container in parallel.
Definition: Parallel-Impl.h:492
constexpr size_t ZERO_SIZE
Zero size_t.
Definition: Constants.h:18
void Schedule(TASK &&fn)
Definition: Parallel-Impl.h:40
void ParallelMergeSort(RandomIterator a, size_t size, RandomIterator2 temp, unsigned int numThreads, CompareFunction compareFunction)
Definition: Parallel-Impl.h:141
ExecutionPolicy
Execution policy tag.
Definition: Parallel.h:15
void ParallelFill(const RandomIterator &begin, const RandomIterator &end, const T &value, ExecutionPolicy policy)
Fills from begin to end with value in parallel.
Definition: Parallel-Impl.h:182
auto Async(TASK &&fn) -> std::future< operator_return_t< TASK >>
Definition: Parallel-Impl.h:74
Value ParallelReduce(IndexType beginIndex, IndexType endIndex, const Value &identity, const Function &function, const Reduce &reduce, ExecutionPolicy policy)
Performs reduce operation in parallel.
Definition: Parallel-Impl.h:407