TPIE

2362a60
helpers.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_HELPERS_H__
21 #define __TPIE_PIPELINING_HELPERS_H__
22 
23 #include <iostream>
24 #include <tpie/pipelining/node.h>
25 #include <tpie/pipelining/pipe_base.h>
26 #include <tpie/pipelining/factory_helpers.h>
27 #include <tpie/memory.h>
28 #include <tpie/tpie_assert.h>
29 
30 namespace tpie {
31 
32 namespace pipelining {
33 
34 namespace bits {
35 
36 template <typename dest_t>
37 class ostream_logger_t : public node {
38 public:
39  typedef typename push_type<dest_t>::type item_type;
40 
41  inline ostream_logger_t(dest_t dest, std::ostream & log) : dest(std::move(dest)), log(log), begun(false), ended(false) {
42  add_push_destination(this->dest);
43  set_name("Log", PRIORITY_INSIGNIFICANT);
44  }
45  virtual void begin() override {
46  node::begin();
47  begun = true;
48  }
49  virtual void end() override {
50  node::end();
51  ended = true;
52  }
53  inline void push(const item_type & item) {
54  if (!begun) {
55  log << "WARNING: push() called before begin(). Calling begin on rest of pipeline." << std::endl;
56  begin();
57  }
58  if (ended) {
59  log << "WARNING: push() called after end()." << std::endl;
60  ended = false;
61  }
62  log << "pushing " << item << std::endl;
63  dest.push(item);
64  }
65 private:
66  dest_t dest;
67  std::ostream & log;
68  bool begun;
69  bool ended;
70 };
71 
72 template <typename source_t>
73 class pull_peek_t : public node {
74 public:
75  typedef typename pull_type<source_t>::type item_type;
76 
77  pull_peek_t(source_t source) : source(std::move(source)) {
78  add_pull_source(this->source);
79  set_plot_options(PLOT_SIMPLIFIED_HIDE);
80  }
81 
82  virtual void begin() override {
83  could_pull = source.can_pull();
84  if (could_pull) item=source.pull();
85  }
86 
87  item_type pull() {
88  item_type i = item;
89  could_pull = source.can_pull();
90  if (could_pull) item=source.pull();
91  return i;
92  }
93 
94  const item_type & peek() const {
95  return item;
96  }
97 
98  bool can_pull() const {
99  return could_pull;
100  }
101 
102 private:
103  item_type item;
104  bool could_pull;
105  source_t source;
106 };
107 
108 template <typename T>
109 class dummydest_t : public node {
110 public:
111  dummydest_t() : buffer(std::make_shared<T>()) {}
112 
113  typedef T item_type;
114  std::shared_ptr<T> buffer;
115  inline void push(const T & el) {
116  *buffer = el;
117  }
118  inline T pull() {
119  return *buffer;
120  }
121 };
122 
123 template <typename fact2_t>
124 class fork_t {
125 public:
126  typedef typename fact2_t::constructed_type dest2_t;
127 
128  template <typename dest_t>
129  class type : public node {
130  public:
131  typedef typename push_type<dest_t>::type item_type;
132 
133  type(dest_t dest, fact2_t fact2) : dest(std::move(dest)), dest2(fact2.construct()) {
134  add_push_destination(this->dest);
135  add_push_destination(dest2);
136  }
137 
138  inline void push(const item_type & item) {
139  dest.push(item);
140  dest2.push(item);
141  }
142 
143  private:
144  dest_t dest;
145  dest2_t dest2;
146  };
147 };
148 
149 
150 template <typename source_t, typename dest_fact_t>
151 class pull_fork_t: public node {
152 public:
153  typedef typename pull_type<source_t>::type item_type;
154 
155  pull_fork_t(source_t source, dest_fact_t dest_fact)
156  : dest(dest_fact.construct())
157  , source(std::move(source)) {
158  add_pull_source(this->source);
159  add_push_destination(dest);
160  }
161 
162  bool can_pull() {return source.can_pull();}
163 
164  item_type pull() {
165  item_type i=source.pull();
166  dest.push(i);
167  return i;
168  }
169 
170 private:
171  typename dest_fact_t::constructed_type dest;
172  source_t source;
173 };
174 
175 
176 template <typename T>
177 class null_sink_t: public node {
178 public:
179  typedef T item_type;
180 
181  void push(const T &) {}
182 };
183 
184 
185 template <typename T>
186 class zero_source_t: public node {
187 public:
188  typedef T item_type;
189 
190  T pull() {tpie_unreachable();}
191  bool can_pull() {return false;}
192 };
193 
194 
195 template <typename IT>
197  IT i;
198  IT till;
199 public:
200  typedef typename IT::value_type item_type;
201  pull_input_iterator_t(IT from, IT to)
202  : i(from)
203  , till(to)
204  {
205  }
206 
207  bool can_pull() {
208  return i != till;
209  }
210 
211  item_type pull() {
212  return *i++;
213  }
214 };
215 
216 template <typename IT>
218 public:
219  template <typename dest_t>
220  class type : public node {
221  IT i;
222  IT till;
223  dest_t dest;
224  public:
225  type(dest_t dest, IT from, IT to)
226  : i(from)
227  , till(to)
228  , dest(std::move(dest))
229  {
230  add_push_destination(dest);
231  }
232 
233  virtual void go() override {
234  while (i != till) {
235  dest.push(*i);
236  ++i;
237  }
238  }
239  };
240 };
241 
242 template <typename Iterator, typename Item = void>
244 
245 template <typename Iterator>
246 class push_output_iterator_t<Iterator, void> : public node {
247  Iterator i;
248 public:
249  typedef typename Iterator::value_type item_type;
250  push_output_iterator_t(Iterator to)
251  : i(to)
252  {
253  }
254 
255  void push(const item_type & item) {
256  *i = item;
257  ++i;
258  }
259 };
260 
261 template <typename Iterator, typename Item>
262 class push_output_iterator_t : public node {
263  Iterator i;
264 public:
265  typedef Item item_type;
266  push_output_iterator_t(Iterator to)
267  : i(to)
268  {
269  }
270 
271  void push(const item_type & item) {
272  *i = item;
273  ++i;
274  }
275 };
276 
277 template <typename IT>
279 public:
280  template <typename dest_t>
281  class type : public node {
282  IT i;
283  dest_t dest;
284  public:
285  type(dest_t dest, IT to)
286  : i(to)
287  , dest(std::move(dest))
288  {
289  add_pull_source(dest);
290  }
291 
292  virtual void go() override {
293  while (dest.can_pull()) {
294  *i = dest.pull();
295  ++i;
296  }
297  }
298  };
299 };
300 
301 template <typename F>
302 class preparer_t {
303 public:
304  template <typename dest_t>
305  class type: public node {
306  private:
307  F functor;
308  dest_t dest;
309  public:
310  typedef typename push_type<dest_t>::type item_type;
311  type(dest_t dest, const F & functor): functor(functor), dest(std::move(dest)) {
312  add_push_destination(this->dest);
313  }
314 
315  void prepare() override {
316  functor(*static_cast<node*>(this));
317  };
318 
319  void push(const item_type & item) {dest.push(item);}
320  };
321 };
322 
323 
324 template <typename F>
326 public:
327  template <typename dest_t>
328  class type: public node {
329  private:
330  F functor;
331  dest_t dest;
332  public:
333  typedef typename push_type<dest_t>::type item_type;
334  type(dest_t dest, const F & functor): functor(functor), dest(std::move(dest)) {
335  add_push_destination(this->dest);
336  }
337 
338  void propagate() override {
339  functor(*static_cast<node*>(this));
340  };
341 
342  void push(const item_type & item) {dest.push(item);}
343  };
344 };
345 
346 template <typename src_fact_t>
347 struct zip_t {
348  typedef typename src_fact_t::constructed_type src_t;
349  template <typename dest_t>
350  class type: public node {
351  public:
352  typedef typename push_type<dest_t>::type::first_type item_type;
353 
354  type(dest_t dest, src_fact_t src_fact)
355  : src(src_fact.construct()), dest(std::move(dest)) {
356  add_push_destination(this->dest);
357  add_pull_source(src);
358  }
359 
360  void push(const item_type & item) {
361  tp_assert(src.can_pull(), "We should be able to pull");
362  dest.push(std::make_pair(item, src.pull()));
363  }
364  private:
365  src_t src;
366  dest_t dest;
367  };
368 };
369 
370 template <typename fact2_t>
371 struct unzip_t {
372  typedef typename fact2_t::constructed_type dest2_t;
373 
374  template <typename dest1_t>
375  class type: public node {
376  public:
377  typedef typename push_type<dest1_t>::type first_type;
378  typedef typename push_type<dest2_t>::type second_type;
379  typedef std::pair<first_type, second_type> item_type;
380 
381  type(dest1_t dest1, fact2_t fact2) : dest1(std::move(dest1)), dest2(fact2.construct()) {
382  add_push_destination(this->dest1);
383  add_push_destination(dest2);
384  }
385 
386  void push(const item_type & item) {
387  dest2.push(item.second);
388  dest1.push(item.first);
389  }
390  private:
391  dest1_t dest1;
392  dest2_t dest2;
393  };
394 };
395 
396 template <typename T>
397 class item_type_t {
398 public:
399  template <typename dest_t>
400  class type: public node {
401  private:
402  dest_t dest;
403  public:
404  typedef T item_type;
405  type(dest_t dest): dest(std::move(dest)) {}
406  void push(const item_type & item) {dest.push(item);}
407  };
408 };
409 
410 template <typename fact_t>
412 public:
413  template <typename dest_t>
414  class type: public node {
415  public:
416  typedef typename fact_t::constructed_type source_t;
417  typedef typename push_type<dest_t>::type item_type;
418 
419  type(dest_t dest, fact_t fact)
420  : dest(std::move(dest)), src(fact.construct()) {
421  add_push_destination(dest);
422  add_pull_source(src);
423  }
424 
425  void propagate() override {
426  size_t size = fetch<stream_size_type>("items");
427  set_steps(size);
428  }
429 
430  void go() override {
431  while (src.can_pull()) {
432  dest.push(src.pull());
433  step();
434  }
435  }
436 
437  private:
438  dest_t dest;
439  source_t src;
440  };
441 };
442 
443 template <typename dest_t, typename equal_t>
444 class unique_t : public node {
445 public:
446  typedef typename push_type<dest_t>::type item_type;
447 
448  unique_t(dest_t dest, equal_t equal)
449  : equal(equal), dest(std::move(dest)) {}
450 
451  void begin() override {
452  first = true;
453  }
454 
455  void push(const item_type & item) {
456  if (!first && equal(prev, item))
457  return;
458  first = false;
459  dest.push(item);
460  prev = item;
461  }
462 
463 private:
464  bool first;
465  equal_t equal;
466  item_type prev;
467  dest_t dest;
468 };
469 
470 } // namespace bits
471 
476 inline pipe_middle<factory<bits::ostream_logger_t, std::ostream &> >
478  return {std::cout};
479 }
480 
485 
486 
493 template <typename fact_t>
496  return {std::move(to.factory)};
497 }
498 
505 template <typename dest_fact_t>
506 pullpipe_middle<tfactory<bits::pull_fork_t, Args<dest_fact_t>, dest_fact_t> >
507 pull_fork(dest_fact_t dest_fact) {
508  return {std::move(dest_fact)};
509 }
510 
517 template <typename fact_t>
518 pipe_middle<tempfactory<bits::unzip_t<fact_t>, fact_t> >
520  return {std::move(to.factory)};
521 }
522 
530 template <typename fact_t>
531 pipe_middle<tempfactory<bits::zip_t<fact_t>, fact_t> >
533  return {std::move(from.factory)};
534 }
535 
541 template <typename T>
542 inline pipe_end<termfactory<bits::null_sink_t<T> > >
544 
550 template <typename T>
551 inline pullpipe_begin<termfactory<bits::zero_source_t<T> > >
553 
554 
555 template <template <typename dest_t> class Fact, typename... T>
556 pipe_begin<factory<Fact, T...> > make_pipe_begin(T... t) {
557  return {t...};
558 }
559 
560 template <template <typename dest_t> class Fact, typename... T>
561 pipe_middle<factory<Fact, T...> > make_pipe_middle(T... t) {
562  return {t...};
563 }
564 
565 template <typename Fact, typename... T>
566 pipe_end<termfactory<Fact, T...> > make_pipe_end(T ... t) {
567  return {t...};
568 }
569 
575 template <typename IT>
577  return termfactory<bits::pull_input_iterator_t<IT>, IT, IT>(begin, end);
578 }
579 
586 template <typename IT>
588  return tempfactory<bits::push_input_iterator_t<IT>, IT, IT>(begin, end);
589 }
590 
597 template <typename IT>
600 }
601 
609 template <typename Item, typename IT>
612 }
613 
620 template <typename IT>
623 }
624 
631 template <typename F>
633  return tempfactory<bits::preparer_t<F>, F>(functor);
634 }
635 
642 template <typename F>
644  return tempfactory<bits::propagater_t<F>, F>(functor);
645 }
646 
653 template <typename T>
656 }
657 
663 template <typename fact_t>
664 pipe_begin<tempfactory<bits::pull_source_t<fact_t>, fact_t> >
666  return {std::move(from.factory)};
667 }
668 
675 template <typename equal_t>
677  return {equal};
678 }
679 
680 }
681 
682 }
683 
684 #endif
Defines the tp_assert macro.
pipe_end< termfactory< bits::push_output_iterator_t< IT, Item >, IT > > typed_push_output_iterator(IT to)
A node that writes its given items to the destination pointed to by the given iterator.
Definition: helpers.h:610
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:299
Memory management subsystem.
pipe_middle< tempfactory< bits::preparer_t< F >, F > > preparer(const F &functor)
Create preparer callback identity pipe node.
Definition: helpers.h:632
pipe_end< termfactory< bits::push_output_iterator_t< IT >, IT > > push_output_iterator(IT to)
A node that writes its given items to the destination pointed to by the given iterator.
Definition: helpers.h:598
pullpipe_end< tempfactory< bits::pull_output_iterator_t< IT >, IT > > pull_output_iterator(IT to)
A pull-pipe node that writes its given items to the destination pointed to by the given iterator...
Definition: helpers.h:621
pipe_middle< tfactory< bits::unique_t, Args< equal_t >, equal_t > > unique(equal_t equal)
Filter consecutive duplicates out.
Definition: helpers.h:676
void propagate() override
Propagate stream metadata.
Definition: helpers.h:425
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
pipe_middle< tempfactory< bits::propagater_t< F >, F > > propagater(const F &functor)
Create propagate callback identity pipe node.
Definition: helpers.h:643
virtual void begin() override
Begin pipeline processing phase.
Definition: helpers.h:82
pullpipe_middle< factory< bits::pull_peek_t > > pull_peek
A node that allows peeking at the next item in the pipeline.
Definition: helpers.h:484
Base class of all nodes.
Definition: node.h:78
pipe_middle< tempfactory< bits::zip_t< fact_t >, fact_t > > zip(pullpipe_begin< fact_t > from)
Create a zip pipe node.
Definition: helpers.h:532
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:152
void prepare() override
Called before memory assignment but after depending phases have executed and ended.
Definition: helpers.h:315
Node factory for variadic argument templated generators.
virtual void end() override
End pipeline processing phase.
Definition: helpers.h:49
void propagate() override
Propagate stream metadata.
Definition: helpers.h:338
virtual void begin() override
Begin pipeline processing phase.
Definition: helpers.h:45
void begin() override
Begin pipeline processing phase.
Definition: helpers.h:451
pipe_end< termfactory< bits::null_sink_t< T > > > null_sink()
Create a dummy end pipe node.
Definition: helpers.h:543
pipe_middle< tempfactory< bits::unzip_t< fact_t >, fact_t > > unzip(pipe_end< fact_t > to)
Create unzip pipe node.
Definition: helpers.h:519
pullpipe_middle< tfactory< bits::pull_fork_t, Args< dest_fact_t >, dest_fact_t > > pull_fork(dest_fact_t dest_fact)
Create a pulling fork pipe node.
Definition: helpers.h:507
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Node factory for variadic argument terminators.
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:459
pipe_begin< tempfactory< bits::push_input_iterator_t< IT >, IT, IT > > push_input_iterator(IT begin, IT end)
A pipelining node that pushes the items in the range given by two iterators.
Definition: helpers.h:587
virtual void end()
End pipeline processing phase.
Definition: node.h:329
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: helpers.h:430
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:654
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: helpers.h:292
pipe_middle< tempfactory< bits::fork_t< fact_t >, fact_t > > fork(pipe_end< fact_t > to)
Create a fork pipe node.
Definition: helpers.h:495
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:241
pullpipe_begin< termfactory< bits::zero_source_t< T > > > zero_source()
Create a dummy pull begin pipe node.
Definition: helpers.h:552
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
pipe_begin< tempfactory< bits::pull_source_t< fact_t >, fact_t > > pull_source(pullpipe_begin< fact_t > from)
A node that pulls items from source and push them into dest.
Definition: helpers.h:665
pipe_middle< factory< bits::ostream_logger_t, std::ostream & > > cout_logger()
A pipelining node that writes items to standard out and then pushes them to the next node...
Definition: helpers.h:477
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: helpers.h:233
pullpipe_begin< termfactory< bits::pull_input_iterator_t< IT >, IT, IT > > pull_input_iterator(IT begin, IT end)
A pull-pipe that returns items in the range given by two iterators.
Definition: helpers.h:576