Loading...
Searching...
No Matches
Parallel-Impl.hpp
Go to the documentation of this file.
// This code is based on Jet framework.
// Copyright (c) 2018 Doyub Kim
// CubbyFlow is voxel-based fluid simulation engine for computer games.
// Copyright (c) 2020 CubbyFlow Team
// Core Part: Chris Ohk, Junwoo Hwang, Jihong Sin, Seungwoo Yoo
// AI Part: Dongheon Cho, Minseo Kim
// We are making our contributions/submissions to this project solely in our
// personal capacity and are not conveying any rights to any intellectual
// property of any third parties.
10
11#ifndef CUBBYFLOW_PARALLEL_IMPL_HPP
12#define CUBBYFLOW_PARALLEL_IMPL_HPP
13
#include <Core/Utils/Constants.hpp>
#include <Core/Utils/Parallel.hpp>

#if defined(CUBBYFLOW_TASKING_HPX)
#include <hpx/include/future.hpp>
#include <hpx/include/parallel_fill.hpp>
#include <hpx/include/parallel_for_each.hpp>
#include <hpx/include/parallel_for_loop.hpp>
#include <hpx/include/parallel_reduce.hpp>
#include <hpx/include/parallel_sort.hpp>
#endif

#if defined(CUBBYFLOW_TASKING_TBB)
#include <tbb/parallel_for.h>
#include <tbb/parallel_reduce.h>
#include <tbb/parallel_sort.h>
#include <tbb/task.h>
#elif defined(CUBBYFLOW_TASKING_CPP11THREAD)
#include <thread>
#endif

#include <algorithm>
#include <cmath>
#include <future>
#include <vector>
39
40#undef max
41#undef min
42
43namespace CubbyFlow
44{
45namespace Internal
46{
#if defined(CUBBYFLOW_TASKING_HPX)
// With the HPX backend, tasks must yield HPX futures so they integrate with
// the HPX runtime; otherwise plain std::future is used.
template <typename Task>
using future = hpx::future<Task>;
#else
template <typename Task>
using future = std::future<Task>;
#endif

// Result type of invoking a nullary callable TASK.
template <typename TASK>
using operator_return_t = typename std::invoke_result_t<TASK>;
57
58template <typename TASK>
60{
61#if defined(CUBBYFLOW_TASKING_HPX)
62 return hpx::async(std::forward<TASK>(fn));
63
64#elif defined(CUBBYFLOW_TASKING_TBB)
65 struct LocalTBBTask : public tbb::task
66 {
67 TASK func;
68
69 LocalTBBTask(TASK&& f) : func(std::forward<TASK>(f))
70 {
71 // Do nothing
72 }
73
74 tbb::task* execute() override
75 {
76 func();
77 return nullptr;
78 }
79 };
80
81 using package_t = std::packaged_task<operator_return_t<TASK>()>;
82
83 auto task = new package_t(std::forward<TASK>(fn));
84 auto* tbbNode = new (tbb::task::allocate_root()) LocalTBBTask([=]() {
85 (*task)();
86 delete task;
87 });
88
89 tbb::task::enqueue(*tbbNode);
90 return task.get_future();
91
92#elif defined(CUBBYFLOW_TASKING_CPP11THREAD)
93 return std::async(std::launch::async, fn);
94#else
95 return std::async(std::launch::deferred, fn);
96#endif
97}
98
99// Adopted from:
100// Radenski, A.
101// Shared Memory, Message Passing, and Hybrid Merge Sorts for Standalone and
102// Clustered SMPs. Proc PDPTA'11, the 2011 International Conference on Parallel
103// and Distributed Processing Techniques and Applications, CSREA Press
104// (H. Arabnia, Ed.), 2011, pp. 367 - 373.
105template <typename RandomIterator, typename RandomIterator2,
106 typename CompareFunction>
109{
110 size_t i1 = 0;
111 size_t i2 = size / 2;
112 size_t tempi = 0;
113
114 while (i1 < size / 2 && i2 < size)
115 {
116 if (compareFunction(a[i1], a[i2]))
117 {
118 temp[tempi] = a[i1];
119 i1++;
120 }
121 else
122 {
123 temp[tempi] = a[i2];
124 i2++;
125 }
126
127 tempi++;
128 }
129
130 while (i1 < size / 2)
131 {
132 temp[tempi] = a[i1];
133 i1++;
134 tempi++;
135 }
136
137 while (i2 < size)
138 {
139 temp[tempi] = a[i2];
140 i2++;
141 tempi++;
142 }
143
144 // Copy sorted temp array into main array, a
145 ParallelFor(ZERO_SIZE, size, [&](size_t i) { a[i] = temp[i]; });
146}
147
148template <typename RandomIterator, typename RandomIterator2,
149 typename CompareFunction>
152{
153 if (numThreads == 1)
154 {
155 std::sort(a, a + size, compareFunction);
156 }
157 else if (numThreads > 1)
158 {
159 std::vector<future<void>> pool;
160 pool.reserve(2);
161
162 auto launchRange = [compareFunction](RandomIterator begin, size_t k2,
164 unsigned int numThreads) {
166 };
167
168 pool.emplace_back(Internal::Async(
169 [=]() { launchRange(a, size / 2, temp, numThreads / 2); }));
170
171 pool.emplace_back(Internal::Async([=]() {
172 launchRange(a + size / 2, size - size / 2, temp + size / 2,
173 numThreads - numThreads / 2);
174 }));
175
176 // Wait for jobs to finish
177 for (auto& f : pool)
178 {
179 if (f.valid())
180 {
181 f.wait();
182 }
183 }
184
186 }
187}
188} // namespace Internal
189
190template <typename RandomIterator, typename T>
191void ParallelFill(const RandomIterator& begin, const RandomIterator& end,
192 const T& value, ExecutionPolicy policy)
193{
194 auto diff = end - begin;
195 if (diff <= 0)
196 {
197 return;
198 }
199
200#if defined(CUBBYFLOW_TASKING_HPX)
201 hpx::parallel::fill(hpx::parallel::execution::par, begin, end, value);
202#else
203 size_t size = static_cast<size_t>(diff);
205 ZERO_SIZE, size, [begin, value](size_t i) { begin[i] = value; },
206 policy);
207#endif
208}
209
210// Adopted from http://ideone.com/Z7zldb
211template <typename IndexType, typename Function>
214{
215 if (beginIndex > endIndex)
216 {
217 return;
218 }
219
221 {
222#if defined(CUBBYFLOW_TASKING_TBB)
223 (void)policy;
224 tbb::parallel_for(beginIndex, endIndex, function);
225#elif defined(CUBBYFLOW_TASKING_HPX)
226 (void)policy;
227 hpx::parallel::for_loop(hpx::parallel::execution::par, beginIndex,
229#elif defined(CUBBYFLOW_TASKING_CPP11THREAD)
230 // Estimate number of threads in the pool
231 const unsigned int numThreadsHint = GetMaxNumberOfThreads();
232 const unsigned int numThreads =
233 (numThreadsHint == 0u) ? 8u : numThreadsHint;
234
235 // Size of a slice for the range functions
237 IndexType slice = static_cast<IndexType>(
238 std::round(n / static_cast<double>(numThreads)));
239 slice = std::max(slice, IndexType(1));
240
241 // [Helper] Inner loop
243 for (IndexType k = k1; k < k2; ++k)
244 {
245 function(k);
246 }
247 };
248
249 // Create pool and launch jobs
250 std::vector<std::thread> pool;
251 pool.reserve(numThreads);
253 IndexType i2 = std::min(beginIndex + slice, endIndex);
254
255 for (unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
256 {
257 pool.emplace_back(launchRange, i1, i2);
258 i1 = i2;
259 i2 = std::min(i2 + slice, endIndex);
260 }
261
262 if (i1 < endIndex)
263 {
264 pool.emplace_back(launchRange, i1, endIndex);
265 }
266
267 // Wait for jobs to finish
268 for (std::thread& t : pool)
269 {
270 if (t.joinable())
271 {
272 t.join();
273 }
274 }
275#else
276 (void)policy;
277
278#if defined(CUBBYFLOW_TASKING_OPENMP)
279#pragma omp parallel for
280#if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
282 {
283#else // !MSVC || Intel
284 for (auto i = beginIndex; i < endIndex; ++i)
285 {
286#endif // MSVC && !Intel
287 function(i);
288 }
289#else // CUBBYFLOW_TASKING_SERIAL
290 for (auto i = beginIndex; i < endIndex; ++i)
291 {
292 function(i);
293 }
294#endif // CUBBYFLOW_TASKING_OPENMP
295#endif
296 }
297 else
298 {
299 for (auto i = beginIndex; i < endIndex; ++i)
300 {
301 function(i);
302 }
303 }
304}
305
306template <typename IndexType, typename Function>
309{
310 if (beginIndex > endIndex)
311 {
312 return;
313 }
314
316 {
317#if defined(CUBBYFLOW_TASKING_TBB)
318 tbb::parallel_for(
319 tbb::blocked_range<IndexType>(beginIndex, endIndex),
320 [&function](const tbb::blocked_range<IndexType>& range) {
322 });
323#else
324 // Estimate number of threads in the pool
325 const unsigned int numThreadsHint = GetMaxNumberOfThreads();
326 const unsigned int numThreads =
328
329 // Size of a slice for the range functions
331 IndexType slice = static_cast<IndexType>(
332 std::round(n / static_cast<double>(numThreads)));
333 slice = std::max(slice, IndexType(1));
334
335 // Create pool and launch jobs
336 std::vector<CubbyFlow::Internal::future<void>> pool;
337 pool.reserve(numThreads);
339 IndexType i2 = std::min(beginIndex + slice, endIndex);
340
341 for (unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
342 {
343 pool.emplace_back(Internal::Async([=]() { function(i1, i2); }));
344 i1 = i2;
345 i2 = std::min(i2 + slice, endIndex);
346 }
347
348 if (i1 < endIndex)
349 {
350 pool.emplace_back(
351 Internal::Async([=]() { function(i1, endIndex); }));
352 }
353
354 // Wait for jobs to finish
355 for (auto& f : pool)
356 {
357 if (f.valid())
358 {
359 f.wait();
360 }
361 }
362#endif
363 }
364 else
365 {
367 }
368}
369
370template <typename IndexType, typename Function>
374{
377 [&](IndexType j) {
378 for (IndexType i = beginIndexX; i < endIndexX; ++i)
379 {
380 function(i, j);
381 }
382 },
383 policy);
384}
385
386template <typename IndexType, typename Function>
398
399template <typename IndexType, typename Function>
404{
407 [&](IndexType k) {
408 for (IndexType j = beginIndexY; j < endIndexY; ++j)
409 {
410 for (IndexType i = beginIndexX; i < endIndexX; ++i)
411 {
412 function(i, j, k);
413 }
414 }
415 },
416 policy);
417}
418
419template <typename IndexType, typename Function>
433
434template <typename IndexType, typename Value, typename Function,
435 typename Reduce>
437 const Value& identity, const Function& function,
438 const Reduce& reduce, ExecutionPolicy policy)
439{
440 if (beginIndex > endIndex)
441 {
442 return identity;
443 }
444
446 {
447#if defined(CUBBYFLOW_TASKING_TBB)
448 return tbb::parallel_reduce(
449 tbb::blocked_range<IndexType>(beginIndex, endIndex), identity,
450 [&function](const tbb::blocked_range<IndexType>& range,
451 const Value& init) {
452 return function(range.begin(), range.end(), init);
453 },
454 reduce);
455#else
456 // Estimate number of threads in the pool
457 const unsigned int numThreadsHint = GetMaxNumberOfThreads();
458 const unsigned int numThreads =
459 (numThreadsHint == 0u) ? 8u : numThreadsHint;
460
461 // Size of a slice for the range functions
463 IndexType slice = static_cast<IndexType>(
464 std::round(n / static_cast<double>(numThreads)));
465 slice = std::max(slice, IndexType(1));
466
467 // Results
468 std::vector<Value> results(numThreads, identity);
469
470 // [Helper] Inner loop
471 auto launchRange = [&](IndexType k1, IndexType k2, unsigned int tid) {
473 };
474
475 // Create pool and launch jobs
476 std::vector<CubbyFlow::Internal::future<void>> pool;
477 pool.reserve(numThreads);
478
480 IndexType i2 = std::min(beginIndex + slice, endIndex);
481 unsigned int threadID = 0;
482
483 for (; threadID + 1 < numThreads && i1 < endIndex; ++threadID)
484 {
485 pool.emplace_back(
486 Internal::Async([=]() { launchRange(i1, i2, threadID); }));
487
488 i1 = i2;
489 i2 = std::min(i2 + slice, endIndex);
490 }
491
492 if (i1 < endIndex)
493 {
494 pool.emplace_back(Internal::Async(
495 [=]() { launchRange(i1, endIndex, threadID); }));
496 }
497
498 // Wait for jobs to finish
499 for (auto& f : pool)
500 {
501 if (f.valid())
502 {
503 f.wait();
504 }
505 }
506
507 // Gather
509 for (const Value& val : results)
510 {
512 }
513
514 return finalResult;
515#endif
516 }
517
518 (void)reduce;
520}
521
522template <typename RandomIterator>
525{
527 begin, end,
528 std::less<typename std::iterator_traits<RandomIterator>::value_type>(),
529 policy);
530}
531
532template <typename RandomIterator, typename CompareFunction>
535{
536 if (begin > end)
537 {
538 return;
539 }
540
542 {
543#if defined(CUBBYFLOW_TASKING_HPX)
544 hpx::parallel::sort(hpx::parallel::execution::par, begin, end,
546#elif defined(CUBBYFLOW_TASKING_TBB)
547 tbb::parallel_sort(begin, end, compareFunction);
548#else
549
550 size_t size = static_cast<size_t>(end - begin);
551
552 using value_type =
553 typename std::iterator_traits<RandomIterator>::value_type;
554 std::vector<value_type> temp(size);
555
556 // Estimate number of threads in the pool
557 const unsigned int numThreadsHint = GetMaxNumberOfThreads();
558 const unsigned int numThreads =
559 (numThreadsHint == 0u) ? 8u : numThreadsHint;
560
563#endif
564 }
565 else
566 {
567 std::sort(begin, end, compareFunction);
568 }
569}
570} // namespace CubbyFlow
571
572#endif
Definition Matrix.hpp:30
Iterator begin()
Definition Matrix-Impl.hpp:272
Iterator end()
Definition Matrix-Impl.hpp:285
std::future< Task > future
Definition Parallel-Impl.hpp:52
void ParallelMergeSort(RandomIterator a, size_t size, RandomIterator2 temp, unsigned int numThreads, CompareFunction compareFunction)
Definition Parallel-Impl.hpp:150
typename std::invoke_result_t< TASK > operator_return_t
Definition Parallel-Impl.hpp:56
auto Async(TASK &&fn) -> future< operator_return_t< TASK > >
Definition Parallel-Impl.hpp:59
void Merge(RandomIterator a, size_t size, RandomIterator2 temp, CompareFunction compareFunction)
Definition Parallel-Impl.hpp:107
Definition pybind11Utils.hpp:21
constexpr size_t ZERO_SIZE
Zero size_t.
Definition Constants.hpp:20
void ParallelSort(RandomIterator begin, RandomIterator end, ExecutionPolicy policy)
Sorts a container in parallel.
Definition Parallel-Impl.hpp:523
void ParallelFill(const RandomIterator &begin, const RandomIterator &end, const T &value, ExecutionPolicy policy)
Fills from begin to end with value in parallel.
Definition Parallel-Impl.hpp:191
void ParallelFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Makes a for-loop from beginIndex to endIndex in parallel.
Definition Parallel-Impl.hpp:212
Matrix< T, Rows, 1 > Vector
Definition Matrix.hpp:738
unsigned int GetMaxNumberOfThreads()
Returns maximum number of threads to use.
void ParallelRangeFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Makes a range-loop from beginIndex to endIndex in parallel.
Definition Parallel-Impl.hpp:307
Value ParallelReduce(IndexType beginIndex, IndexType endIndex, const Value &identity, const Function &function, const Reduce &reduce, ExecutionPolicy policy)
Performs reduce operation in parallel.
Definition Parallel-Impl.hpp:436
ExecutionPolicy
Execution policy tag.
Definition Parallel.hpp:18