TPIE

2362a60
parallel_sort.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*-
2 // vi:set ts=4 sts=4 sw=4 noet cino+=(0 :
3 // Copyright 2011, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
24 
25 #ifndef __TPIE_PARALLEL_SORT_H__
26 #define __TPIE_PARALLEL_SORT_H__
27 
28 #include <algorithm>
29 #include <cstdint>
30 #include <boost/iterator/iterator_traits.hpp>
31 #include <mutex>
32 #include <cmath>
33 #include <functional>
35 #include <tpie/dummy_progress.h>
36 #include <tpie/internal_queue.h>
37 #include <tpie/job.h>
38 #include <tpie/config.h>
39 
40 namespace tpie {
41 
50 template <typename iterator_type, typename comp_type, bool Progress,
51  size_t min_size=1024*1024*8/sizeof(typename boost::iterator_value<iterator_type>::type)>
53 private:
55 
59  struct progress_t {
60  typename P::base * pi;
61  std::uint64_t work_estimate;
62  std::uint64_t total_work_estimate;
63  std::condition_variable cond;
64  std::mutex mutex;
65  };
66 
68  typedef typename boost::iterator_value<iterator_type>::type value_type;
69 
/// Estimate the amount of work needed to sort n items.
///
/// \param n  Number of items.
/// \return   1.8 * n * log2(n) truncated to an integer, or 0 when n is 0.
static inline std::uint64_t sortWork(std::uint64_t n) {
	// The logarithm of zero is not finite, so handle the empty case up front.
	if (n == 0) return 0;

	const double items = static_cast<double>(n);
	const double ln2 = std::log(static_cast<double>(2));
	return static_cast<std::uint64_t>(std::log(items) * items * 1.8 / ln2);
}
80 
87  template <typename comp_t>
88  static inline iterator_type unguarded_partition(iterator_type first,
89  iterator_type last,
90  comp_t & comp) {
91  // Textbook partitioning.
92  iterator_type pivot = first;
93  while (true) {
94  do --last;
95  while (comp(*pivot, *last));
96 
97  do {
98  if (first == last) break;
99  ++first;
100  } while (comp(*first, *pivot));
101 
102  if (first == last) break;
103 
104  std::iter_swap(first, last);
105  }
106  std::iter_swap(last, pivot);
107  return last;
108  }
109 
117  static inline iterator_type median(iterator_type a, iterator_type b, iterator_type c, comp_type & comp) {
118  if (comp(*a, *b)) {
119  if (comp(*b, *c)) return b;
120  else if (comp(*a, *c)) return c;
121  else return a;
122  } else {
123  if (comp(*a, *c)) return a;
124  else if (comp(*b, *c)) return c;
125  else return b;
126  }
127  }
128 
137  static inline iterator_type pick_pivot(iterator_type a, iterator_type b, comp_type & comp) {
138  if (a == b) return a;
139  assert(a < b);
140 
141  // Since (b-a) is at least min_size, which is at least 100000 in
142  // realistic contexts, ((b-a)/8)*c is a good approximation of
143  // (c*(b-a))/8.
144  size_t step = (b-a)/8;
145 
146  return median(median(a+0, a+step, a+step*2, comp),
147  median(a+step*3, a+step*4, a+step*5, comp),
148  median(a+step*6, a+step*7, b-1, comp), comp);
149  }
150 
157  static inline iterator_type partition(iterator_type a, iterator_type b, comp_type & comp) {
158  iterator_type pivot = pick_pivot(a, b, comp);
159 
160  std::iter_swap(pivot, a);
161  iterator_type l = unguarded_partition(a, b, comp);
162 
163  return l;
164  }
165 
166 #ifdef DOXYGEN
167 public:
168 #endif
169  class qsort_job : public job {
173  public:
177  qsort_job(iterator_type a, iterator_type b, comp_type comp, qsort_job * parent, progress_t & p)
178  : a(a), b(b), comp(comp), parent(parent), progress(p) {
179 
180  // Does nothing.
181  }
182 
183  ~qsort_job() {
184  for (size_t i = 0; i < children.size(); ++i) {
185  delete children[i];
186  }
187  children.resize(0);
188  }
189 
195  virtual void operator()() override {
196  assert(a <= b);
197  while (static_cast<size_t>(b - a) >= min_size) {
198  iterator_type pivot = partition(a, b, comp);
199  add_progress(b - a);
200  //qsort_job * j = tpie_new<qsort_job>(a, pivot, comp, this);
201  qsort_job * j = new qsort_job(a, pivot, comp, this, progress);
202  j->enqueue(this);
203  children.push_back(j);
204  a = pivot+1;
205  }
206  std::sort(a, b, comp);
207  add_progress(sortWork(b - a));
208  }
209 
210  protected:
211  virtual void on_done() override {
212  // Unfortunately, it might not be safe to delete our children at
213  // this point, as other threads might in theory wait for them to
214  // .join(). It is safer to postpone deletion until our own
215  // deletion.
216  if (!parent) {
217  std::lock_guard<std::mutex> lock(progress.mutex);
218  progress.work_estimate = progress.total_work_estimate;
219  progress.cond.notify_one();
220  }
221  }
222 
223  private:
224  iterator_type a;
225  iterator_type b;
226  comp_type comp;
227  qsort_job * parent;
228  progress_t & progress;
229 
230  std::vector<qsort_job *> children;
231 
232  void add_progress(uint64_t amount) {
233  std::lock_guard<std::mutex> lock(progress.mutex);
234  progress.work_estimate += amount;
235  progress.cond.notify_one();
236  }
237  };
238 public:
239  parallel_sort_impl(typename P::base * p) {
240  progress.pi = p;
241  }
242 
248  void operator()(iterator_type a, iterator_type b, comp_type comp=std::less<value_type>() ) {
249  progress.work_estimate = 0;
250  progress.total_work_estimate = sortWork(b-a);
251  if (progress.pi) progress.pi->init(progress.total_work_estimate);
252 
253  if (static_cast<size_t>(b - a) < min_size) {
254  std::sort(a, b, comp);
255  if (progress.pi) progress.pi->done();
256  return;
257  }
258 
259  qsort_job * master = new qsort_job(a, b, comp, 0, progress);
260  master->enqueue();
261 
262  std::uint64_t prev_work_estimate = 0;
263  std::unique_lock<std::mutex> lock(progress.mutex);
264  while (progress.work_estimate < progress.total_work_estimate) {
265  if (progress.pi && progress.work_estimate > prev_work_estimate) progress.pi->step(progress.work_estimate - prev_work_estimate);
266  prev_work_estimate = progress.work_estimate;
267  progress.cond.wait(lock);
268  }
269  lock.unlock();
270 
271  master->join();
272  delete master;
273  if (progress.pi) progress.pi->done();
274  }
275 private:
276  static const size_t max_job_count=256;
277  progress_t progress;
278  bool kill;
279  size_t working;
280 
281  std::pair<iterator_type, iterator_type> jobs[max_job_count];
282  size_t job_count;
283 };
284 
293 template <bool Progress, typename iterator_type, typename comp_type>
294 void parallel_sort(iterator_type a,
295  iterator_type b,
297  comp_type comp=std::less<typename boost::iterator_value<iterator_type>::type>()) {
298 #ifdef TPIE_PARALLEL_SORT
300  s(a,b,comp);
301 #else
302  pi.init(1);
303  std::sort(a,b,comp);
304  pi.done();
305 #endif
306 }
307 
315 template <typename iterator_type, typename comp_type>
316 void parallel_sort(iterator_type a,
317  iterator_type b,
318  comp_type comp=std::less<typename boost::iterator_value<iterator_type>::type>()) {
319 #ifdef TPIE_PARALLEL_SORT
321  s(a,b,comp);
322 #else
323  std::sort(a, b, comp);
324 #endif
325 }
326 
327 
328 }
329 #endif //__TPIE_PARALLEL_SORT_H__
void sort(uncompressed_stream< T > &instream, uncompressed_stream< T > &outstream, Compare comp, progress_indicator_base &indicator)
Sort elements of a stream using the given STL-style comparator object.
Definition: sort.h:141
The base class for indicating the progress of some task.
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
void operator()(iterator_type a, iterator_type b, comp_type comp=std::less< value_type >())
Perform a parallel sort of the items in the interval [a,b).
virtual void done()
Advance the indicator to the end.
virtual void on_done() override
Called when this job and all subjobs are done.
Generic internal queue with known memory requirements.
A simple parallel sort implementation with progress tracking.
Definition: parallel_sort.h:52
Definition: job.h:33
Represents quick sort work at a given level.
Progress indicator base.
void join()
Wait for this job and its subjobs to complete.
Job class for job manager.
qsort_job(iterator_type a, iterator_type b, comp_type comp, qsort_job *parent, progress_t &p)
Construct a qsort_job.
For applications where you wish to disable progress indicators via a template parameter, refer to the progress_types members named sub, fp and base.
void enqueue(job *parent=0)
Add this job to the job pool.
virtual void operator()() override
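Running a job with iterators a and b will repeatedly partition [a,b), spawn a job on the left part and continue with the right part; once the remaining range has fewer than min_size items, it is sorted with std::sort.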
Progress indicator concept in an efficient non-inheritance way.
virtual void init(stream_size_type range=0)
Initialize progress indicator.