20 #ifndef __TPIE_PIPELINING_NODE_H__
21 #define __TPIE_PIPELINING_NODE_H__
23 #include <tpie/pipelining/exception.h>
27 #include <tpie/pipelining/priority_type.h>
29 #include <tpie/pipelining/node_name.h>
30 #include <tpie/pipelining/node_traits.h>
31 #include <tpie/flags.h>
37 namespace pipelining {
47 virtual void refresh()
override;
53 memory_size_type minimum = 0;
54 memory_size_type maximum = std::numeric_limits<memory_size_type>::max();
55 double fraction = 0.0;
57 memory_size_type available = 0;
64 priority_type namePriority = PRIORITY_NO_NAME;
66 std::string phaseName;
67 priority_type phaseNamePriority = PRIORITY_NO_NAME;
69 stream_size_type stepsTotal = 0;
80 typedef boost::optional<any_noncopyable &> maybeany_t;
86 PLOT_SIMPLIFIED_HIDE=1,
99 STATE_AFTER_PROPAGATE,
117 return m_parameters.resource_parameters[type].minimum;
125 return m_parameters.resource_parameters[type].maximum;
132 return m_parameters.resource_parameters[type].fraction;
139 return m_parameters.resource_parameters[type].available;
241 memory_size_type ans=0;
242 for (
const auto & p: m_buckets)
243 if (p) ans += p->count;
252 return token.get_map();
302 virtual bool is_go_free()
const {
// Base-class default: unconditionally reports false. The function is
// virtual, so subclasses can supply a different answer.
return false;}
310 log_warning() <<
"node subclass " <<
typeid(*this).name()
311 <<
" is not an initiator node" << std::endl;
350 return m_parameters.namePriority;
363 void set_name(
const std::string & name, priority_type priority = PRIORITY_USER);
370 return m_parameters.phaseNamePriority;
383 void set_phase_name(
const std::string & name, priority_type priority = PRIORITY_USER);
390 m_parameters.name = m_parameters.name.empty() ? breadcrumb : (breadcrumb +
" | " + m_parameters.name);
398 return m_parameters.stepsTotal;
401 stream_size_type get_steps_left() {
437 return m_resourceBeingAssigned;
444 m_resourceBeingAssigned = type;
452 return m_plotOptions;
460 m_plotOptions = options;
468 #pragma warning( push )
469 #pragma warning( disable : 4355 )
480 node & operator=(
const node & other) =
delete;
494 #pragma warning( pop )
563 template <
typename T>
564 void forward(std::string key, T value, memory_size_type k = std::numeric_limits<memory_size_type>::max()) {
577 void add_forwarded_data(std::string key, node_token::id_t from_node);
582 maybeany_t get_forwarded_data_maybe(std::string key);
609 template <
typename T>
613 return any_cast<T>(item);
615 std::stringstream ss;
616 ss <<
"Trying to fetch key '" << key <<
"' of type "
617 <<
typeid(T).name() <<
" but forwarded data was of type "
618 << item.type().name() <<
". Message was: " << m.what();
648 void step_overflow();
654 void step(stream_size_type steps = 1) {
660 if (m_stepsLeft < steps)
663 m_stepsLeft -= steps;
708 void set_datastructure_memory_limits(
const std::string & name, memory_size_type min, memory_size_type max=std::numeric_limits<memory_size_type>::max());
723 bits::node_map::datastructuremap_t & structures =
get_node_map()->find_authority()->get_datastructures();
724 bits::node_map::datastructuremap_t::iterator i = structures.find(name);
726 if(i == structures.end())
727 throw tpie::exception(
"attempted to set non-registered datastructure");
729 i->second.second = move_if_movable<T>(datastructure);
739 bits::node_map::datastructuremap_t & structures =
get_node_map()->find_authority()->get_datastructures();
740 bits::node_map::datastructuremap_t::iterator i = structures.find(name);
742 if(i == structures.end())
743 throw tpie::exception(
"attempted to get non-registered datastructure");
745 return any_cast<T>(i->second.second);
748 void unset_datastructure(
const std::string & name) {
749 bits::node_map::datastructuremap_t & structures =
get_node_map()->find_authority()->get_datastructures();
750 bits::node_map::datastructuremap_t::iterator i = structures.find(name);
752 if(i == structures.end())
return;
753 i->second.second.reset();
757 struct datastructure_info_t {
758 datastructure_info_t() : min(0), max(std::numeric_limits<memory_size_type>::max()) {}
759 memory_size_type min;
760 memory_size_type max;
764 typedef std::map<std::string, datastructure_info_t> datastructuremap_t;
766 const datastructuremap_t & get_datastructures()
const {
767 return m_datastructures;
774 size_t buckets()
const {
// Reports the current length of m_buckets. Entries may be empty pointers:
// bucket(i) grows the vector with resize(), so this counts slots rather
// than allocated memory_bucket objects.
return m_buckets.size();}
779 std::unique_ptr<memory_bucket> &
bucket(
size_t i) {
780 if (m_buckets.size() <= i) m_buckets.resize(i+1);
792 friend class bits::memory_runtime;
794 friend class bits::datastructure_runtime;
803 std::vector<std::unique_ptr<memory_bucket> > m_buckets;
805 std::map<std::string, any_noncopyable> m_forwardedFromHere;
806 std::map<std::string, node_token::id_t> m_forwardedToHere;
808 datastructuremap_t m_datastructures;
809 stream_size_type m_stepsLeft;
812 resource_type m_resourceBeingAssigned = NO_RESOURCE;
813 std::unique_ptr<progress_indicator_base> m_piProxy;
814 flags<PLOT> m_plotOptions;
827 #endif // __TPIE_PIPELINING_NODE_H__
T & fetch(std::string key)
Fetch piece of auxiliary data, expecting a given value type.
memory_size_type get_available_memory() const
Get the amount of memory assigned to this node.
bool can_pull() const
For pull nodes, return true if there are more items to be pulled.
void set_resource_being_assigned(resource_type type)
Used internally to check order of method calls.
maybeany_t fetch_maybe(std::string key)
Fetch piece of auxiliary data as any_noncopyable (the internal representation) wrapped in a boost::op...
memory_size_type get_minimum_resource_usage(resource_type type) const
Get the minimum amount of the resource declared by this node.
virtual void begin()
Begin pipeline processing phase.
Defines all types of managed resources.
void set_phase_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's phase name.
memory_size_type get_maximum_resource_usage(resource_type type) const
Get the maximum amount of the resource declared by this node.
The base class for indicating the progress of some task.
Null-object progress indicator.
STATE
Used internally to check order of method calls.
Predeclarations of some pipelining classes.
void add_memory_share_dependency(const node_token &dest)
Called by implementers to declare a node memory share dependency, that is, a requirement that another...
virtual void evacuate()
Overridden by nodes that have data to evacuate.
void unused(const T &x)
Declare that a variable is unused on purpose.
resource_type get_resource_being_assigned() const
Used internally to check order of method calls.
virtual ~node()
Virtual dtor.
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one...
Base class of all pipelining factories.
virtual void go()
For initiator nodes, execute this phase by pushing all items to be pushed.
void set_state(STATE s)
Used internally to check order of method calls.
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
virtual void prepare()
Called before memory assignment but after depending phases have executed and ended.
memory_size_type get_available_of_resource(resource_type type) const
Get the amount of the specific resource assigned to this node.
void _internal_set_available_of_resource(resource_type type, memory_size_type available)
Used internally to assign the available resource to the node.
stream_size_type get_steps()
Used internally for progress indication.
void set_progress_indicator(progress_indicator_base *pi)
Used internally. Set the progress indicator to use.
void forward_any(std::string key, any_noncopyable value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
See node::forward.
const std::string & get_phase_name()
Get this node's phase name.
virtual void set_available_memory(memory_size_type availableMemory)
Called by the memory manager to set the amount of memory assigned to this node.
void push(const item_type &item)
For push nodes, push the next item to this node.
tpie::memory_bucket_ref allocator(size_t i=0)
Return an allocator that counts memory usage within the node.
size_t buckets() const
Count the number of memory buckets.
void set_breadcrumb(const std::string &breadcrumb)
Used internally when a pair_factory has a name set.
void register_datastructure_usage(const std::string &name, double priority=1)
Registers a datastructure.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
priority_type get_name_priority()
Get the priority of this node's name.
memory_size_type get_maximum_memory() const
Get the maximum amount of memory declared by this node.
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
Virtual superclass for pipelines and subpipelines.
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
Class storing a reference to a memory bucket.
progress_indicator_base * get_progress_indicator()
Used internally. Get the progress indicator used.
T & get_datastructure(const std::string &name)
Returns a previously declared datastructure.
void set_maximum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare maximum resource requirements.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
void set_datastructure(const std::string &name, T datastructure)
Stores a value for a previously registered datastructure.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
void set_datastructure_memory_limits(const std::string &name, memory_size_type min, memory_size_type max=std::numeric_limits< memory_size_type >::max())
Assign memory to a registered datastructure.
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Bucket used for memory counting.
any_noncopyable & fetch_any(std::string key)
Fetch piece of auxiliary data as any_noncopyable (the internal representation).
priority_type get_phase_name_priority()
Get the priority of this node's phase name.
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create item type defining identity pipe node.
virtual void propagate()
Propagate stream metadata.
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
PLOT
Options for how to plot this node.
virtual void end()
End pipeline processing phase.
void step(stream_size_type steps=1)
Step the progress indicator.
std::unique_ptr< memory_bucket > & bucket(size_t i)
Access a memory bucket.
void set_memory_fraction(double f)
Set the memory priority of this node.
void no_forward_through()
Disallow forwards through this node.
void set_resource_fraction(resource_type type, double f)
Set the resource priority of this node.
double get_resource_fraction(resource_type type) const
Get the priority for the specific resource of this node.
virtual bool can_evacuate()
Overridden by nodes that have data to evacuate.
virtual void resource_available_changed(resource_type, memory_size_type)
Called by the resource manager to notify the node's available amount of resource has changed...
memory_size_type get_used_memory() const
Get the amount of memory currently used by this node.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
flags< PLOT > get_plot_options() const
Get options specified for plot(), as a combination of node::PLOT values.
double get_memory_fraction() const
Get the memory priority of this node.
virtual void refresh() override
Display the indicator.
item_type pull()
For pull nodes, pull the next item from this node.
node()
Default constructor, using a new node_token.
STATE get_state() const
Used internally to check order of method calls.
bool can_fetch(std::string key)
Find out if there is a piece of auxiliary data forwarded with a given name.
node_token::id_t get_id() const
Get the internal node ID of this node (mainly for debugging purposes).
logstream & log_warning()
Return logstream for writing warning log messages.
const std::string & get_name()
Get this node's name.
memory_size_type get_datastructure_memory(const std::string &name)
Returns the memory assigned to a datastructure.
memory_size_type get_minimum_memory() const
Get the minimum amount of memory declared by this node.