// -*- C++ -*-

// Copyright (C) 2007-2018 Free Software Foundation, Inc.
//
// This file is part of the GNU ISO C++ Library.  This library is free
// software; you can redistribute it and/or modify it under the terms
// of the GNU General Public License as published by the Free Software
// Foundation; either version 3, or (at your option) any later
// version.

// This library is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// General Public License for more details.

// Under Section 7 of GPL version 3, you are granted additional
// permissions described in the GCC Runtime Library Exception, version
// 3.1, as published by the Free Software Foundation.

// You should have received a copy of the GNU General Public License and
// a copy of the GCC Runtime Library Exception along with this program;
// see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
// <http://www.gnu.org/licenses/>.

/** @file parallel/workstealing.h
 *  @brief Parallelization of embarrassingly parallel execution by
 *  means of work-stealing.
 *
 *  Each thread starts with a contiguous share of the index range and
 *  processes it in chunks; a thread that runs out of work picks a
 *  random victim and takes over part of the victim's remaining range.
 *
 *  Work stealing is described in
 *
 *  R. D. Blumofe and C. E. Leiserson.
 *  Scheduling multithreaded computations by work stealing.
 *  Journal of the ACM, 46(5):720–748, 1999.
 *
 *  This file is a GNU parallel extension to the Standard C++ Library.
 */

// Written by Felix Putze.

#ifndef _GLIBCXX_PARALLEL_WORKSTEALING_H
#define _GLIBCXX_PARALLEL_WORKSTEALING_H 1

#include <parallel/parallel.h>
#include <parallel/random_number.h>
#include <parallel/compatibility.h>

namespace __gnu_parallel
{

// Qualifier applied to every field of _Job.  The fields are read and
// written by several threads (owner and thieves) in the function below.
#define _GLIBCXX_JOB_VOLATILE volatile

  /** @brief Work range of one thread: the closed index interval
   *  [@c _M_first, @c _M_last] relative to the begin iterator.
   *
   *  The range is empty when @c _M_first > @c _M_last.  The three
   *  fields are updated separately (not as one atomic unit), so a
   *  reader from another thread must check that
   *  @c _M_first + @c _M_load - 1 == @c _M_last before trusting a
   *  snapshot.
   */
  template<typename _DifferenceTp>
    struct _Job
    {
      typedef _DifferenceTp _DifferenceType;

      /** @brief Index of the first element not yet reserved.
       *
       *  Advanced with fetch-and-add by the owning thread (to reserve
       *  a chunk) and by stealing threads (to take work away).  A
       *  stealing thread only ever increments it; the owner also
       *  overwrites it when it installs a freshly stolen range. */
      _GLIBCXX_JOB_VOLATILE _DifferenceType _M_first;

      /** @brief Index of the last element of the range (inclusive).
       *
       *  Changed by owning thread only. */
      _GLIBCXX_JOB_VOLATILE _DifferenceType _M_last;

      /** @brief Number of elements, i.e. @c _M_last-_M_first+1, as of
       *  the owner's most recent update.
       *
       *  Changed by owning thread only.  May lag behind @c _M_first
       *  when a thief has advanced @c _M_first in the meantime. */
      _GLIBCXX_JOB_VOLATILE _DifferenceType _M_load;
    };

  /** @brief Work stealing algorithm for random access iterators.
   *
   *  Applies @c __f(__op, it) to the iterators of the sequence and
   *  combines the values with @c __r.  Additional memory is one padded
   *  _Job slot per thread, independent of the sequence length.
   *  Synchronization at job lists is done with atomic operations.
   *
   *  Every thread keeps a private accumulator that starts as
   *  @c _Result() and is replaced by the @c __f value of the thread's
   *  first element.  At the end each thread folds its accumulator into
   *  @c __output under a lock, so the order in which partial results
   *  are combined is not fixed.
   *
   *  NOTE(review): a thread whose initial range is empty (this happens
   *  when the element count is 0) still folds a value-initialized
   *  @c _Result into @c __output - confirm that callers only rely on
   *  this where @c _Result() is neutral for @c __r.
   *
   *  @param __begin Begin iterator of element sequence.
   *  @param __end End iterator of element sequence.
   *  @param __op User-supplied functor (comparator, predicate, adding
   *  functor, ...).  Passed as first argument to every @c __f call.
   *  @param __f Functor to @a process an element with __op (depends on
   *  desired functionality, e. g. for std::for_each(), ...).  Called as
   *  @c __f(__op, iterator).  Its member @c _M_finish_iterator is set
   *  to @c __begin plus the element count before returning.
   *  @param __r Functor to @a add a single __result to the already
   *  processed elements (depends on functionality).  Called as
   *  @c __r(accumulated, new_value); its return value becomes the new
   *  accumulated value.
   *  @param __base Base value for reduction; copied into @c __output
   *  before any thread starts.
   *  @param __output Reference to the object that receives the final
   *  result.
   *  @param __bound Number of elements to process, counted from
   *  @c __begin.  A negative value means the whole range
   *  @c __end - @c __begin.  A non-negative value is used as given and
   *  is not compared against @c __end - @c __begin.
   *  @return User-supplied functor @c __op (that may contain a part of
   *  the result).
   */
  template<typename _RAIter,
           typename _Op,
           typename _Fu,
           typename _Red,
           typename _Result>
    _Op
    __for_each_template_random_access_workstealing(_RAIter __begin,
                                                   _RAIter __end, _Op __op,
                                                   _Fu& __f, _Red __r,
                                                   _Result __base,
                                                   _Result& __output,
      typename std::iterator_traits<_RAIter>::difference_type __bound)
    {
      // Project-wide call hook, defined outside this file
      // (presumably tracing/diagnostics - confirm in the settings headers).
      _GLIBCXX_CALL(__end - __begin)

      typedef std::iterator_traits<_RAIter> _TraitsType;
      typedef typename _TraitsType::difference_type _DifferenceType;

      const _Settings& __s = _Settings::get();

      // Number of elements an owner reserves from its own range per
      // fetch-and-add.
      _DifferenceType __chunk_size =
        static_cast<_DifferenceType>(__s.workstealing_chunk_size);

      // How many jobs?  Negative __bound selects the full range.
      _DifferenceType __length = (__bound < 0) ? (__end - __begin) : __bound;

      // To avoid false sharing in a cache line: thread i uses slot
      // i * __stride of the job array, so consecutive threads' slots
      // are __stride elements apart.
      const int __stride = (__s.cache_line_size * 10
                            / sizeof(_Job<_DifferenceType>) + 1);

      // Total number of threads currently working.  Shared by the whole
      // team; modified only under "omp atomic".
      _ThreadIndex __busy = 0;

      // Job description array, allocated inside the parallel region once
      // the real team size is known.
      _Job<_DifferenceType> *__job;

      // Serializes the final per-thread reduction into __output.
      omp_lock_t __output_lock;
      omp_init_lock(&__output_lock);

      // Write base value to output.
      __output = __base;

      // No more threads than jobs, at least one thread.
      _ThreadIndex __num_threads = __gnu_parallel::max<_ThreadIndex>
        (1, __gnu_parallel::min<_DifferenceType>(__length,
                                                 __get_max_threads()));

#     pragma omp parallel shared(__busy) num_threads(__num_threads)
      {
        // One thread records the team size actually in use and allocates
        // the slots; the implicit barrier at the end of "single" keeps
        // the others from touching __job before it exists.
#       pragma omp single
        {
          __num_threads = omp_get_num_threads();

          // Create job description array.
          __job = new _Job<_DifferenceType>[__num_threads * __stride];
        }

        // Initialization phase.

        // Whether this thread is currently counted in __busy.
        bool __iam_working = false;

        // Thread id.
        _ThreadIndex __iam = omp_get_thread_num();

        // This job.
        _Job<_DifferenceType>& __my_job = __job[__iam * __stride];

        // Index of the thread to steal from, drawn from __rand_gen.
        _ThreadIndex __victim;

        // Local value for reduction.
        _Result __result = _Result();

        // Number of elements to steal in one attempt.
        _DifferenceType __steal;

        // Every thread has its own random number generator
        // (modulo __num_threads), seeded with the thread id.
        _RandomNumber __rand_gen(__iam, __num_threads);

        // This thread is currently working.
#       pragma omp atomic
        ++__busy;

        __iam_working = true;

        // How many jobs per thread? last thread gets the rest.
        // Thread i starts with [i * q, (i + 1) * q - 1] where
        // q = __length / __num_threads (integer division).
        __my_job._M_first = static_cast<_DifferenceType>
          (__iam * (__length / __num_threads));

        __my_job._M_last = (__iam == (__num_threads - 1)
                            ? (__length - 1)
                            : ((__iam + 1) * (__length / __num_threads) - 1));
        __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;

        // Init result with _M_first value (to have a base value for reduction)
        // and remove that element from the range.
        if (__my_job._M_first <= __my_job._M_last)
          {
            // Cannot use volatile variable directly.
            _DifferenceType __my_first = __my_job._M_first;
            __result = __f(__op, __begin + __my_first);
            ++__my_job._M_first;
            --__my_job._M_load;
          }

        _RAIter __current;

        // No thread may start stealing before every thread has set up its
        // initial range and registered itself in __busy.
#       pragma omp barrier

        // Actual work phase
        // Work on own or stolen current start
        while (__busy > 0)
          {
            // Work until no productive thread left.
#           pragma omp flush(__busy)

            // Thread has own work to do
            while (__my_job._M_first <= __my_job._M_last)
              {
                // fetch-and-add call
                // Reserve current job block (size __chunk_size) in my queue.
                // The returned value is used as the first index of the
                // reserved block (presumably the value of _M_first before
                // the addition - see __fetch_and_add).
                _DifferenceType __current_job =
                  __fetch_and_add<_DifferenceType>(&(__my_job._M_first),
                                                   __chunk_size);

                // Update _M_load, to make the three values consistent,
                // _M_first might have been changed in the meantime
                __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;

                // The reservation may reach past _M_last, hence the
                // second loop condition.
                for (_DifferenceType __job_counter = 0;
                     __job_counter < __chunk_size
                       && __current_job <= __my_job._M_last;
                     ++__job_counter)
                  {
                    // Yes: process it!
                    __current = __begin + __current_job;
                    ++__current_job;

                    // Do actual work.
                    __result = __r(__result, __f(__op, __current));
                  }

#               pragma omp flush(__busy)
              }

            // After reaching this point, a thread's __job list is empty.
            if (__iam_working)
              {
                // This thread no longer has work.
#               pragma omp atomic
                --__busy;

                __iam_working = false;
              }

            // Snapshot of the victim's three fields, read one by one.
            _DifferenceType __supposed_first, __supposed_last,
              __supposed_load;
            do
              {
                // Find random nonempty deque, do consistency check.
                // NOTE(review): the original comment says "not own", but
                // nothing here excludes __victim == __iam - confirm against
                // _RandomNumber.
                __yield();
#               pragma omp flush(__busy)
                __victim = __rand_gen();
                __supposed_first = __job[__victim * __stride]._M_first;
                __supposed_last = __job[__victim * __stride]._M_last;
                __supposed_load = __job[__victim * __stride]._M_load;
              }
            // Retry while somebody is still busy and the snapshot is
            // either empty or inconsistent (first + load - 1 != last).
            while (__busy > 0
                   && ((__supposed_load <= 0)
                       || ((__supposed_first + __supposed_load - 1)
                           != __supposed_last)));

            // Nobody has work any more: leave the work loop.
            if (__busy == 0)
              break;

            if (__supposed_load > 0)
              {
                // Has work and work to do.
                // Number of elements to steal (at least one): half of the
                // victim's load.
                __steal = (__supposed_load < 2) ? 1 : __supposed_load / 2;

                // Push __victim's current start forward.
                _DifferenceType __stolen_first =
                  __fetch_and_add<_DifferenceType>
                  (&(__job[__victim * __stride]._M_first), __steal);
                _DifferenceType __stolen_try = (__stolen_first + __steal
                                                - _DifferenceType(1));

                // Install the stolen range as own job, clipped to the
                // victim's last index as seen in the snapshot.  If the
                // victim's _M_first had already moved past that index the
                // installed range is empty; the thread is counted as busy
                // anyway and un-counts itself on the next loop iteration.
                __my_job._M_first = __stolen_first;
                __my_job._M_last = __gnu_parallel::min(__stolen_try,
                                                       __supposed_last);
                __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;

                // Has potential work again.
#               pragma omp atomic
                ++__busy;
                __iam_working = true;

#               pragma omp flush(__busy)
              }
#           pragma omp flush(__busy)
          } // end while __busy > 0
        // Add accumulated result to output.
        omp_set_lock(&__output_lock);
        __output = __r(__output, __result);
        omp_unset_lock(&__output_lock);
      }

      delete[] __job;

      // Points to last element processed (needed as return value for
      // some algorithms like transform)
      __f._M_finish_iterator = __begin + __length;

      omp_destroy_lock(&__output_lock);

      return __op;
    }
} // end namespace

#endif /* _GLIBCXX_PARALLEL_WORKSTEALING_H */