Parallel-Impl.h
Go to the documentation of this file.
1 /*************************************************************************
2 > File Name: Parallel-Impl.h
3 > Project Name: CubbyFlow
4 > Author: Chan-Ho Chris Ohk
5 > Purpose: Parallel functions for CubbyFlow.
6 > Created Time: 2017/02/05
7 > Copyright (c) 2018, Chan-Ho Chris Ohk
8 *************************************************************************/
9 #ifndef CUBBYFLOW_PARALLEL_IMPL_H
10 #define CUBBYFLOW_PARALLEL_IMPL_H
11 
12 #include <Core/Utils/Constants.h>
13 #include <Core/Utils/Parallel.h>
14 
15 #if defined(CUBBYFLOW_TASKING_TBB)
16 #include <tbb/parallel_for.h>
17 #include <tbb/parallel_reduce.h>
18 #include <tbb/parallel_sort.h>
19 #include <tbb/task.h>
20 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD)
21 #include <thread>
22 #endif
23 
#include <algorithm>
#include <cmath>
#include <future>
#include <memory>
#include <utility>
#include <vector>
28 
29 #undef max
30 #undef min
31 
32 namespace CubbyFlow
33 {
34  namespace Internal
35  {
// Schedules a fire-and-forget task on the configured tasking backend.
//
// NOTE: This abstraction takes a lambda which should take captured
// variables by *value* to ensure no captured references race
// with the task itself.
//
// Backends: TBB enqueues a root task; the C++11-thread backend runs the
// task on a detached thread; OpenMP/serial run it synchronously.
template <typename TASK>
inline void Schedule(TASK&& fn)
{
#if defined(CUBBYFLOW_TASKING_TBB)
    struct LocalTBBTask : public tbb::task
    {
        TASK func;

        LocalTBBTask(TASK&& f) : func(std::forward<TASK>(f))
        {
            // Do nothing
        }

        tbb::task* execute() override
        {
            func();
            return nullptr;
        }
    };

    // tbb::task must be allocated through TBB's own allocator; the scheduler
    // reclaims the task object after execute() returns.
    auto* tbbNode = new(tbb::task::allocate_root()) LocalTBBTask(std::forward<TASK>(fn));
    tbb::task::enqueue(*tbbNode);
#elif defined(CUBBYFLOW_TASKING_CPP11THREAD)
    // Forward the callable instead of copying it so that move-only closures
    // (e.g. ones owning a std::packaged_task) are supported.
    std::thread thread(std::forward<TASK>(fn));
    thread.detach();
#else // OpenMP or Serial -> Synchronous!
    fn();
#endif
}
68 
// Alias for the type produced by invoking TASK with no arguments.
template <typename TASK>
using operator_return_t = typename std::result_of<TASK()>::type;
71 
72  // NOTE: see above, same issues associated with Schedule()
73  template <typename TASK>
74  inline auto Async(TASK&& fn) -> std::future<operator_return_t<TASK>>
75  {
76  using package_t = std::packaged_task<operator_return_t<TASK>()>;
77 
78  auto task = new package_t(std::forward<TASK>(fn));
79  auto future = task->get_future();
80 
81  Schedule([=]()
82  {
83  (*task)();
84  delete task;
85  });
86 
87  return future;
88  }
89 
90  // Adopted from:
91  // Radenski, A.
92  // Shared Memory, Message Passing, and Hybrid Merge Sorts for Standalone and
93  // Clustered SMPs. Proc PDPTA'11, the 2011 International Conference on Parallel
94  // and Distributed Processing Techniques and Applications, CSREA Press
95  // (H. Arabnia, Ed.), 2011, pp. 367 - 373.
96  template <typename RandomIterator, typename RandomIterator2, typename CompareFunction>
97  void Merge(RandomIterator a, size_t size, RandomIterator2 temp, CompareFunction compareFunction)
98  {
99  size_t i1 = 0;
100  size_t i2 = size / 2;
101  size_t tempi = 0;
102 
103  while (i1 < size / 2 && i2 < size)
104  {
105  if (compareFunction(a[i1], a[i2]))
106  {
107  temp[tempi] = a[i1];
108  i1++;
109  }
110  else
111  {
112  temp[tempi] = a[i2];
113  i2++;
114  }
115 
116  tempi++;
117  }
118 
119  while (i1 < size / 2)
120  {
121  temp[tempi] = a[i1];
122  i1++;
123  tempi++;
124  }
125 
126  while (i2 < size)
127  {
128  temp[tempi] = a[i2];
129  i2++;
130  tempi++;
131  }
132 
133  // Copy sorted temp array into main array, a
134  ParallelFor(ZERO_SIZE, size, [&](size_t i)
135  {
136  a[i] = temp[i];
137  });
138  }
139 
140  template <typename RandomIterator, typename RandomIterator2, typename CompareFunction>
141  void ParallelMergeSort(RandomIterator a, size_t size, RandomIterator2 temp, unsigned int numThreads, CompareFunction compareFunction)
142  {
143  if (numThreads == 1)
144  {
145  std::sort(a, a + size, compareFunction);
146  }
147  else if (numThreads > 1)
148  {
149  std::vector<std::future<void>> pool;
150  pool.reserve(2);
151 
152  auto launchRange = [compareFunction](RandomIterator begin, size_t k2, RandomIterator2 temp, unsigned int numThreads)
153  {
154  ParallelMergeSort(begin, k2, temp, numThreads, compareFunction);
155  };
156 
157  pool.emplace_back(Internal::Async([=]()
158  {
159  launchRange(a, size / 2, temp, numThreads / 2);
160  }));
161 
162  pool.emplace_back(Internal::Async([=]()
163  {
164  launchRange(a + size / 2, size - size / 2, temp + size / 2, numThreads - numThreads / 2);
165  }));
166 
167  // Wait for jobs to finish
168  for (auto& f : pool)
169  {
170  if (f.valid())
171  {
172  f.wait();
173  }
174  }
175 
176  Merge(a, size, temp, compareFunction);
177  }
178  }
179  }
180 
181  template <typename RandomIterator, typename T>
183  const RandomIterator& begin, const RandomIterator& end,
184  const T& value, ExecutionPolicy policy)
185  {
186  auto diff = end - begin;
187  if (diff <= 0)
188  {
189  return;
190  }
191 
192  size_t size = static_cast<size_t>(diff);
193  ParallelFor(ZERO_SIZE, size, [begin, value](size_t i)
194  {
195  begin[i] = value;
196  }, policy);
197  }
198 
199  // Adopted from http://ideone.com/Z7zldb
200  template <typename IndexType, typename Function>
202  IndexType beginIndex, IndexType endIndex,
203  const Function& function, ExecutionPolicy policy)
204  {
205  if (beginIndex > endIndex)
206  {
207  return;
208  }
209 
210 #if defined(CUBBYFLOW_TASKING_TBB)
211  (void)policy;
212  tbb::parallel_for(beginIndex, endIndex, function);
213 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD)
214  // Estimate number of threads in the pool
215  const unsigned int numThreadsHint = GetMaxNumberOfThreads();
216  const unsigned int numThreads = (policy == ExecutionPolicy::Parallel) ?
217  (numThreadsHint == 0u ? 8u : numThreadsHint) : 1;
218 
219  // Size of a slice for the range functions
220  IndexType n = endIndex - beginIndex + 1;
221  IndexType slice = static_cast<IndexType>(std::round(n / static_cast<double>(numThreads)));
222  slice = std::max(slice, IndexType(1));
223 
224  // [Helper] Inner loop
225  auto launchRange = [&function](IndexType k1, IndexType k2)
226  {
227  for (IndexType k = k1; k < k2; ++k)
228  {
229  function(k);
230  }
231  };
232 
233  // Create pool and launch jobs
234  std::vector<std::thread> pool;
235  pool.reserve(numThreads);
236  IndexType i1 = beginIndex;
237  IndexType i2 = std::min(beginIndex + slice, endIndex);
238 
239  for (unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
240  {
241  pool.emplace_back(launchRange, i1, i2);
242  i1 = i2;
243  i2 = std::min(i2 + slice, endIndex);
244  }
245 
246  if (i1 < endIndex)
247  {
248  pool.emplace_back(launchRange, i1, endIndex);
249  }
250 
251  // Wait for jobs to finish
252  for (std::thread& t : pool)
253  {
254  if (t.joinable())
255  {
256  t.join();
257  }
258  }
259 #else
260  (void)policy;
261 
262 #if defined(CUBBYFLOW_TASKING_OPENMP)
263 #pragma omp parallel for
264 #if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
265  for (ssize_t i = beginIndex; i < static_cast<ssize_t>(endIndex); ++i)
266  {
267 #else // !MSVC || Intel
268  for (auto i = beginIndex; i < endIndex; ++i)
269  {
270 #endif // MSVC && !Intel
271  function(i);
272  }
273 #else // CUBBYFLOW_TASKING_SERIAL
274  for (auto i = beginIndex; i < endIndex; ++i)
275  {
276  function(i);
277  }
278 #endif // CUBBYFLOW_TASKING_OPENMP
279 #endif
280  }
281 
282  template <typename IndexType, typename Function>
284  IndexType beginIndex, IndexType endIndex,
285  const Function& function, ExecutionPolicy policy)
286  {
287  if (beginIndex > endIndex)
288  {
289  return;
290  }
291 
292 #if defined(CUBBYFLOW_TASKING_TBB)
293  if (policy == ExecutionPolicy::Parallel)
294  {
295  tbb::parallel_for(tbb::blocked_range<IndexType>(beginIndex, endIndex),
296  [&function](const tbb::blocked_range<IndexType>& range)
297  {
298  function(range.begin(), range.end());
299  });
300  }
301  else
302  {
303  function(beginIndex, endIndex);
304  }
305 #else
306  // Estimate number of threads in the pool
307  const unsigned int numThreadsHint = GetMaxNumberOfThreads();
308  const unsigned int numThreads = (policy == ExecutionPolicy::Parallel) ?
309  (numThreadsHint == 0u ? 8u : numThreadsHint) : 1;
310 
311  // Size of a slice for the range functions
312  IndexType n = endIndex - beginIndex + 1;
313  IndexType slice = static_cast<IndexType>(std::round(n / static_cast<double>(numThreads)));
314  slice = std::max(slice, IndexType(1));
315 
316  // Create pool and launch jobs
317  std::vector<std::future<void>> pool;
318  pool.reserve(numThreads);
319  IndexType i1 = beginIndex;
320  IndexType i2 = std::min(beginIndex + slice, endIndex);
321 
322  for (unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
323  {
324  pool.emplace_back(Internal::Async([=]() { function(i1, i2); }));
325  i1 = i2;
326  i2 = std::min(i2 + slice, endIndex);
327  }
328 
329  if (i1 < endIndex)
330  {
331  pool.emplace_back(Internal::Async([=]() { function(i1, endIndex); }));
332  }
333 
334  // Wait for jobs to finish
335  for (auto& f : pool)
336  {
337  if (f.valid())
338  {
339  f.wait();
340  }
341  }
342 #endif
343  }
344 
345  template <typename IndexType, typename Function>
347  IndexType beginIndexX, IndexType endIndexX,
348  IndexType beginIndexY, IndexType endIndexY,
349  const Function& function, ExecutionPolicy policy)
350  {
351  ParallelFor(beginIndexY, endIndexY, [&](IndexType j)
352  {
353  for (IndexType i = beginIndexX; i < endIndexX; ++i)
354  {
355  function(i, j);
356  }
357  }, policy);
358  }
359 
360  template <typename IndexType, typename Function>
362  IndexType beginIndexX, IndexType endIndexX,
363  IndexType beginIndexY, IndexType endIndexY,
364  const Function& function, ExecutionPolicy policy)
365  {
366  ParallelRangeFor(beginIndexY, endIndexY,
367  [&](IndexType jBegin, IndexType jEnd)
368  {
369  function(beginIndexX, endIndexX, jBegin, jEnd);
370  }, policy);
371  }
372 
373  template <typename IndexType, typename Function>
375  IndexType beginIndexX, IndexType endIndexX,
376  IndexType beginIndexY, IndexType endIndexY,
377  IndexType beginIndexZ, IndexType endIndexZ,
378  const Function& function, ExecutionPolicy policy)
379  {
380  ParallelFor(beginIndexZ, endIndexZ, [&](IndexType k)
381  {
382  for (IndexType j = beginIndexY; j < endIndexY; ++j)
383  {
384  for (IndexType i = beginIndexX; i < endIndexX; ++i)
385  {
386  function(i, j, k);
387  }
388  }
389  }, policy);
390  }
391 
392  template <typename IndexType, typename Function>
394  IndexType beginIndexX, IndexType endIndexX,
395  IndexType beginIndexY, IndexType endIndexY,
396  IndexType beginIndexZ, IndexType endIndexZ,
397  const Function& function, ExecutionPolicy policy)
398  {
399  ParallelRangeFor(beginIndexZ, endIndexZ,
400  [&](IndexType kBegin, IndexType kEnd)
401  {
402  function(beginIndexX, endIndexX, beginIndexY, endIndexY, kBegin, kEnd);
403  }, policy);
404  }
405 
406  template <typename IndexType, typename Value, typename Function, typename Reduce>
408  IndexType beginIndex, IndexType endIndex,
409  const Value& identity, const Function& function,
410  const Reduce& reduce, ExecutionPolicy policy)
411  {
412  if (beginIndex > endIndex)
413  {
414  return identity;
415  }
416 
417 #if defined(CUBBYFLOW_TASKING_TBB)
418  if (policy == ExecutionPolicy::Parallel)
419  {
420  return tbb::parallel_reduce(tbb::blocked_range<IndexType>(beginIndex, endIndex), identity,
421  [&function](const tbb::blocked_range<IndexType>& range, const Value& init)
422  {
423  return function(range.begin(), range.end(), init);
424  }, reduce);
425  }
426  else
427  {
428  (void)reduce;
429  return function(beginIndex, endIndex, identity);
430  }
431 #else
432  // Estimate number of threads in the pool
433  const unsigned int numThreadsHint = GetMaxNumberOfThreads();
434  const unsigned int numThreads = (policy == ExecutionPolicy::Parallel) ?
435  (numThreadsHint == 0u ? 8u : numThreadsHint) : 1;
436 
437  // Size of a slice for the range functions
438  IndexType n = endIndex - beginIndex + 1;
439  IndexType slice = static_cast<IndexType>(std::round(n / static_cast<double>(numThreads)));
440  slice = std::max(slice, IndexType(1));
441 
442  // Results
443  std::vector<Value> results(numThreads, identity);
444 
445  // [Helper] Inner loop
446  auto launchRange = [&](IndexType k1, IndexType k2, unsigned int tid)
447  {
448  results[tid] = function(k1, k2, identity);
449  };
450 
451  // Create pool and launch jobs
452  std::vector<std::future<void>> pool;
453  pool.reserve(numThreads);
454 
455  IndexType i1 = beginIndex;
456  IndexType i2 = std::min(beginIndex + slice, endIndex);
457  unsigned int threadID = 0;
458 
459  for (; threadID + 1 < numThreads && i1 < endIndex; ++threadID)
460  {
461  pool.emplace_back(Internal::Async([=]() { launchRange(i1, i2, threadID); }));
462  i1 = i2;
463  i2 = std::min(i2 + slice, endIndex);
464  }
465 
466  if (i1 < endIndex)
467  {
468  pool.emplace_back(Internal::Async([=]() { launchRange(i1, endIndex, threadID); }));
469  }
470 
471  // Wait for jobs to finish
472  for (auto& f : pool)
473  {
474  if (f.valid())
475  {
476  f.wait();
477  }
478  }
479 
480  // Gather
481  Value finalResult = identity;
482  for (const Value& val : results)
483  {
484  finalResult = reduce(val, finalResult);
485  }
486 
487  return finalResult;
488 #endif
489  }
490 
491  template<typename RandomIterator>
493  RandomIterator begin, RandomIterator end,
494  ExecutionPolicy policy)
495  {
496  ParallelSort(begin, end, std::less<typename std::iterator_traits<RandomIterator>::value_type>(), policy);
497  }
498 
499  template<typename RandomIterator, typename CompareFunction>
501  RandomIterator begin, RandomIterator end,
502  CompareFunction compareFunction, ExecutionPolicy policy)
503  {
504  if (begin > end)
505  {
506  return;
507  }
508 
509 #if defined(CUBBYFLOW_TASKING_TBB)
510  if (policy == ExecutionPolicy::Parallel)
511  {
512  tbb::parallel_sort(begin, end, compareFunction);
513  }
514  else
515  {
516  std::sort(begin, end, compareFunction);
517  }
518 #else
519  size_t size = static_cast<size_t>(end - begin);
520 
521  using value_type = typename std::iterator_traits<RandomIterator>::value_type;
522  std::vector<value_type> temp(size);
523 
524  // Estimate number of threads in the pool
525  const unsigned int numThreadsHint = GetMaxNumberOfThreads();
526  const unsigned int numThreads = (policy == ExecutionPolicy::Parallel) ?
527  (numThreadsHint == 0u ? 8u : numThreadsHint) : 1;
528 
529  Internal::ParallelMergeSort(begin, size, temp.begin(), numThreads, compareFunction);
530 #endif
531  }
532 }
533 
534 #endif
typename std::result_of< TASK()>::type operator_return_t
Definition: Parallel-Impl.h:70
unsigned int GetMaxNumberOfThreads()
Returns maximum number of threads to use.
void ParallelRangeFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Makes a range-loop from beginIndex to endIndex in parallel.
Definition: Parallel-Impl.h:283
Definition: pybind11Utils.h:24
void ParallelFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Makes a for-loop from beginIndex to endIndex in parallel.
Definition: Parallel-Impl.h:201
void Merge(RandomIterator a, size_t size, RandomIterator2 temp, CompareFunction compareFunction)
Definition: Parallel-Impl.h:97
void ParallelSort(RandomIterator begin, RandomIterator end, ExecutionPolicy policy)
Sorts a container in parallel.
Definition: Parallel-Impl.h:492
constexpr size_t ZERO_SIZE
Zero size_t.
Definition: Constants.h:18
void Schedule(TASK &&fn)
Definition: Parallel-Impl.h:40
void ParallelMergeSort(RandomIterator a, size_t size, RandomIterator2 temp, unsigned int numThreads, CompareFunction compareFunction)
Definition: Parallel-Impl.h:141
ExecutionPolicy
Execution policy tag.
Definition: Parallel.h:15
void ParallelFill(const RandomIterator &begin, const RandomIterator &end, const T &value, ExecutionPolicy policy)
Fills from begin to end with value in parallel.
Definition: Parallel-Impl.h:182
auto Async(TASK &&fn) -> std::future< operator_return_t< TASK >>
Definition: Parallel-Impl.h:74
Value ParallelReduce(IndexType beginIndex, IndexType endIndex, const Value &identity, const Function &function, const Reduce &reduce, ExecutionPolicy policy)
Performs reduce operation in parallel.
Definition: Parallel-Impl.h:407