TPIE

2362a60
std_glue.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_STD_GLUE_H__
21 #define __TPIE_PIPELINING_STD_GLUE_H__
22 
23 #include <vector>
24 
25 #include <tpie/pipelining/node.h>
26 #include <tpie/pipelining/pipe_base.h>
27 #include <tpie/pipelining/factory_helpers.h>
28 
29 namespace tpie {
30 
31 namespace pipelining {
32 
33 namespace bits {
34 
35 template <typename dest_t, typename T, typename A>
36 class input_vector_t : public node {
37 public:
38  typedef T item_type;
39 
40  input_vector_t(dest_t dest, const std::vector<T, A> & input) : dest(std::move(dest)), input(input) {
41  add_push_destination(this->dest);
42  }
43 
44  input_vector_t(dest_t dest, std::vector<T, A> && input) = delete;
45 
46  void propagate() override {
47  forward("items", static_cast<stream_size_type>(input.size()));
48  set_steps(input.size());
49  }
50 
51  void go() override {
52  for (auto & i: input) {
53  dest.push(i);
54  step();
55  }
56  }
57 private:
58  dest_t dest;
59  const std::vector<T, A> & input;
60 };
61 
62 template <typename T, typename A>
63 class pull_input_vector_t : public node {
64 public:
65  typedef T item_type;
66 
67  pull_input_vector_t(const std::vector<T, A> & input) : input(input) {}
68 
69  pull_input_vector_t(std::vector<T, A> && input) = delete;
70 
71  void propagate() override {
72  forward("items", static_cast<stream_size_type>(input.size()));
73  }
74 
75  void begin() override {idx=0;};
76  bool can_pull() const {return idx < input.size();}
77  const T & peek() const {return input[idx];}
78  const T & pull() {return input[idx++];}
79 
80 private:
81  size_t idx;
82  const std::vector<T, A> & input;
83 };
84 
85 
86 template <typename T, typename A>
87 class output_vector_t : public node {
88 public:
89  typedef T item_type;
90 
91  output_vector_t(std::vector<T, A> & output) : output(output) {}
92 
93  output_vector_t(std::vector<T, A> && output) = delete;
94 
95  void push(const T & item) {
96  output.push_back(item);
97  }
98 private:
99  std::vector<item_type, A> & output;
100 };
101 
102 
103 template <typename F>
104 class lambda_t {
105 public:
106  template <typename dest_t>
107  class type: public node {
108  public:
109  typedef typename F::argument_type item_type;
110 
111  type(dest_t dest, const F & f): f(f), dest(std::move(dest)) {
112  }
113 
114  void push(const item_type & item) {
115  dest.push(f(item));
116  }
117  private:
118  F f;
119  dest_t dest;
120  };
121 };
122 
123 template <typename F>
125 public:
126  template <typename dest_t>
127  class type: public node {
128  public:
129  typedef typename F::argument_type item_type;
130 
131  type(dest_t dest, const F & f): f(f), dest(std::move(dest)) {
132  }
133 
134  void push(const item_type & item) {
135  typename F::result_type t=f(item);
136  if (t.second) dest.push(t.first);
137  }
138  private:
139  F f;
140  dest_t dest;
141  };
142 };
143 
144 
145 } // namespace bits
146 
152 template<typename T, typename A>
153 inline pipe_begin<tfactory<bits::input_vector_t, Args<T, A>, const std::vector<T, A> &> > input_vector(const std::vector<T, A> & input) {
154  return {input};
155 }
156 template<typename T, typename A>
157 pipe_begin<tfactory<bits::input_vector_t, Args<T, A>, const std::vector<T, A> &> > input_vector(std::vector<T, A> && input) = delete;
158 
164 template<typename T, typename A>
165 inline pullpipe_begin<termfactory<bits::pull_input_vector_t<T, A>, const std::vector<T, A> &> > pull_input_vector(const std::vector<T, A> & input) {
166  return {input};
167 }
168 template<typename T, typename A>
169 pullpipe_begin<termfactory<bits::pull_input_vector_t<T, A>, const std::vector<T, A> &> > pull_input_vector(std::vector<T, A> && input) = delete;
170 
175 template <typename T, typename A>
176 inline pipe_end<termfactory<bits::output_vector_t<T, A>, std::vector<T, A> &> > output_vector(std::vector<T, A> & output) {
177  return termfactory<bits::output_vector_t<T, A>, std::vector<T, A> &>(output);
178 }
179 template <typename T, typename A>
180 pipe_end<termfactory<bits::output_vector_t<T, A>, std::vector<T, A> &> > output_vector(std::vector<T, A> && output) = delete;
181 
188 template <typename F>
190  return tempfactory<bits::lambda_t<F>, F>(f);
191 }
192 
202 template <typename F>
205 }
206 
207 
208 } // namespace pipelining
209 
210 } // namespace tpie
211 
212 #endif
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining nodes that pushes the contents of the given file stream to the next node in the pipeline...
Definition: file_stream.h:379
void propagate() override
Propagate stream metadata.
Definition: std_glue.h:46
void propagate() override
Propagate stream metadata.
Definition: std_glue.h:71
Base class of all nodes.
Definition: node.h:78
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: std_glue.h:51
Node factory for variadic argument templated generators.
pipe_middle< tempfactory< bits::exclude_lambda_t< F >, F > > exclude_lambda(const F &f)
Pipelining nodes that applies to given functor to items in the stream.
Definition: std_glue.h:203
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:564
void begin() override
Begin pipeline processing phase.
Definition: std_glue.h:75
Node factory for variadic argument terminators.
pullpipe_begin< termfactory< bits::pull_input_vector_t< T, A >, const std::vector< T, A > & > > pull_input_vector(const std::vector< T, A > &input)
Pipelining nodes that pushes the contents of the given vector to the next node in the pipeline...
Definition: std_glue.h:165
pipe_middle< tempfactory< bits::lambda_t< F >, F > > lambda(const F &f)
Pipelining nodes that applies to given functor to items in the stream.
Definition: std_glue.h:189
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:654
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:241
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Definition: file_stream.h:431
pipe_end< termfactory< bits::output_vector_t< T, A >, std::vector< T, A > & > > output_vector(std::vector< T, A > &output)
Pipelining node that pushes items to the given vector.
Definition: std_glue.h:176
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
pipe_begin< tfactory< bits::input_vector_t, Args< T, A >, const std::vector< T, A > & > > input_vector(const std::vector< T, A > &input)
Pipelining nodes that pushes the contents of the given vector to the next node in the pipeline...
Definition: std_glue.h:153