TPIE

2362a60
node.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_NODE_H__
21 #define __TPIE_PIPELINING_NODE_H__
22 
23 #include <tpie/pipelining/exception.h>
24 #include <tpie/pipelining/tokens.h>
27 #include <tpie/pipelining/priority_type.h>
29 #include <tpie/pipelining/node_name.h>
30 #include <tpie/pipelining/node_traits.h>
31 #include <tpie/flags.h>
32 #include <limits>
33 #include <tpie/resources.h>
34 
35 namespace tpie {
36 
37 namespace pipelining {
38 
39 namespace bits {
40 
42  node & m_node;
43 
44 public:
46 
47  virtual void refresh() override;
48 };
49 
50 } // namespace bits
51 
53  memory_size_type minimum = 0;
54  memory_size_type maximum = std::numeric_limits<memory_size_type>::max();
55  double fraction = 0.0;
56 
57  memory_size_type available = 0;
58 };
59 
61  node_resource_parameters resource_parameters[resource_type::TOTAL_RESOURCE_TYPES];
62 
63  std::string name;
64  priority_type namePriority = PRIORITY_NO_NAME;
65 
66  std::string phaseName;
67  priority_type phaseNamePriority = PRIORITY_NO_NAME;
68 
69  stream_size_type stepsTotal = 0;
70 };
71 
78 class node {
79 public:
80  typedef boost::optional<any_noncopyable &> maybeany_t;
81 
/// \brief Options for how to plot this node.
/// The enumerators are distinct powers of two (1, 2, 4) so that several of
/// them can be combined in a flags<PLOT> value, as returned by
/// get_plot_options() and accepted by set_plot_options().
85  enum PLOT {
86  PLOT_SIMPLIFIED_HIDE=1,
87  PLOT_BUFFERED=2,
88  PLOT_PARALLEL=4
89  };
90 
/// \brief Used internally to check order of method calls.
/// Stored in m_state and accessed through get_state()/set_state().
/// NOTE(review): the enumerator order (fresh, prepare, propagate, begin, go,
/// end) presumably mirrors the order in which the framework invokes the
/// corresponding virtual methods -- confirm against the runtime.
94  enum STATE {
95  STATE_FRESH,
96  STATE_IN_PREPARE,
97  STATE_AFTER_PREPARE,
98  STATE_IN_PROPAGATE,
99  STATE_AFTER_PROPAGATE,
100  STATE_IN_BEGIN,
101  STATE_AFTER_BEGIN,
102  STATE_IN_GO,
103  STATE_IN_END,
104  STATE_AFTER_END
105  };
106 
/// \brief Virtual dtor.
110  virtual ~node() {}
111 
/// \brief Get the minimum amount of the resource declared by this node.
/// \param type  The resource whose entry in m_parameters.resource_parameters is read.
116  inline memory_size_type get_minimum_resource_usage(resource_type type) const {
117  return m_parameters.resource_parameters[type].minimum;
118  }
119 
/// \brief Get the maximum amount of the resource declared by this node.
/// \param type  The resource whose entry in m_parameters.resource_parameters is read.
124  inline memory_size_type get_maximum_resource_usage(resource_type type) const {
125  return m_parameters.resource_parameters[type].maximum;
126  }
127 
/// \brief Get the priority for the specific resource of this node.
/// \param type  The resource whose `fraction` field is returned.
131  inline double get_resource_fraction(resource_type type) const {
132  return m_parameters.resource_parameters[type].fraction;
133  }
134 
/// \brief Get the amount of the specific resource assigned to this node.
/// \param type  The resource whose `available` field is returned.
138  inline memory_size_type get_available_of_resource(resource_type type) const {
139  return m_parameters.resource_parameters[type].available;
140  }
141 
/// \brief Called by implementers to declare minimum resource requirements.
/// Defined out of line.
145  void set_minimum_resource_usage(resource_type type, memory_size_type usage);
146 
/// \brief Called by implementers to declare maximum resource requirements.
/// Defined out of line.
153  void set_maximum_resource_usage(resource_type type, memory_size_type usage);
154 
/// \brief Set the resource priority of this node.
/// Defined out of line.
160  void set_resource_fraction(resource_type type, double f);
161 
/// \brief Called by the resource manager to notify the node that its
/// available amount of a resource has changed.
/// The default implementation ignores both arguments and does nothing.
166  virtual void resource_available_changed(resource_type, memory_size_type) {
167  }
168 
/// \brief Used internally to assign the available resource to the node.
/// Defined out of line.
172  void _internal_set_available_of_resource(resource_type type, memory_size_type available);
173 
/// \brief Get the minimum amount of memory declared by this node.
/// Shorthand for get_minimum_resource_usage(MEMORY).
178  inline memory_size_type get_minimum_memory() const {
179  return get_minimum_resource_usage(MEMORY);
180  }
181 
/// \brief Get the maximum amount of memory declared by this node.
/// Shorthand for get_maximum_resource_usage(MEMORY).
186  inline memory_size_type get_maximum_memory() const {
187  return get_maximum_resource_usage(MEMORY);
188  }
189 
/// \brief Get the memory priority of this node.
/// Shorthand for get_resource_fraction(MEMORY).
193  inline double get_memory_fraction() const {
194  return get_resource_fraction(MEMORY);
195  }
196 
/// \brief Get the amount of memory assigned to this node.
/// Shorthand for get_available_of_resource(MEMORY).
200  inline memory_size_type get_available_memory() const {
201  return get_available_of_resource(MEMORY);
202  }
203 
/// \brief Called by implementers to declare minimum memory requirements.
/// Shorthand for set_minimum_resource_usage(MEMORY, minimumMemory).
207  void set_minimum_memory(memory_size_type minimumMemory) {
208  set_minimum_resource_usage(MEMORY, minimumMemory);
209  }
210 
/// \brief Called by implementers to declare maximum memory requirements.
/// Shorthand for set_maximum_resource_usage(MEMORY, maximumMemory).
217  void set_maximum_memory(memory_size_type maximumMemory) {
218  set_maximum_resource_usage(MEMORY, maximumMemory);
219  }
220 
/// \brief Set the memory priority of this node.
/// Shorthand for set_resource_fraction(MEMORY, f).
225  void set_memory_fraction(double f) {
226  set_resource_fraction(MEMORY, f);
227  }
228 
/// \brief Called by the memory manager to set the amount of memory assigned
/// to this node.
/// The default implementation deliberately ignores the argument (it is only
/// passed to unused()); subclasses may override to react to the assignment.
233  virtual void set_available_memory(memory_size_type availableMemory) {
234  unused(availableMemory);
235  }
236 
/// \brief Get the amount of memory currently used by this node.
/// Adds up the `count` of every allocated bucket in m_buckets; slots whose
/// pointer is null contribute nothing.
240  inline memory_size_type get_used_memory() const {
241  memory_size_type total = 0;
242  for (const auto & slot : m_buckets)
243  total += slot ? slot->count : 0;
244  return total;
245  }
246 
/// \brief Get the local node map, mapping node IDs to node pointers for all
/// the nodes reachable from this one.
/// Obtained from this node's token.
251  inline bits::node_map::ptr get_node_map() const {
252  return token.get_map();
253  }
254 
/// \brief Get the internal node ID of this node (mainly for debugging
/// purposes).
/// This is the ID carried by this node's token.
259  inline node_token::id_t get_id() const {
260  return token.id();
261  }
262 
/// \brief Called before memory assignment but after depending phases have
/// executed and ended.
/// The default implementation does nothing.
269  virtual void prepare() {
270  }
271 
/// \brief Propagate stream metadata.
/// The default implementation does nothing.
283  virtual void propagate() {
284  }
285 
/// \brief Begin pipeline processing phase.
/// The default implementation does nothing.
299  virtual void begin() {
300  }
301 
/// The default implementation returns false.
/// NOTE(review): the exact meaning of "go free" is not visible in this
/// header -- confirm against the runtime before relying on it.
302  virtual bool is_go_free() const {return false;}
303 
/// \brief For initiator nodes, execute this phase by pushing all items to be
/// pushed.
/// The default implementation is for nodes that are NOT initiators: it logs
/// a warning naming the dynamic type of this node and then throws.
/// \throws not_initiator_node  always, unless overridden.
309  virtual void go() {
310  log_warning() << "node subclass " << typeid(*this).name()
311  << " is not an initiator node" << std::endl;
312  throw not_initiator_node();
313  }
314 
/// \brief End pipeline processing phase.
/// The default implementation does nothing.
329  virtual void end() {
330  }
331 
/// \brief Overridden by nodes that have data to evacuate.
/// The default implementation returns false.
335  virtual bool can_evacuate() {
336  return false;
337  }
338 
/// \brief Overridden by nodes that have data to evacuate.
/// The default implementation does nothing.
342  virtual void evacuate() {
343  }
344 
/// \brief Get the priority of this node's name.
/// Read-only accessor for m_parameters.namePriority; const-qualified so it can
/// be called on const nodes, like the other plain getters of this class.
349  inline priority_type get_name_priority() const {
350  return m_parameters.namePriority;
351  }
352 
/// \brief Get this node's name.
/// Defined out of line.
357  const std::string & get_name();
358 
/// \brief Set this node's name.
/// \param name      The new name.
/// \param priority  Priority associated with the name; defaults to PRIORITY_USER.
363  void set_name(const std::string & name, priority_type priority = PRIORITY_USER);
364 
/// \brief Get the priority of this node's phase name.
/// Read-only accessor for m_parameters.phaseNamePriority; const-qualified so
/// it can be called on const nodes, like the other plain getters of this class.
369  inline priority_type get_phase_name_priority() const {
370  return m_parameters.phaseNamePriority;
371  }
372 
/// \brief Get this node's phase name.
/// Defined out of line.
377  const std::string & get_phase_name();
378 
/// \brief Set this node's phase name.
/// \param name      The new phase name.
/// \param priority  Priority associated with the name; defaults to PRIORITY_USER.
383  void set_phase_name(const std::string & name, priority_type priority = PRIORITY_USER);
384 
385 
/// \brief Used internally when a pair_factory has a name set.
/// Prefixes the stored node name with \p breadcrumb, separated by " | ".
/// If no name is stored yet, the breadcrumb alone becomes the name.
389  inline void set_breadcrumb(const std::string & breadcrumb) {
390  m_parameters.name = !m_parameters.name.empty() ? (breadcrumb + " | " + m_parameters.name) : breadcrumb;
391  }
392 
/// \brief Used internally for progress indication.
/// Returns m_parameters.stepsTotal. Const-qualified because it only reads
/// state, matching the other plain getters of this class.
397  inline stream_size_type get_steps() const {
398  return m_parameters.stepsTotal;
399  }
400 
/// \brief Get the number of steps not yet reported through step().
/// Returns m_stepsLeft, which step() decrements. Const-qualified because it
/// only reads state.
401  stream_size_type get_steps_left() const {
402  return m_stepsLeft;
403  }
404 
409  m_pi = pi;
410  }
411 
416  return m_pi;
417  }
418 
/// \brief Used internally to check order of method calls.
/// Returns the current value of m_state.
422  STATE get_state() const {
423  return m_state;
424  }
425 
/// \brief Used internally to check order of method calls.
/// Overwrites m_state with \p s; no transition validation is done here.
429  void set_state(STATE s) {
430  m_state = s;
431  }
432 
/// \brief Used internally to check order of method calls.
/// Returns m_resourceBeingAssigned.
436  resource_type get_resource_being_assigned() const {
437  return m_resourceBeingAssigned;
438  }
439 
/// \brief Used internally to check order of method calls.
/// Overwrites m_resourceBeingAssigned with \p type.
443  void set_resource_being_assigned(resource_type type) {
444  m_resourceBeingAssigned = type;
445  }
446 
/// \brief Get options specified for plot(), as a combination of node::PLOT
/// values.
451  flags<PLOT> get_plot_options() const {
452  return m_plotOptions;
453  }
454 
/// \brief Set options specified for plot(), as a combination of node::PLOT
/// values.
459  void set_plot_options(flags<PLOT> options) {
460  m_plotOptions = options;
461  }
462 protected:
463 #ifdef _WIN32
464  // Disable warning C4355: 'this' : used in base member initializer list
465  // node_token does not access members of the `node *`,
466  // it merely uses it as a value in the node map.
467  // Only after this node object is completely constructed are node members accessed.
468 #pragma warning( push )
469 #pragma warning( disable : 4355 )
470 #endif // _WIN32
/// \brief Default constructor, using a new node_token.
471  node();
475 
/// Nodes are not copyable: both copy operations are deleted.
479  node(const node & other) = delete;
480  node & operator=(const node & other) = delete;
481 
/// Nodes are movable; both move operations are defined out of line.
486  node(node && other);
487  node & operator=(node && other);
488 
/// \brief Construct a node from an existing node_token.
/// Defined out of line.
492  node(const node_token & token);
493 #ifdef _WIN32
494 #pragma warning( pop )
495 #endif // _WIN32
496 public:
/// \brief Called by implementers to declare a push destination.
500  void add_push_destination(const node_token & dest);
501 
/// \brief Called by implementers to declare a push destination.
/// Overload taking the destination node instead of its token.
505  void add_push_destination(const node & dest);
506 
/// \brief Called by implementers to declare a pull source.
510  void add_pull_source(const node_token & dest);
511 
/// \brief Called by implementers to declare a pull source.
/// Overload taking the source node instead of its token.
515  void add_pull_source(const node & dest);
516 
/// \brief Called by implementers to declare a node dependency, that is, a
/// requirement on another node identified by \p dest.
522  void add_dependency(const node_token & dest);
523 
/// \brief Called by implementers to declare a node dependency.
/// Overload taking the other node instead of its token.
529  void add_dependency(const node & dest);
530 
/// \brief Called by implementers to declare a node memory share dependency
/// on the node identified by \p dest.
537  void add_memory_share_dependency(const node_token & dest);
538 
/// \brief Called by implementers to declare a node memory share dependency.
/// Overload taking the other node instead of its token.
545  void add_memory_share_dependency(const node & dest);
546 
557  // Implementation note: If the type of the `value` parameter is changed
558  // from `T` to `const T &`, this will yield linker errors if an application
559  // attempts to pass a const reference to a static data member inside a
560  // templated class.
561  // See http://stackoverflow.com/a/5392050
/// \brief Called by implementers to forward auxiliary data to successors.
/// Wraps \p value in an any_noncopyable and hands it to forward_any().
/// \param key    Name under which the data is forwarded.
/// \param value  The data; taken by value and moved into the wrapper.
/// \param k      Passed through unchanged to forward_any(); defaults to the
///               largest memory_size_type value.
563  template <typename T>
564  void forward(std::string key, T value, memory_size_type k = std::numeric_limits<memory_size_type>::max()) {
// key is a by-value parameter that is not used again, so move it into
// forward_any instead of copying the string a second time.
565  forward_any(std::move(key), any_noncopyable(std::move(value)), k);
566  }
567 
/// \brief See node::forward.
/// Takes the data already wrapped as any_noncopyable. Defined out of line.
571  void forward_any(std::string key, any_noncopyable value, memory_size_type k = std::numeric_limits<memory_size_type>::max());
572 
573 private:
/// Private helper, defined out of line.
/// NOTE(review): presumably records in m_forwardedToHere that \p key was
/// forwarded from node \p from_node -- confirm in the implementation file.
577  void add_forwarded_data(std::string key, node_token::id_t from_node);
578 
/// Private helper, defined out of line; returns a maybeany_t for \p key.
582  maybeany_t get_forwarded_data_maybe(std::string key);
583 
584 public:
/// \brief Find out if there is a piece of auxiliary data forwarded with a
/// given name.
/// \param key  Name of the forwarded data.
/// \return true if fetch_maybe() yields a value for \p key.
589  bool can_fetch(std::string key) {
// key is a by-value parameter that is not used again: move it rather
// than copying the string a second time.
590  return bool(fetch_maybe(std::move(key)));
591  }
592 
/// \brief Fetch piece of auxiliary data as any_noncopyable (the internal
/// representation) wrapped in a boost::optional (maybeany_t).
/// Defined out of line.
598  maybeany_t fetch_maybe(std::string key);
599 
/// \brief Fetch piece of auxiliary data as any_noncopyable (the internal
/// representation).
/// Defined out of line.
604  any_noncopyable & fetch_any(std::string key);
605 
/// \brief Fetch piece of auxiliary data, expecting a given value type.
/// Looks the item up with fetch_any() and casts it to \p T.
/// \tparam T   Expected type of the forwarded value.
/// \param key  Name of the forwarded data.
/// \return Reference to the stored value.
/// \throws invalid_argument_exception  if the stored value is not of type T;
///         the message names the key, the requested type and the stored type.
609  template <typename T>
610  inline T & fetch(std::string key) {
611  any_noncopyable &item = fetch_any(key);
612  try {
613  return any_cast<T>(item);
// Catch by const reference: catching by value copies the exception
// object and would slice a derived exception type.
614  } catch (const bad_any_noncopyable_cast & m) {
615  std::stringstream ss;
616  ss << "Trying to fetch key '" << key << "' of type "
617  << typeid(T).name() << " but forwarded data was of type "
618  << item.type().name() << ". Message was: " << m.what();
619  throw invalid_argument_exception(ss.str());
620  }
621  }
622 
/// \brief Disallow forwards through this node.
/// Defined out of line.
626  void no_forward_through();
627 
/// \brief Get the node_token that maps this node's ID to a pointer to this.
632  const node_token & get_token() const {
633  return token;
634  }
635 
636 public:
/// \brief Called by implementers that intend to call step().
/// \param steps  Number of steps to declare. Defined out of line.
641  void set_steps(stream_size_type steps);
642 
643 
644 private:
/// Called by step() when more steps are reported than m_stepsLeft holds.
/// Defined out of line.
648  void step_overflow();
649 public:
/// \brief Step the progress indicator.
/// Subtracts \p steps from m_stepsLeft, or calls step_overflow() when fewer
/// than \p steps remain, and then forwards the steps to the progress
/// indicator m_pi.
/// Asserts that the node is in one of the states IN_BEGIN, AFTER_BEGIN,
/// IN_GO or IN_END. m_pi is dereferenced unconditionally, so a progress
/// indicator must have been set before this is called.
/// \param steps  Number of steps to record; defaults to 1.
654  void step(stream_size_type steps = 1) {
// The original assert tested STATE_IN_END twice; the redundant test is
// removed. The set of accepted states is unchanged.
655  assert(get_state() == STATE_IN_END ||
656  get_state() == STATE_IN_BEGIN ||
657  get_state() == STATE_AFTER_BEGIN ||
659  get_state() == STATE_IN_GO);
660  if (m_stepsLeft < steps)
661  step_overflow();
662  else
663  m_stepsLeft -= steps;
664  m_pi->step(steps);
665  }
666 
675 
676 #ifdef DOXYGEN
// The declarations below are compiled only when DOXYGEN is defined.
/// \brief For pull nodes, return true if there are more items to be pulled.
677  inline bool can_pull() const;
682 
/// \brief For pull nodes, pull the next item from this node.
686  inline item_type pull();
687 
/// \brief For push nodes, push the next item to this node.
691  inline void push(const item_type & item);
692 #endif
693 
/// \brief Registers a datastructure.
/// \param name      Name identifying the datastructure.
/// \param priority  Priority of the datastructure; defaults to 1.
700  void register_datastructure_usage(const std::string & name, double priority=1);
701 
/// \brief Assign memory to a registered datastructure.
/// \param name  Name of the datastructure.
/// \param min   Minimum amount of memory.
/// \param max   Maximum amount of memory; defaults to the largest
///              memory_size_type value.
708  void set_datastructure_memory_limits(const std::string & name, memory_size_type min, memory_size_type max=std::numeric_limits<memory_size_type>::max());
709 
/// \brief Returns the memory assigned to a datastructure.
/// \param name  Name of the datastructure.
714  memory_size_type get_datastructure_memory(const std::string & name);
715 
/// \brief Store a value for a previously registered datastructure.
/// The datastructure table is the one owned by the authority of this node's
/// node map.
/// \param name           Name the datastructure was registered under.
/// \param datastructure  The value to store (passed through move_if_movable).
/// \throws tpie::exception  if no datastructure named \p name is registered.
721  template<typename T>
722  void set_datastructure(const std::string & name, T datastructure) {
723  auto & registry = get_node_map()->find_authority()->get_datastructures();
724  const auto entry = registry.find(name);
725 
726  if(entry == registry.end())
727  throw tpie::exception("attempted to set non-registered datastructure");
728 
729  entry->second.second = move_if_movable<T>(datastructure);
730  }
731 
/// \brief Returns a previously declared datastructure.
/// The datastructure table is the one owned by the authority of this node's
/// node map; the stored value is cast to \p T with any_cast.
/// \param name  Name the datastructure was registered under.
/// \throws tpie::exception  if no datastructure named \p name is registered.
737  template<typename T>
738  T & get_datastructure(const std::string & name) {
739  auto & registry = get_node_map()->find_authority()->get_datastructures();
740  const auto entry = registry.find(name);
741 
742  if(entry == registry.end())
743  throw tpie::exception("attempted to get non-registered datastructure");
744 
745  return any_cast<T>(entry->second.second);
746  }
747 
/// \brief Reset the stored value of a registered datastructure.
/// Does nothing if no datastructure named \p name is registered.
748  void unset_datastructure(const std::string & name) {
749  auto & registry = get_node_map()->find_authority()->get_datastructures();
750  const auto entry = registry.find(name);
751 
752  if(entry != registry.end())
753  entry->second.second.reset();
754  }
755 
756 private:
/// Per-datastructure bookkeeping: memory limits and priority.
/// The constructor sets every field: min = 0, max = largest memory_size_type
/// value, priority = 1. The original left `priority` indeterminate; 1 is
/// chosen to match the default of register_datastructure_usage().
757  struct datastructure_info_t {
758  datastructure_info_t() : min(0), max(std::numeric_limits<memory_size_type>::max()), priority(1) {}
759  memory_size_type min;
760  memory_size_type max;
761  double priority;
762  };
763 
764  typedef std::map<std::string, datastructure_info_t> datastructuremap_t;
765 
/// Read-only access to this node's own datastructure table (m_datastructures).
766  const datastructuremap_t & get_datastructures() const {
767  return m_datastructures;
768  }
769 
770 public:
/// \brief Count the number of memory buckets.
/// This is the size of m_buckets, which may include slots that are still null.
774  size_t buckets() const {return m_buckets.size();}
775 
/// \brief Access a memory bucket.
/// Grows m_buckets so that index \p i exists and allocates the bucket on
/// first access, so the returned pointer is never null.
/// \param i  Index of the bucket.
779  std::unique_ptr<memory_bucket> & bucket(size_t i) {
780  if (i >= m_buckets.size()) m_buckets.resize(i+1);
781  if (m_buckets[i] == nullptr) m_buckets[i].reset(new memory_bucket());
782  return m_buckets[i];
783  }
784 
789  return tpie::memory_bucket_ref(bucket(i).get());
790  }
791 
792  friend class bits::memory_runtime;
793 
794  friend class bits::datastructure_runtime;
795 
796  friend class factory_base;
797 
798  friend class bits::pipeline_base;
799 private:
/// Token of this node; source of get_token(), get_id() and get_node_map().
800  node_token token;
801 
/// Resource parameters, names, name priorities and total step count.
802  node_parameters m_parameters;
/// Memory buckets, created lazily by bucket(); slots may be null.
803  std::vector<std::unique_ptr<memory_bucket> > m_buckets;
804 
/// NOTE(review): presumably the data forwarded by this node, keyed by name,
/// and the ID of the node each incoming key came from -- confirm in the
/// implementation file.
805  std::map<std::string, any_noncopyable> m_forwardedFromHere;
806  std::map<std::string, node_token::id_t> m_forwardedToHere;
807 
/// Datastructure table returned by get_datastructures().
808  datastructuremap_t m_datastructures;
/// Steps remaining; decremented by step(). Has no in-class initializer.
809  stream_size_type m_stepsLeft;
/// Current state, see get_state()/set_state(). Has no in-class initializer.
811  STATE m_state;
/// See get_resource_being_assigned(); starts out as NO_RESOURCE.
812  resource_type m_resourceBeingAssigned = NO_RESOURCE;
813  std::unique_ptr<progress_indicator_base> m_piProxy;
/// Plot options, see get_plot_options()/set_plot_options().
814  flags<PLOT> m_plotOptions;
815 
816  friend class bits::proxy_progress_indicator;
817 };
818 
819 
820 TPIE_DECLARE_OPERATORS_FOR_FLAGS(node::PLOT);
821 
822 
823 } // namespace pipelining
824 
825 } // namespace tpie
826 
827 #endif // __TPIE_PIPELINING_NODE_H__
T & fetch(std::string key)
Fetch piece of auxiliary data, expecting a given value type.
Definition: node.h:610
memory_size_type get_available_memory() const
Get the amount of memory assigned to this node.
Definition: node.h:200
bool can_pull() const
For pull nodes, return true if there are more items to be pulled.
void set_resource_being_assigned(resource_type type)
Used internally to check order of method calls.
Definition: node.h:443
maybeany_t fetch_maybe(std::string key)
Fetch piece of auxiliary data as any_noncopyable (the internal representation) wrapped in a boost::optional.
memory_size_type get_minimum_resource_usage(resource_type type) const
Get the minimum amount of the resource declared by this node.
Definition: node.h:116
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:299
Defines all types of managed resources.
void set_phase_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's phase name.
memory_size_type get_maximum_resource_usage(resource_type type) const
Get the maximum amount of the resource declared by this node.
Definition: node.h:124
The base class for indicating the progress of some task.
Null-object progress indicator.
STATE
Used internally to check order of method calls.
Definition: node.h:94
Predeclarations of some pipelining classes.
void add_memory_share_dependency(const node_token &dest)
Called by implementers to declare a node memory share dependency, that is, a requirement that another...
virtual void evacuate()
Overridden by nodes that have data to evacuate.
Definition: node.h:342
void unused(const T &x)
Declare that a variable is unused on purpose.
Definition: util.h:42
resource_type get_resource_being_assigned() const
Used internally to check order of method calls.
Definition: node.h:436
virtual ~node()
Virtual dtor.
Definition: node.h:110
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one...
Definition: node.h:251
Base class of all pipelining factories.
Definition: factory_base.h:73
virtual void go()
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: node.h:309
void set_state(STATE s)
Used internally to check order of method calls.
Definition: node.h:429
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
Definition: node.h:632
virtual void prepare()
Called before memory assignment but after depending phases have executed and ended.
Definition: node.h:269
memory_size_type get_available_of_resource(resource_type type) const
Get the amount of the specific resource assigned to this node.
Definition: node.h:138
void _internal_set_available_of_resource(resource_type type, memory_size_type available)
Used internally to assign the available resource to the node.
stream_size_type get_steps()
Used internally for progress indication.
Definition: node.h:397
void set_progress_indicator(progress_indicator_base *pi)
Used internally. Set the progress indicator to use.
Definition: node.h:408
void forward_any(std::string key, any_noncopyable value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
See node::forward.
const std::string & get_phase_name()
Get this node's phase name.
virtual void set_available_memory(memory_size_type availableMemory)
Called by the memory manager to set the amount of memory assigned to this node.
Definition: node.h:233
void push(const item_type &item)
For push nodes, push the next item to this node.
tpie::memory_bucket_ref allocator(size_t i=0)
Return an allocator that counts memory usage within the node.
Definition: node.h:788
size_t buckets() const
Count the number of memory buckets.
Definition: node.h:774
void set_breadcrumb(const std::string &breadcrumb)
Used internally when a pair_factory has a name set.
Definition: node.h:389
Base class of all nodes.
Definition: node.h:78
void register_datastructure_usage(const std::string &name, double priority=1)
Registers a datastructure.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
priority_type get_name_priority()
Get the priority of this node's name.
Definition: node.h:349
memory_size_type get_maximum_memory() const
Get the maximum amount of memory declared by this node.
Definition: node.h:186
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
Virtual superclass for pipelines and subpipelines.
Definition: pipeline.h:110
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:564
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
Definition: node.h:217
Class storing a reference to a memory bucket.
Definition: memory.h:366
progress_indicator_base * get_progress_indicator()
Used internally. Get the progress indicator used.
Definition: node.h:415
T & get_datastructure(const std::string &name)
Returns a previously declared datastructure.
Definition: node.h:738
Pipeline tokens.
Progress indicator base.
void set_maximum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare maximum resource requirements.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
void set_datastructure(const std::string &name, T datastructure)
Sets the value of a previously registered datastructure.
Definition: node.h:722
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:207
void set_datastructure_memory_limits(const std::string &name, memory_size_type min, memory_size_type max=std::numeric_limits< memory_size_type >::max())
Assign memory to a registered datastructure.
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Bucket used for memory counting.
Definition: memory.h:352
any_noncopyable & fetch_any(std::string key)
Fetch piece of auxiliary data as any_noncopyable (the internal representation).
priority_type get_phase_name_priority()
Get the priority of this node's phase name.
Definition: node.h:369
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
virtual void propagate()
Propagate stream metadata.
Definition: node.h:283
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:459
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
PLOT
Options for how to plot this node.
Definition: node.h:85
virtual void end()
End pipeline processing phase.
Definition: node.h:329
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:654
std::unique_ptr< memory_bucket > & bucket(size_t i)
Access a memory bucket.
Definition: node.h:779
void set_memory_fraction(double f)
Set the memory priority of this node.
Definition: node.h:225
void no_forward_through()
Disallow forwards through this node.
void set_resource_fraction(resource_type type, double f)
Set the resource priority of this node.
double get_resource_fraction(resource_type type) const
Get the priority for the specific resource of this node.
Definition: node.h:131
virtual bool can_evacuate()
Overridden by nodes that have data to evacuate.
Definition: node.h:335
virtual void resource_available_changed(resource_type, memory_size_type)
Called by the resource manager to notify the node's available amount of resource has changed...
Definition: node.h:166
memory_size_type get_used_memory() const
Get the amount of memory currently used by this node.
Definition: node.h:240
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
flags< PLOT > get_plot_options() const
Get options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:451
double get_memory_fraction() const
Get the memory priority of this node.
Definition: node.h:193
virtual void refresh() override
Display the indicator.
item_type pull()
For pull nodes, pull the next item from this node.
node()
Default constructor, using a new node_token.
STATE get_state() const
Used internally to check order of method calls.
Definition: node.h:422
bool can_fetch(std::string key)
Find out if there is a piece of auxiliary data forwarded with a given name.
Definition: node.h:589
node_token::id_t get_id() const
Get the internal node ID of this node (mainly for debugging purposes).
Definition: node.h:259
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:157
const std::string & get_name()
Get this node's name.
memory_size_type get_datastructure_memory(const std::string &name)
Returns the memory assigned to a datastructure.
memory_size_type get_minimum_memory() const
Get the minimum amount of memory declared by this node.
Definition: node.h:178