Apache Arrow (C++)
A columnar in-memory analytics layer designed to accelerate big data.
parallel.h
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #ifndef ARROW_UTIL_PARALLEL_H
19 #define ARROW_UTIL_PARALLEL_H
20 
#include <algorithm>
#include <atomic>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

#include "arrow/status.h"
#include "arrow/util/thread-pool.h"
28 
29 namespace arrow {
30 namespace internal {
31 
32 // A parallelizer that takes a `Status(int)` function and calls it with
33 // arguments between 0 and `num_tasks - 1`, on an arbitrary number of threads.
34 
35 template <class FUNCTION>
36 Status ParallelFor(int num_tasks, FUNCTION&& func) {
37  auto pool = internal::GetCpuThreadPool();
38  std::vector<std::future<Status>> futures(num_tasks);
39 
40  for (int i = 0; i < num_tasks; ++i) {
41  futures[i] = pool->Submit(func, i);
42  }
43  auto st = Status::OK();
44  for (auto& fut : futures) {
45  st &= fut.get();
46  }
47  return st;
48 }
49 
50 // A variant of ParallelFor() with an explicit number of dedicated threads.
51 // In most cases it's more appropriate to use the 2-argument ParallelFor (above),
52 // or directly the global CPU thread pool (arrow/util/thread-pool.h).
53 
54 template <class FUNCTION>
55 Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
56  std::vector<std::thread> thread_pool;
57  thread_pool.reserve(nthreads);
58  std::atomic<int> task_counter(0);
59 
60  std::mutex error_mtx;
61  bool error_occurred = false;
62  Status error;
63 
64  for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
65  thread_pool.emplace_back(
66  [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
67  int task_id;
68  while (!error_occurred) {
69  task_id = task_counter.fetch_add(1);
70  if (task_id >= num_tasks) {
71  break;
72  }
73  Status s = func(task_id);
74  if (!s.ok()) {
75  std::lock_guard<std::mutex> lock(error_mtx);
76  error_occurred = true;
77  error = s;
78  break;
79  }
80  }
81  });
82  }
83  for (auto&& thread : thread_pool) {
84  thread.join();
85  }
86  if (error_occurred) {
87  return error;
88  }
89  return Status::OK();
90 }
91 
92 } // namespace internal
93 } // namespace arrow
94 
95 #endif
static Status OK()
Definition: status.h:124
Top-level namespace for Apache Arrow C++ API.
Definition: adapter.h:32