TPIE 2362a60
merge_sorter.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_MERGE_SORTER_H__
21 #define __TPIE_PIPELINING_MERGE_SORTER_H__
22 
23 #include <tpie/compressed/stream.h>
24 #include <tpie/pipelining/sort_parameters.h>
25 #include <tpie/pipelining/merger.h>
26 #include <tpie/pipelining/node.h>
27 #include <tpie/pipelining/exception.h>
28 #include <tpie/dummy_progress.h>
29 #include <tpie/array_view.h>
30 #include <tpie/parallel_sort.h>
31 
32 namespace tpie {
33 
34 namespace bits {
35 
62 public:
63  run_positions();
64  ~run_positions();
65 
66 	///////////////////////////////////////////////////////////////////////////
67 	/// \brief  Memory usage when open and not evacuated.
68 	///////////////////////////////////////////////////////////////////////////
69 	static memory_size_type memory_usage();
70 
71 	///////////////////////////////////////////////////////////////////////////
72 	/// \brief  Switch from closed to open state.
73 	///////////////////////////////////////////////////////////////////////////
74 	void open();
75 
76 	///////////////////////////////////////////////////////////////////////////
77 	/// \brief  Switch from any state to closed state.
78 	///////////////////////////////////////////////////////////////////////////
79 	void close();
80 
81 	///////////////////////////////////////////////////////////////////////////
82 	/// \brief  Switch from any state to the corresponding evacuated state.
83 	///////////////////////////////////////////////////////////////////////////
84 	void evacuate();
85 
86 	///////////////////////////////////////////////////////////////////////////
87 	/// \brief  Switch from any state to the corresponding non-evacuated state.
88 	///////////////////////////////////////////////////////////////////////////
89 	void unevacuate();
90 
91 	///////////////////////////////////////////////////////////////////////////
92 	/// \brief  Go to next level in the merge heap - see class docstring.
93 	///////////////////////////////////////////////////////////////////////////
94 	void next_level();
95 
96 	///////////////////////////////////////////////////////////////////////////
97 	/// \brief  Set this to be the final level in the merge heap - see class docstring.
98 	///////////////////////////////////////////////////////////////////////////
99 	void final_level(memory_size_type fanout);
100 
101 	///////////////////////////////////////////////////////////////////////////
102 	/// \brief  Store a stream position - see class docstring.
103 	///////////////////////////////////////////////////////////////////////////
104 	void set_position(memory_size_type mergeLevel, memory_size_type runNumber, stream_position pos);
105 
106 	///////////////////////////////////////////////////////////////////////////
107 	/// \brief  Fetch a stream position - see class docstring.
108 	///////////////////////////////////////////////////////////////////////////
109 	stream_position get_position(memory_size_type mergeLevel, memory_size_type runNumber);
110 
111 private:
113  bool m_open;
115  bool m_evacuated;
117  bool m_final;
118 
120  memory_size_type m_levels;
121 
122  memory_size_type m_runs[2];
123  temp_file m_positionsFile[2];
124  stream_position m_positionsPosition[2];
125  file_stream<stream_position> m_positions[2];
126 
128  array<stream_position> m_finalPositions;
130  bool m_finalExtraSet;
132  stream_position m_finalExtra;
133 };
134 
135 } // namespace bits
136 
142 ///////////////////////////////////////////////////////////////////////////
143 /// \brief  Merge sorting consists of three phases:
144 ///
145 /// 1. Sorting and forming input runs (begin, push, end)
146 /// 2. Performing all merges in the merge tree except the last one (calc)
147 /// 3. Final merge and report of items in sorted order (can_pull, pull)
148 ///////////////////////////////////////////////////////////////////////////
149 template <typename T, bool UseProgress, typename pred_t = std::less<T>, typename store_t=default_store>
150 class merge_sorter {
151 private:
152  typedef typename store_t::template element_type<T>::type TT;
153  typedef typename store_t::template specific<TT> specific_store_t;
154  typedef typename specific_store_t::outer_type outer_type; //Should be the same as T
155  typedef typename specific_store_t::store_type store_type;
156  typedef typename specific_store_t::element_type element_type; //Should be the same as TT
157  typedef outer_type item_type;
158  static const size_t item_size = specific_store_t::item_size;
159 public:
160 
161 	typedef std::shared_ptr<merge_sorter> ptr;
162 	typedef progress_types<UseProgress> Progress;
163 
164  static const memory_size_type defaultFiles = 253; // Default number of files available, when not using set_available_files
165  static const memory_size_type minimumFilesPhase1 = 1;
166  static const memory_size_type maximumFilesPhase1 = 1;
167  static const memory_size_type minimumFilesPhase2 = 5;
168  static const memory_size_type maximumFilesPhase2 = std::numeric_limits<memory_size_type>::max();
169  static const memory_size_type minimumFilesPhase3 = 5;
170  static const memory_size_type maximumFilesPhase3 = std::numeric_limits<memory_size_type>::max();
171 
172  inline merge_sorter(pred_t pred = pred_t(), store_t store = store_t())
173  : m_bucketPtr(new memory_bucket())
174  , m_bucket(memory_bucket_ref(m_bucketPtr.get()))
175  , m_state(stNotStarted)
176  , p()
177  , m_parametersSet(false)
178  , m_store(store.template get_specific<element_type>())
179  , m_merger(pred, m_store, m_bucket)
180  , m_currentRunItems(m_bucket)
181  , m_maxItems(std::numeric_limits<stream_size_type>::max())
182  , pred(pred)
183  , m_evacuated(false)
184  , m_finalMergeInitialized(false)
185  , m_owning_node(nullptr)
186  {}
187 
188 	///////////////////////////////////////////////////////////////////////////
189 	/// \brief Enable setting run length and fanout manually (for testing
190 	/// purposes).
191 	///////////////////////////////////////////////////////////////////////////
192 	inline void set_parameters(memory_size_type runLength, memory_size_type fanout) {
193  tp_assert(m_state == stNotStarted, "Merge sorting already begun");
194  p.runLength = p.internalReportThreshold = runLength;
195  p.fanout = p.finalFanout = fanout;
196  m_parametersSet = true;
197  log_debug() << "Manually set merge sort run length and fanout\n";
198  log_debug() << "Run length = " << p.runLength << " (uses memory " << (p.runLength*item_size + file_stream<element_type>::memory_usage()) << ")\n";
199  log_debug() << "Fanout = " << p.fanout << " (uses memory " << fanout_memory_usage(p.fanout) << ")" << std::endl;
200  }
201 
202 	///////////////////////////////////////////////////////////////////////////
203 	/// \brief Calculate parameters from given amount of files.
204 	/// \param f Files available for all three phases.
205 	///////////////////////////////////////////////////////////////////////////
206 	inline void set_available_files(memory_size_type f) {
207  p.filesPhase1 = p.filesPhase2 = p.filesPhase3 = f;
208  check_not_started();
209  }
210 
211 	///////////////////////////////////////////////////////////////////////////
212 	/// \brief Calculate parameters from given amount of files.
213 	/// \param f1 Files available while forming sorted runs.
214 	/// \param f2 Files available while merging runs.
215 	/// \param f3 Files available during the output phase.
216 	///////////////////////////////////////////////////////////////////////////
217 	inline void set_available_files(memory_size_type f1, memory_size_type f2, memory_size_type f3) {
218  p.filesPhase1 = f1;
219  p.filesPhase2 = f2;
220  p.filesPhase3 = f3;
221  check_not_started();
222  }
223 
224 	///////////////////////////////////////////////////////////////////////////
225 	/// \brief Calculate parameters from given memory amount.
226 	/// \param m Memory available for all three phases.
227 	///////////////////////////////////////////////////////////////////////////
228 	inline void set_available_memory(memory_size_type m) {
229  p.memoryPhase1 = p.memoryPhase2 = p.memoryPhase3 = m;
230  check_not_started();
231  }
232 
233 	///////////////////////////////////////////////////////////////////////////
234 	/// \brief Calculate parameters from given memory amount.
235 	/// \param m1 Memory available while forming sorted runs.
236 	/// \param m2 Memory available while merging runs.
237 	/// \param m3 Memory available during the output phase.
238 	///////////////////////////////////////////////////////////////////////////
239 	inline void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3) {
240  p.memoryPhase1 = m1;
241  p.memoryPhase2 = m2;
242  p.memoryPhase3 = m3;
243  check_not_started();
244  }
245 
246 private:
247  // Checks if we should still be able to change parameters
248  inline void check_not_started() {
249  if (m_state != stNotStarted) {
250  throw tpie::exception("Can't change parameters after merge sorting has started");
251  }
252  }
253 
254 public:
255  inline void set_phase_1_files(memory_size_type f1) {
256  p.filesPhase1 = f1;
257  check_not_started();
258  }
259 
260  inline void set_phase_2_files(memory_size_type f2) {
261  p.filesPhase2 = f2;
262  check_not_started();
263  }
264 
265  inline void set_phase_3_files(memory_size_type f3) {
266  p.filesPhase3 = f3;
267  check_not_started();
268  }
269 
270  inline void set_phase_1_memory(memory_size_type m1) {
271  p.memoryPhase1 = m1;
272  check_not_started();
273  }
274 
275  inline void set_phase_2_memory(memory_size_type m2) {
276  p.memoryPhase2 = m2;
277  check_not_started();
278  }
279 
280  inline void set_phase_3_memory(memory_size_type m3) {
281  p.memoryPhase3 = m3;
282  check_not_started();
283  }
284 
285 	///////////////////////////////////////////////////////////////////////////
286 	/// \brief Initiate phase 1: Formation of input runs.
287 	///////////////////////////////////////////////////////////////////////////
288 	inline void begin() {
289  tp_assert(m_state == stNotStarted, "Merge sorting already begun");
290  if (!m_parametersSet) calculate_parameters();
291  log_debug() << "Start forming input runs" << std::endl;
292  m_currentRunItems = array<store_type>(0, allocator<store_type>(m_bucket));
293  m_currentRunItems.resize((size_t)p.runLength);
294  m_runFiles.resize(p.fanout*2);
295  m_currentRunItemCount = 0;
296  m_finishedRuns = 0;
297  m_state = stRunFormation;
298  m_itemCount = 0;
299  }
300 
301 	///////////////////////////////////////////////////////////////////////////
302 	/// \brief Push item to merge sorter during phase 1.
303 	///////////////////////////////////////////////////////////////////////////
304 	inline void push(item_type && item) {
305  tp_assert(m_state == stRunFormation, "Wrong phase");
306  if (m_currentRunItemCount >= p.runLength) {
307  sort_current_run();
308  empty_current_run();
309  }
310  m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(std::move(item));
311  ++m_currentRunItemCount;
312  ++m_itemCount;
313  }
314 
315  inline void push(const item_type & item) {
316  tp_assert(m_state == stRunFormation, "Wrong phase");
317  if (m_currentRunItemCount >= p.runLength) {
318  sort_current_run();
319  empty_current_run();
320  }
321  m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(item);
322  ++m_currentRunItemCount;
323  ++m_itemCount;
324  }
325 
326 	///////////////////////////////////////////////////////////////////////////
327 	/// \brief End phase 1.
328 	///////////////////////////////////////////////////////////////////////////
329 	inline void end() {
330  tp_assert(m_state == stRunFormation, "Wrong phase");
331  sort_current_run();
332 
333  if (m_itemCount == 0) {
334  tp_assert(m_currentRunItemCount == 0, "m_itemCount == 0, but m_currentRunItemCount != 0");
335  m_reportInternal = true;
336  m_itemsPulled = 0;
337  m_currentRunItems.resize(0);
338  log_debug() << "Got no items. Internal reporting mode." << std::endl;
339  } else if (m_finishedRuns == 0 && m_currentRunItems.size() <= p.internalReportThreshold) {
340  // Our current buffer fits within the memory requirements of phase 2.
341  m_reportInternal = true;
342  m_itemsPulled = 0;
343  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode." << std::endl;
344 
345  } else if (m_finishedRuns == 0
346  && m_currentRunItemCount <= p.internalReportThreshold
347  && array<store_type>::memory_usage(m_currentRunItemCount) <= get_memory_manager().available()) {
348  // Our current buffer does not fit within the memory requirements
349  // of phase 2, but we have enough temporary memory to copy and
350  // resize the buffer.
351 
352  array<store_type> currentRun(m_currentRunItemCount);
353  for (size_t i=0; i < m_currentRunItemCount; ++i)
354  currentRun[i] = std::move(m_currentRunItems[i]);
355  m_currentRunItems.swap(currentRun);
356 
357  m_reportInternal = true;
358  m_itemsPulled = 0;
359  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode "
360  << "after resizing item buffer." << std::endl;
361 
362  } else {
363  m_reportInternal = false;
364  empty_current_run();
365  m_currentRunItems.resize(0);
366  log_debug() << "Got " << m_finishedRuns << " runs. External reporting mode." << std::endl;
367  }
368  m_state = stMerge;
369  }
370 
371  inline bool is_calc_free() const {
372  tp_assert(m_state == stMerge, "Wrong phase");
373  return m_reportInternal || m_finishedRuns <= p.fanout;
374  }
375 
376 	///////////////////////////////////////////////////////////////////////////
377 	/// \brief Perform phase 2: Performing all merges in the merge tree
378 	/// except the last one.
379 	///////////////////////////////////////////////////////////////////////////
380 	inline void calc(typename Progress::base & pi) {
381  tp_assert(m_state == stMerge, "Wrong phase");
382  if (!m_reportInternal) {
383  prepare_pull(pi);
384  } else {
385  pi.init(1);
386  pi.step();
387  pi.done();
388  }
389  m_state = stReport;
390  }
391 
392  inline void evacuate() {
393  tp_assert(m_state == stMerge || m_state == stReport, "Wrong phase");
394  if (m_reportInternal) {
395  log_debug() << "Evacuate merge_sorter (" << this << ") in internal reporting mode" << std::endl;
396  m_reportInternal = false;
397  memory_size_type runCount = (m_currentRunItemCount > 0) ? 1 : 0;
398  empty_current_run();
399  m_currentRunItems.resize(0);
400  initialize_final_merger(0, runCount);
401  } else if (m_state == stMerge) {
402  log_debug() << "Evacuate merge_sorter (" << this << ") before merge in external reporting mode (noop)" << std::endl;
403  m_runPositions.evacuate();
404  return;
405  }
406  log_debug() << "Evacuate merge_sorter (" << this << ") before reporting in external reporting mode" << std::endl;
407  m_merger.reset();
408  m_evacuated = true;
409  m_runPositions.evacuate();
410  }
411 
412  inline void evacuate_before_merging() {
413  if (m_state == stMerge) evacuate();
414  }
415 
416  inline void evacuate_before_reporting() {
417  if (m_state == stReport && (!m_reportInternal || m_itemsPulled == 0)) evacuate();
418  }
419 
420 private:
422  // Phase 1 helpers.
424 
425  inline void sort_current_run() {
426  parallel_sort(m_currentRunItems.begin(), m_currentRunItems.begin()+m_currentRunItemCount,
427  bits::store_pred<pred_t, specific_store_t>(pred));
428  }
429 
430  // postcondition: m_currentRunItemCount = 0
431  inline void empty_current_run() {
432  if (m_finishedRuns < 10)
433  log_debug() << "Write " << m_currentRunItemCount << " items to run file " << m_finishedRuns << std::endl;
434  else if (m_finishedRuns == 10)
435  log_debug() << "..." << std::endl;
436  file_stream<element_type> fs;
437  open_run_file_write(fs, 0, m_finishedRuns);
438  for (memory_size_type i = 0; i < m_currentRunItemCount; ++i)
439  fs.write(m_store.store_to_element(std::move(m_currentRunItems[i])));
440  m_currentRunItemCount = 0;
441  ++m_finishedRuns;
442  }
443 
448  inline void initialize_merger(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount) {
449  // runCount is a memory_size_type since we must be able to have that
450  // many file_streams open at the same time.
451 
452  // Open files and seek to the first item in the run.
453  array<file_stream<element_type> > in(runCount);
454  for (memory_size_type i = 0; i < runCount; ++i) {
455  open_run_file_read(in[i], mergeLevel, runNumber+i);
456  }
457  stream_size_type runLength = calculate_run_length(p.runLength, p.fanout, mergeLevel);
458  // Pass file streams with correct stream offsets to the merger
459  m_merger.reset(in, runLength);
460  }
461 
465  inline void initialize_final_merger(memory_size_type finalMergeLevel, memory_size_type runCount) {
466  if (m_finalMergeInitialized) {
467  reinitialize_final_merger();
468  return;
469  }
470 
471  m_finalMergeInitialized = true;
472  m_finalMergeLevel = finalMergeLevel;
473  m_finalRunCount = runCount;
474  m_runPositions.next_level();
475  m_runPositions.final_level(p.fanout);
476  if (runCount > p.finalFanout) {
477  log_debug() << "Run count in final level (" << runCount << ") is greater than the final fanout (" << p.finalFanout << ")\n";
478 
479  memory_size_type i = p.finalFanout-1;
480  memory_size_type n = runCount-i;
481  log_debug() << "Merge " << n << " runs starting from #" << i << std::endl;
482  dummy_progress_indicator pi;
483  m_finalMergeSpecialRunNumber = merge_runs(finalMergeLevel, i, n, pi);
484  } else {
485  log_debug() << "Run count in final level (" << runCount << ") is less or equal to the final fanout (" << p.finalFanout << ")" << std::endl;
486  m_finalMergeSpecialRunNumber = std::numeric_limits<memory_size_type>::max();
487  }
488  reinitialize_final_merger();
489  }
490 
491 public:
492  inline void reinitialize_final_merger() {
493  tp_assert(m_finalMergeInitialized, "reinitialize_final_merger while !m_finalMergeInitialized");
494  m_runPositions.unevacuate();
495  if (m_finalMergeSpecialRunNumber != std::numeric_limits<memory_size_type>::max()) {
496  array<file_stream<element_type> > in(p.finalFanout);
497  for (memory_size_type i = 0; i < p.finalFanout-1; ++i) {
498  open_run_file_read(in[i], m_finalMergeLevel, i);
499  log_debug() << "Run " << i << " is at offset " << in[i].offset() << " and has size " << in[i].size() << std::endl;
500  }
501  open_run_file_read(in[p.finalFanout-1], m_finalMergeLevel+1, m_finalMergeSpecialRunNumber);
502  log_debug() << "Special large run is at offset " << in[p.finalFanout-1].offset() << " and has size " << in[p.finalFanout-1].size() << std::endl;
503  stream_size_type runLength = calculate_run_length(p.runLength, p.fanout, m_finalMergeLevel+1);
504  log_debug() << "Run length " << runLength << std::endl;
505  m_merger.reset(in, runLength);
506  } else {
507  initialize_merger(m_finalMergeLevel, 0, m_finalRunCount);
508  }
509  m_evacuated = false;
510  }
511 
512 private:
516  static inline stream_size_type calculate_run_length(stream_size_type initialRunLength, memory_size_type fanout, memory_size_type mergeLevel) {
517  stream_size_type runLength = initialRunLength;
518  for (memory_size_type i = 0; i < mergeLevel; ++i) {
519  runLength *= fanout;
520  }
521  return runLength;
522  }
523 
529  template <typename ProgressIndicator>
530  inline memory_size_type merge_runs(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount, ProgressIndicator & pi) {
531  initialize_merger(mergeLevel, runNumber, runCount);
532  file_stream<element_type> out;
533  memory_size_type nextRunNumber = runNumber/p.fanout;
534  open_run_file_write(out, mergeLevel+1, nextRunNumber);
535  while (m_merger.can_pull()) {
536  pi.step();
537  out.write(m_store.store_to_element(m_merger.pull()));
538  }
539  return nextRunNumber;
540  }
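	// For illustration (hypothetical values): with p.fanout == 4, calling
	// merge_runs(mergeLevel = 0, runNumber = 8, runCount = 4, pi) reads runs
	// 8..11 of merge level 0 and writes them as run 8/4 == 2 of level 1; the
	// expected run length passed to the merger grows by a factor of p.fanout
	// per level, as computed by calculate_run_length above.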
541 
545  inline void prepare_pull(typename Progress::base & pi) {
546  m_runPositions.unevacuate();
547 
548  // Compute merge depth (number of passes over data).
549  int treeHeight= static_cast<int>(ceil(log(static_cast<float>(m_finishedRuns)) /
550  log(static_cast<float>(p.fanout))));
551  pi.init(item_count()*treeHeight);
552 
553  memory_size_type mergeLevel = 0;
554  memory_size_type runCount = m_finishedRuns;
555  while (runCount > p.fanout) {
556  log_debug() << "Merge " << runCount << " runs in merge level " << mergeLevel << '\n';
557  m_runPositions.next_level();
558  memory_size_type newRunCount = 0;
559  for (memory_size_type i = 0; i < runCount; i += p.fanout) {
560  memory_size_type n = std::min(runCount-i, p.fanout);
561 
562  if (newRunCount < 10)
563  log_debug() << "Merge " << n << " runs starting from #" << i << std::endl;
564  else if (newRunCount == 10)
565  log_debug() << "..." << std::endl;
566 
567  merge_runs(mergeLevel, i, n, pi);
568  ++newRunCount;
569  }
570  ++mergeLevel;
571  runCount = newRunCount;
572  }
573  log_debug() << "Final merge level " << mergeLevel << " has " << runCount << " runs" << std::endl;
574  initialize_final_merger(mergeLevel, runCount);
575 
576  m_state = stReport;
577  pi.done();
578  }
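	// For illustration (hypothetical values): with m_finishedRuns == 20 and
	// p.fanout == 4, the loop above merges 20 runs into 5 at merge level 0
	// and 5 runs into 2 at level 1; the remaining 2 runs are handed to
	// initialize_final_merger and only merged when items are pulled in
	// phase 3. treeHeight == ceil(log 20 / log 4) == 3 passes over the data.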
579 
580 public:
581 	///////////////////////////////////////////////////////////////////////////
582 	/// \brief In phase 3, return true if there are more items in the final
583 	/// merge phase.
584 	///////////////////////////////////////////////////////////////////////////
585 	inline bool can_pull() {
586  tp_assert(m_state == stReport, "Wrong phase");
587  if (m_reportInternal) return m_itemsPulled < m_currentRunItemCount;
588  else {
589  if (m_evacuated) reinitialize_final_merger();
590  return m_merger.can_pull();
591  }
592  }
593 
594 	///////////////////////////////////////////////////////////////////////////
595 	/// \brief In phase 3, fetch next item in the final merge phase.
596 	///////////////////////////////////////////////////////////////////////////
597 	inline item_type pull() {
598  tp_assert(m_state == stReport, "Wrong phase");
599  if (m_reportInternal && m_itemsPulled < m_currentRunItemCount) {
600  store_type el = std::move(m_currentRunItems[m_itemsPulled++]);
601  if (!can_pull()) m_currentRunItems.resize(0);
602  return m_store.store_to_outer(std::move(el));
603  } else {
604  if (m_evacuated) reinitialize_final_merger();
605  m_runPositions.close();
606  return m_store.store_to_outer(m_merger.pull());
607  }
608  }
609 
610  inline stream_size_type item_count() {
611  return m_itemCount;
612  }
613 
614 	static memory_size_type memory_usage_phase_1(const sort_parameters & params) {
615 		return params.runLength * item_size
616 			+ bits::run_positions::memory_usage()
617 			+ file_stream<element_type>::memory_usage()
618 			+ 2*params.fanout*sizeof(temp_file);
619 	}
620 
621  static memory_size_type minimum_memory_phase_1() {
622  // Our *absolute minimum* memory requirements are a single item and
623  // twice as many temp_files as the fanout.
624  // However, our fanout calculation does not take the memory available
625  // in this phase (run formation) into account.
626  // Thus, we assume the largest fanout, meaning we might overshoot.
627  // If we do overshoot, we will just spend the extra bytes on a run length
628  // longer than 1, which is probably what the user wants anyway.
629  sort_parameters tmp_p((sort_parameters()));
630  tmp_p.runLength = 1;
631  tmp_p.fanout = calculate_fanout(std::numeric_limits<memory_size_type>::max(), 0);
632  return memory_usage_phase_1(tmp_p);
633  }
634 
635  static memory_size_type memory_usage_phase_2(const sort_parameters & params) {
636  return fanout_memory_usage(params.fanout);
637  }
638 
639  static memory_size_type minimum_memory_phase_2() {
640  return fanout_memory_usage(calculate_fanout(0, 0));
641  }
642 
643  static memory_size_type memory_usage_phase_3(const sort_parameters & params) {
644  return fanout_memory_usage(params.finalFanout);
645  }
646 
647  static memory_size_type minimum_memory_phase_3() {
648  return fanout_memory_usage(calculate_fanout(0, 0));
649  }
650 
651  static memory_size_type maximum_memory_phase_3() {
652  return std::numeric_limits<memory_size_type>::max();
653  }
654 
655  memory_size_type actual_memory_phase_3() {
656  tp_assert(m_state == stReport, "Wrong phase");
657  if (m_reportInternal)
658  return m_runFiles.memory_usage(m_runFiles.size())
659  + m_currentRunItems.memory_usage(m_currentRunItems.size());
660  else
661  return fanout_memory_usage(m_finalRunCount);
662  }
663 
664  inline memory_size_type evacuated_memory_usage() const {
665  return 2*p.fanout*sizeof(temp_file);
666  }
667 
668 private:
669  static memory_size_type clamp(memory_size_type lo, memory_size_type val, memory_size_type hi) {
670  return std::max(lo, std::min(val, hi));
671  }
672 
676  inline void calculate_parameters() {
677  tp_assert(m_state == stNotStarted, "Merge sorting already begun");
678 
679  if(!p.filesPhase1)
680  p.filesPhase1 = clamp(minimumFilesPhase1, defaultFiles, maximumFilesPhase1);
681  if(!p.filesPhase2)
682  p.filesPhase2 = clamp(minimumFilesPhase2, defaultFiles, maximumFilesPhase2);
683  if(!p.filesPhase3)
684  p.filesPhase3 = clamp(minimumFilesPhase3, defaultFiles, maximumFilesPhase3);
685 
686  if(p.filesPhase1 < minimumFilesPhase1)
687  throw tpie::exception("file limit for phase 1 too small (" + std::to_string(p.filesPhase1) + " < " + std::to_string(minimumFilesPhase1) + ")");
688  if(p.filesPhase2 < minimumFilesPhase2)
689  throw tpie::exception("file limit for phase 2 too small (" + std::to_string(p.filesPhase2) + " < " + std::to_string(minimumFilesPhase2) + ")");
690  if(p.filesPhase3 < minimumFilesPhase3)
691  throw tpie::exception("file limit for phase 3 too small (" + std::to_string(p.filesPhase3) + " < " + std::to_string(minimumFilesPhase3) + ")");
692 
693 	if (!p.memoryPhase1)
694 		throw tpie::exception("memory limit for phase 1 not set");
695 	if (!p.memoryPhase2)
696 		throw tpie::exception("memory limit for phase 2 not set");
697 	if (!p.memoryPhase3)
698 		throw tpie::exception("memory limit for phase 3 not set");
699 
700  // We must set aside memory for temp_files in m_runFiles.
701  // m_runFiles contains fanout*2 temp_files, so calculate fanout before run length.
702 
703  // Phase 2 (merge):
704  // Run length: unbounded
705  // Fanout: determined by the size of our merge heap and the stream memory usage.
706  log_debug() << "Phase 2: " << p.memoryPhase2 << " b available memory\n";
707  p.fanout = calculate_fanout(p.memoryPhase2, p.filesPhase2);
708  if (fanout_memory_usage(p.fanout) > p.memoryPhase2) {
709  log_debug() << "Not enough memory for fanout " << p.fanout << "! (" << p.memoryPhase2 << " < " << fanout_memory_usage(p.fanout) << ")\n";
710  p.memoryPhase2 = fanout_memory_usage(p.fanout);
711  }
712 
713  // Phase 3 (final merge & report):
714  // Run length: unbounded
715  // Fanout: determined by the stream memory usage.
716  log_debug() << "Phase 3: " << p.memoryPhase3 << " b available memory\n";
717  p.finalFanout = calculate_fanout(p.memoryPhase3, p.filesPhase3);
718 
719  if (p.finalFanout > p.fanout)
720  p.finalFanout = p.fanout;
721 
722  if (fanout_memory_usage(p.finalFanout) > p.memoryPhase3) {
723  log_debug() << "Not enough memory for fanout " << p.finalFanout << "! (" << p.memoryPhase3 << " < " << fanout_memory_usage(p.finalFanout) << ")\n";
724  p.memoryPhase3 = fanout_memory_usage(p.finalFanout);
725  }
726 
727  // Phase 1 (run formation):
728  // Run length: determined by the number of items we can hold in memory.
729  // Fanout: unbounded
730 
731  memory_size_type streamMemory = file_stream<element_type>::memory_usage();
732  memory_size_type tempFileMemory = 2*p.fanout*sizeof(temp_file);
733 
734  log_debug() << "Phase 1: " << p.memoryPhase1 << " b available memory; " << streamMemory << " b for a single stream; " << tempFileMemory << " b for temp_files\n";
735  memory_size_type min_m1 = 128*1024 / item_size + bits::run_positions::memory_usage() + streamMemory + tempFileMemory;
736  if (p.memoryPhase1 < min_m1) {
737  log_warning() << "Not enough phase 1 memory for 128 KB items and an open stream! (" << p.memoryPhase1 << " < " << min_m1 << ")\n";
738  p.memoryPhase1 = min_m1;
739  }
740  p.runLength = (p.memoryPhase1 - bits::run_positions::memory_usage() - streamMemory - tempFileMemory)/item_size;
741 
742  p.internalReportThreshold = (std::min(p.memoryPhase1,
743  std::min(p.memoryPhase2,
744  p.memoryPhase3))
745  - tempFileMemory)/item_size;
748 
749  m_parametersSet = true;
750 
751  set_items(m_maxItems);
752 
753  log_debug() << "Calculated merge sort parameters\n";
754  p.dump(log_debug());
755  log_debug() << std::endl;
756 
757  log_debug() << "Merge sort phase 1: "
758  << p.memoryPhase1 << " b available, " << memory_usage_phase_1(p) << " b expected" << std::endl;
759  if (memory_usage_phase_1(p) > p.memoryPhase1) {
760 		log_warning() << "Merge sort phase 1 exceeds the allotted memory usage: "
761  << p.memoryPhase1 << " b available, but " << memory_usage_phase_1(p) << " b expected" << std::endl;
762  }
763  log_debug() << "Merge sort phase 2: "
764  << p.memoryPhase2 << " b available, " << memory_usage_phase_2(p) << " b expected" << std::endl;
765  if (memory_usage_phase_2(p) > p.memoryPhase2) {
766 		log_warning() << "Merge sort phase 2 exceeds the allotted memory usage: "
767  << p.memoryPhase2 << " b available, but " << memory_usage_phase_2(p) << " b expected" << std::endl;
768  }
769  log_debug() << "Merge sort phase 3: "
770  << p.memoryPhase3 << " b available, " << memory_usage_phase_3(p) << " b expected" << std::endl;
771  if (memory_usage_phase_3(p) > p.memoryPhase3) {
772 		log_warning() << "Merge sort phase 3 exceeds the allotted memory usage: "
773  << p.memoryPhase3 << " b available, but " << memory_usage_phase_3(p) << " b expected" << std::endl;
774  }
775  }
776 
780  static inline memory_size_type calculate_fanout(memory_size_type availableMemory, memory_size_type availableFiles) {
781  memory_size_type fanout_lo = 2;
782  memory_size_type fanout_hi = availableFiles - 2;
783  // binary search
784  while (fanout_lo < fanout_hi - 1) {
785  memory_size_type mid = fanout_lo + (fanout_hi-fanout_lo)/2;
786  if (fanout_memory_usage(mid) <= availableMemory) {
787  fanout_lo = mid;
788  } else {
789  fanout_hi = mid;
790  }
791  }
792  return fanout_lo;
793  }
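	// The search above returns the largest fanout that fits in availableMemory
	// according to fanout_memory_usage below, subject to the exclusive upper
	// bound availableFiles - 2. The result is never smaller than 2, even if a
	// 2-way merge does not fit; in that case calculate_parameters raises the
	// phase 2/3 memory budget to fanout_memory_usage(2) instead.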
794 
798  static inline memory_size_type fanout_memory_usage(memory_size_type fanout) {
799  return merger<specific_store_t, pred_t>::memory_usage(fanout) // accounts for the `fanout' open streams
801  + file_stream<element_type>::memory_usage() // output stream
802  + 2*sizeof(temp_file); // merge_sorter::m_runFiles
803  }
804 
805 public:
814  void set_items(stream_size_type n) {
815  if (m_state != stNotStarted)
816  throw exception("Wrong state in set_items: state is not stNotStarted");
817 
818  m_maxItems = n;
819 
820  if (!m_parametersSet) {
821  // We will handle this later in calculate_parameters
822  return;
823  }
824 
825  // If the item upper bound is less than a run,
826  // then it might pay off to decrease the length of a run
827  // so that we can avoid I/O altogether.
828  if (m_maxItems < p.runLength) {
829  memory_size_type newRunLength =
830  std::max(memory_size_type(m_maxItems), p.internalReportThreshold);
831  log_debug() << "Decreasing run length from " << p.runLength
832  << " to " << newRunLength
833  << " since at most " << m_maxItems << " items will be pushed,"
834 			<< " and the internal report threshold is "
835 			<< p.internalReportThreshold
836 			<< ". New merge sort parameters:\n";
837  // In principle, we could decrease runLength to m_maxItems,
838  // but setting runLength below internalReportThreshold does not
839  // give additional benefits.
840  // Furthermore, buggy code could call set_items with a very low
841  // upper bound, leading to unacceptable performance in practice;
842  // thus, internalReportThreshold is used as a stopgap/failsafe.
843  p.runLength = newRunLength;
844  p.dump(log_debug());
845  log_debug() << std::endl;
846  }
847  }
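	// For illustration (hypothetical values): if set_items(1000) is called
	// after parameters were calculated with p.runLength == 100000 and
	// p.internalReportThreshold == 50000, the run length is lowered to
	// max(1000, 50000) == 50000, so the at most 1000 pushed items stay in a
	// single in-memory run and the sort is reported internally without
	// writing run files.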
848 
849  void set_owner(tpie::pipelining::node * n) {
850  if (m_owning_node != nullptr)
851  m_bucketPtr = std::move(m_owning_node->bucket(0));
852 
853  if (n != nullptr)
854  n->bucket(0) = std::move(m_bucketPtr);
855 
856  m_owning_node = n;
857  }
858 private:
864  inline memory_size_type run_file_index(memory_size_type mergeLevel, memory_size_type runNumber) {
865  // runNumber is a memory_size_type since it is used as an index into
866  // m_runFiles.
867 
868  return (mergeLevel % 2)*p.fanout + (runNumber % p.fanout);
869  }
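	// The run files form two banks of p.fanout temp_files: even merge levels
	// use slots [0, fanout) and odd levels use slots [fanout, 2*fanout), with
	// runNumber wrapping modulo fanout inside a bank. For illustration, with
	// p.fanout == 4, (mergeLevel 0, run 5) maps to slot 1 while (mergeLevel 1,
	// run 0) maps to slot 4, so reading level k never clashes with writing
	// level k+1. Several runs may share one temp_file; open_run_file_write
	// below appends to the file and records the start offset in m_runPositions.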
870 
874  void open_run_file_write(file_stream<element_type> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
875  // see run_file_index comment about runNumber
876 
877  memory_size_type idx = run_file_index(mergeLevel, runNumber);
878  if (runNumber < p.fanout) m_runFiles[idx].free();
879  fs.open(m_runFiles[idx], access_read_write, 0, access_sequential, compression_normal);
880  fs.seek(0, file_stream_base::end);
881  m_runPositions.set_position(mergeLevel, runNumber, fs.get_position());
882  }
883 
887  void open_run_file_read(file_stream<element_type> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
888  // see run_file_index comment about runNumber
889 
890  memory_size_type idx = run_file_index(mergeLevel, runNumber);
891  fs.open(m_runFiles[idx], access_read, 0, access_sequential, compression_normal);
892  fs.set_position(m_runPositions.get_position(mergeLevel, runNumber));
893  }
894 
895  enum state_type {
896  stNotStarted,
897  stRunFormation,
898  stMerge,
899  stReport
900  };
901 
902 
903  std::unique_ptr<memory_bucket> m_bucketPtr;
904  memory_bucket_ref m_bucket;
905 
906  array<temp_file> m_runFiles;
907 
908  state_type m_state;
909 
910  sort_parameters p;
911  bool m_parametersSet;
912 
913  specific_store_t m_store;
914  merger<specific_store_t, pred_t> m_merger;
915 
916  bits::run_positions m_runPositions;
917 
918  // Number of runs already written to disk.
919  // On 32-bit systems, we could in principle support more than 2^32 finished runs,
920  // but keeping this as a memory_size_type is nicer when doing the actual merges.
921  stream_size_type m_finishedRuns;
922 
923  // current run buffer. size 0 before begin(), size runLength after begin().
924  array<store_type> m_currentRunItems;
925 
926  // Number of items in current run buffer.
927  // Used to index into m_currentRunItems, so memory_size_type.
928  memory_size_type m_currentRunItemCount;
929 
930  bool m_reportInternal;
931 
932  // When doing internal reporting: the number of items already reported
933  // Used in comparison with m_currentRunItemCount
934  memory_size_type m_itemsPulled;
935 
936  stream_size_type m_itemCount;
937 
938  stream_size_type m_maxItems;
939 
940  pred_t pred;
941  bool m_evacuated;
942  bool m_finalMergeInitialized;
943  memory_size_type m_finalMergeLevel;
944  memory_size_type m_finalRunCount;
945  memory_size_type m_finalMergeSpecialRunNumber;
946 
947  tpie::pipelining::node * m_owning_node;
948 };
949 
950 } // namespace tpie
951 
952 #endif // __TPIE_PIPELINING_MERGE_SORTER_H__
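A minimal usage sketch of the three phases follows; it is not part of merge_sorter.h. It assumes a program that has already called tpie::tpie_init() and configured the memory manager, and it assumes that with UseProgress = false the indicator type expected by calc() is tpie::dummy_progress_indicator from tpie/dummy_progress.h; the item count and memory figure are arbitrary.

#include <tpie/tpie.h>
#include <tpie/pipelining/merge_sorter.h>

void merge_sorter_example() {
	tpie::merge_sorter<int, false> sorter;        // UseProgress = false
	sorter.set_available_memory(50*1024*1024);    // one budget for phases 1-3

	// Phase 1: push items and form sorted runs.
	sorter.begin();
	for (int i = 1000; i --> 0;)
		sorter.push(i);
	sorter.end();

	// Phase 2: perform all merges in the merge tree except the last one.
	tpie::dummy_progress_indicator pi;
	sorter.calc(pi);

	// Phase 3: pull the items back in sorted order.
	while (sorter.can_pull()) {
		int x = sorter.pull();
		(void)x; // use x here
	}
}

With UseProgress = true, calc() instead takes a reference to the real progress indicator type selected by progress_types<true>.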
memory available while forming sorted runs.