TPIE

2362a60
tpie::merge_sorter< T, UseProgress, pred_t, store_t > Class Template Reference

Merge sorting consists of three phases. More...

#include <tpie/pipelining/merge_sorter.h>

Public Types

typedef std::shared_ptr< merge_sorter > ptr
 
typedef progress_types< UseProgress > Progress
 

Public Member Functions

 merge_sorter (pred_t pred=pred_t(), store_t store=store_t())
 
void set_parameters (memory_size_type runLength, memory_size_type fanout)
 Enable setting run length and fanout manually (for testing purposes). More...
 
void set_available_files (memory_size_type f)
 Calculate parameters from the given number of files. More...
 
void set_available_files (memory_size_type f1, memory_size_type f2, memory_size_type f3)
 Calculate parameters from the given number of files. More...
 
void set_available_memory (memory_size_type m)
 Calculate parameters from given memory amount. More...
 
void set_available_memory (memory_size_type m1, memory_size_type m2, memory_size_type m3)
 Calculate parameters from given memory amount. More...
 
void set_phase_1_files (memory_size_type f1)
 
void set_phase_2_files (memory_size_type f2)
 
void set_phase_3_files (memory_size_type f3)
 
void set_phase_1_memory (memory_size_type m1)
 
void set_phase_2_memory (memory_size_type m2)
 
void set_phase_3_memory (memory_size_type m3)
 
void begin ()
 Initiate phase 1: Formation of input runs. More...
 
void push (item_type &&item)
 Push item to merge sorter during phase 1. More...
 
void push (const item_type &item)
 
void end ()
 End phase 1. More...
 
bool is_calc_free () const
 
void calc (typename Progress::base &pi)
 Perform phase 2: perform all merges in the merge tree except the last one. More...
 
void evacuate ()
 
void evacuate_before_merging ()
 
void evacuate_before_reporting ()
 
void reinitialize_final_merger ()
 
bool can_pull ()
 In phase 3, return true if there are more items in the final merge phase. More...
 
item_type pull ()
 In phase 3, fetch next item in the final merge phase. More...
 
stream_size_type item_count ()
 
memory_size_type actual_memory_phase_3 ()
 
memory_size_type evacuated_memory_usage () const
 
void set_items (stream_size_type n)
 Set upper bound on number of items pushed. More...
 
void set_owner (tpie::pipelining::node *n)
 

Static Public Member Functions

static memory_size_type memory_usage_phase_1 (const sort_parameters &params)
 
static memory_size_type minimum_memory_phase_1 ()
 
static memory_size_type memory_usage_phase_2 (const sort_parameters &params)
 
static memory_size_type minimum_memory_phase_2 ()
 
static memory_size_type memory_usage_phase_3 (const sort_parameters &params)
 
static memory_size_type minimum_memory_phase_3 ()
 
static memory_size_type maximum_memory_phase_3 ()
 

Static Public Attributes

static const memory_size_type defaultFiles = 253
 
static const memory_size_type minimumFilesPhase1 = 1
 
static const memory_size_type maximumFilesPhase1 = 1
 
static const memory_size_type minimumFilesPhase2 = 5
 
static const memory_size_type maximumFilesPhase2 = std::numeric_limits<memory_size_type>::max()
 
static const memory_size_type minimumFilesPhase3 = 5
 
static const memory_size_type maximumFilesPhase3 = std::numeric_limits<memory_size_type>::max()
 

Detailed Description

template<typename T, bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
class tpie::merge_sorter< T, UseProgress, pred_t, store_t >

Merge sorting consists of three phases.

  1. Sorting and forming runs
  2. Merging runs
  3. Final merge and report

If the number of elements received during phase 1 is less than the length of a single run (and does not exceed the internal report threshold, sort_parameters::internalReportThreshold), we are in "report internal" mode, meaning we do not write anything to disk. This causes phase 2 to be a no-op and phase 3 to be a simple array traversal.

Definition at line 150 of file merge_sorter.h.

Member Function Documentation

template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::begin ( )
inline

Initiate phase 1: Formation of input runs.

Definition at line 288 of file merge_sorter.h.

References tpie::sort_parameters::fanout, tpie::log_debug(), tpie::array< T, Allocator >::resize(), tpie::sort_parameters::runLength, and tp_assert.

288  {
289  tp_assert(m_state == stNotStarted, "Merge sorting already begun");
290  if (!m_parametersSet) calculate_parameters();
291  log_debug() << "Start forming input runs" << std::endl;
292  m_currentRunItems = array<store_type>(0, allocator<store_type>(m_bucket));
293  m_currentRunItems.resize((size_t)p.runLength);
294  m_runFiles.resize(p.fanout*2);
295  m_currentRunItemCount = 0;
296  m_finishedRuns = 0;
297  m_state = stRunFormation;
298  m_itemCount = 0;
299  }
memory_size_type runLength
Run length, subject to memory restrictions during phase 2.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:167
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:485
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
memory_size_type fanout
Fanout of merge tree during phase 2.
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::calc ( typename Progress::base & pi)
inline

Perform phase 2: perform all merges in the merge tree except the last one.

Definition at line 380 of file merge_sorter.h.

References tpie::progress_indicator_base::done(), tpie::progress_indicator_base::init(), tpie::progress_indicator_base::step(), and tp_assert.

380  {
381  tp_assert(m_state == stMerge, "Wrong phase");
382  if (!m_reportInternal) {
383  prepare_pull(pi);
384  } else {
385  pi.init(1);
386  pi.step();
387  pi.done();
388  }
389  m_state = stReport;
390  }
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
bool tpie::merge_sorter< T, UseProgress, pred_t, store_t >::can_pull ( )
inline

In phase 3, return true if there are more items in the final merge phase.

Definition at line 585 of file merge_sorter.h.

References tp_assert.

Referenced by tpie::merge_sorter< T, UseProgress, pred_t, store_t >::pull().

585  {
586  tp_assert(m_state == stReport, "Wrong phase");
587  if (m_reportInternal) return m_itemsPulled < m_currentRunItemCount;
588  else {
589  if (m_evacuated) reinitialize_final_merger();
590  return m_merger.can_pull();
591  }
592  }
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::end ( )
inline

End phase 1.

Definition at line 329 of file merge_sorter.h.

References tpie::get_memory_manager(), tpie::sort_parameters::internalReportThreshold, tpie::log_debug(), tpie::array< T, Allocator >::resize(), tpie::array< T, Allocator >::size(), tpie::array< T, Allocator >::swap(), and tp_assert.

329  {
330  tp_assert(m_state == stRunFormation, "Wrong phase");
331  sort_current_run();
332 
333  if (m_itemCount == 0) {
334  tp_assert(m_currentRunItemCount == 0, "m_itemCount == 0, but m_currentRunItemCount != 0");
335  m_reportInternal = true;
336  m_itemsPulled = 0;
337  m_currentRunItems.resize(0);
338  log_debug() << "Got no items. Internal reporting mode." << std::endl;
339  } else if (m_finishedRuns == 0 && m_currentRunItems.size() <= p.internalReportThreshold) {
340  // Our current buffer fits within the memory requirements of phase 2.
341  m_reportInternal = true;
342  m_itemsPulled = 0;
343  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode." << std::endl;
344 
345  } else if (m_finishedRuns == 0
346  && m_currentRunItemCount <= p.internalReportThreshold
347  && array<store_type>::memory_usage(m_currentRunItemCount) <= get_memory_manager().available()) {
348  // Our current buffer does not fit within the memory requirements
349  // of phase 2, but we have enough temporary memory to copy and
350  // resize the buffer.
351 
352  array<store_type> currentRun(m_currentRunItemCount);
353  for (size_t i=0; i < m_currentRunItemCount; ++i)
354  currentRun[i] = std::move(m_currentRunItems[i]);
355  m_currentRunItems.swap(currentRun);
356 
357  m_reportInternal = true;
358  m_itemsPulled = 0;
359  log_debug() << "Got " << m_currentRunItemCount << " items. Internal reporting mode "
360  << "after resizing item buffer." << std::endl;
361 
362  } else {
363  m_reportInternal = false;
364  empty_current_run();
365  m_currentRunItems.resize(0);
366  log_debug() << "Got " << m_finishedRuns << " runs. External reporting mode." << std::endl;
367  }
368  m_state = stMerge;
369  }
static memory_size_type memory_usage(memory_size_type size)
Return the number of bytes required to create a data structure supporting a given number of elements...
Definition: util.h:81
memory_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
memory_manager & get_memory_manager()
Return a reference to the memory manager.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:167
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:485
void swap(array &other)
Swap two arrays.
Definition: array.h:499
size_type size() const
Return the size of the array.
Definition: array.h:526
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
item_type tpie::merge_sorter< T, UseProgress, pred_t, store_t >::pull ( )
inline

In phase 3, fetch next item in the final merge phase.

Definition at line 597 of file merge_sorter.h.

References tpie::merge_sorter< T, UseProgress, pred_t, store_t >::can_pull(), tpie::bits::run_positions::close(), tpie::array< T, Allocator >::resize(), and tp_assert.

597  {
598  tp_assert(m_state == stReport, "Wrong phase");
599  if (m_reportInternal && m_itemsPulled < m_currentRunItemCount) {
600  store_type el = std::move(m_currentRunItems[m_itemsPulled++]);
601  if (!can_pull()) m_currentRunItems.resize(0);
602  return m_store.store_to_outer(std::move(el));
603  } else {
604  if (m_evacuated) reinitialize_final_merger();
605  m_runPositions.close();
606  return m_store.store_to_outer(m_merger.pull());
607  }
608  }
bool can_pull()
In phase 3, return true if there are more items in the final merge phase.
Definition: merge_sorter.h:585
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:485
void close()
Switch from any state to closed state.
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::push ( item_type &&  item)
inline

Push item to merge sorter during phase 1.

Definition at line 304 of file merge_sorter.h.

References tpie::sort_parameters::runLength, and tp_assert.

304  {
305  tp_assert(m_state == stRunFormation, "Wrong phase");
306  if (m_currentRunItemCount >= p.runLength) {
307  sort_current_run();
308  empty_current_run();
309  }
310  m_currentRunItems[m_currentRunItemCount] = m_store.outer_to_store(std::move(item));
311  ++m_currentRunItemCount;
312  ++m_itemCount;
313  }
memory_size_type runLength
Run length, subject to memory restrictions during phase 2.
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::set_available_files ( memory_size_type  f)
inline

Calculate parameters from the given number of files.

Parameters
f — Files available for phases 1, 2 and 3

Definition at line 206 of file merge_sorter.h.

References tpie::sort_parameters::filesPhase1, tpie::sort_parameters::filesPhase2, and tpie::sort_parameters::filesPhase3.

206  {
207  p.filesPhase1 = p.filesPhase2 = p.filesPhase3 = f;
208  check_not_started();
209  }
memory_size_type filesPhase3
Files available during the output phase.
memory_size_type filesPhase2
Files available while merging runs.
memory_size_type filesPhase1
Files available while forming sorted runs.
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::set_available_files ( memory_size_type  f1,
memory_size_type  f2,
memory_size_type  f3 
)
inline

Calculate parameters from the given number of files.

Parameters
f1 — Files available for phase 1
f2 — Files available for phase 2
f3 — Files available for phase 3

Definition at line 217 of file merge_sorter.h.

References tpie::sort_parameters::filesPhase1, tpie::sort_parameters::filesPhase2, and tpie::sort_parameters::filesPhase3.

217  {
218  p.filesPhase1 = f1;
219  p.filesPhase2 = f2;
220  p.filesPhase3 = f3;
221  check_not_started();
222  }
memory_size_type filesPhase3
Files available during the output phase.
memory_size_type filesPhase2
Files available while merging runs.
memory_size_type filesPhase1
Files available while forming sorted runs.
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::set_available_memory ( memory_size_type  m)
inline

Calculate parameters from given memory amount.

Parameters
m — Memory available for phases 1, 2 and 3

Definition at line 228 of file merge_sorter.h.

References tpie::sort_parameters::memoryPhase1, tpie::sort_parameters::memoryPhase2, and tpie::sort_parameters::memoryPhase3.

228  {
229  p.memoryPhase1 = p.memoryPhase2 = p.memoryPhase3 = m;
230  check_not_started();
231  }
memory_size_type memoryPhase3
Memory available during output phase.
memory_size_type memoryPhase2
Memory available while merging runs.
memory_size_type memoryPhase1
Memory available while forming sorted runs.
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::set_available_memory ( memory_size_type  m1,
memory_size_type  m2,
memory_size_type  m3 
)
inline

Calculate parameters from given memory amount.

Parameters
m1 — Memory available for phase 1
m2 — Memory available for phase 2
m3 — Memory available for phase 3

Definition at line 239 of file merge_sorter.h.

References tpie::sort_parameters::memoryPhase1, tpie::sort_parameters::memoryPhase2, and tpie::sort_parameters::memoryPhase3.

239  {
240  p.memoryPhase1 = m1;
241  p.memoryPhase2 = m2;
242  p.memoryPhase3 = m3;
243  check_not_started();
244  }
memory_size_type memoryPhase3
Memory available during output phase.
memory_size_type memoryPhase2
Memory available while merging runs.
memory_size_type memoryPhase1
Memory available while forming sorted runs.
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::set_items ( stream_size_type  n)
inline

Set upper bound on number of items pushed.

If the number of items to push is less than the size of a single run, this method will decrease the run length to that number of items, but never below the internal report threshold. This may make it easier for the sorter to go into internal reporting mode.

Definition at line 814 of file merge_sorter.h.

References tpie::sort_parameters::internalReportThreshold, tpie::log_debug(), and tpie::sort_parameters::runLength.

814  {
815  if (m_state != stNotStarted)
816  throw exception("Wrong state in set_items: state is not stNotStarted");
817 
818  m_maxItems = n;
819 
820  if (!m_parametersSet) {
821  // We will handle this later in calculate_parameters
822  return;
823  }
824 
825  // If the item upper bound is less than a run,
826  // then it might pay off to decrease the length of a run
827  // so that we can avoid I/O altogether.
828  if (m_maxItems < p.runLength) {
829  memory_size_type newRunLength =
830  std::max(memory_size_type(m_maxItems), p.internalReportThreshold);
831  log_debug() << "Decreasing run length from " << p.runLength
832  << " to " << newRunLength
833  << " since at most " << m_maxItems << " items will be pushed,"
834  << " and the internal report threshold is "
835  << p.internalReportThreshold
836  << ". New merge sort parameters:\n";
837  // In principle, we could decrease runLength to m_maxItems,
838  // but setting runLength below internalReportThreshold does not
839  // give additional benefits.
840  // Furthermore, buggy code could call set_items with a very low
841  // upper bound, leading to unacceptable performance in practice;
842  // thus, internalReportThreshold is used as a stopgap/failsafe.
843  p.runLength = newRunLength;
844  p.dump(log_debug());
845  log_debug() << std::endl;
846  }
847  }
memory_size_type runLength
Run length, subject to memory restrictions during phase 2.
memory_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:167
template<typename T , bool UseProgress, typename pred_t = std::less<T>, typename store_t = default_store>
void tpie::merge_sorter< T, UseProgress, pred_t, store_t >::set_parameters ( memory_size_type  runLength,
memory_size_type  fanout 
)
inline

Enable setting run length and fanout manually (for testing purposes).

Definition at line 192 of file merge_sorter.h.

References tpie::sort_parameters::fanout, tpie::sort_parameters::finalFanout, tpie::sort_parameters::internalReportThreshold, tpie::log_debug(), tpie::sort_parameters::runLength, and tp_assert.

192  {
193  tp_assert(m_state == stNotStarted, "Merge sorting already begun");
194  p.runLength = p.internalReportThreshold = runLength;
195  p.fanout = p.finalFanout = fanout;
196  m_parametersSet = true;
197  log_debug() << "Manually set merge sort run length and fanout\n";
198  log_debug() << "Run length = " << p.runLength << " (uses memory " << (p.runLength*item_size + file_stream<element_type>::memory_usage()) << ")\n";
199  log_debug() << "Fanout = " << p.fanout << " (uses memory " << fanout_memory_usage(p.fanout) << ")" << std::endl;
200  }
memory_size_type runLength
Run length, subject to memory restrictions during phase 2.
memory_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
memory_size_type finalFanout
Fanout of merge tree during phase 3.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:167
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
memory_size_type fanout
Fanout of merge tree during phase 2.

The documentation for this class was generated from the following file: