TPIE

2362a60
stream.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_STREAM_H
21 #define TPIE_STREAM_H
22 
23 #include <tpie/stream_old.h>
24 #include <tpie/compressed/stream.h>
25 
26 namespace tpie {
27 
28 namespace ami {
29 
30 template <typename T>
31 class stream {
32 public:
33  typedef T item_type;
34 
35  stream(compression_flags compressionFlags=compression_none)
36  : m_status(STREAM_STATUS_INVALID)
37  {
38  try {
39  m_stream.open(m_temp, access_read_write,
40  0, access_sequential, compressionFlags);
41  } catch (const stream_exception & e) {
42  log_fatal() << "Open failed: " << e.what() << std::endl;
43  return;
44  }
45  m_status = STREAM_STATUS_VALID;
46  }
47 
48  stream(const std::string & fileName,
49  stream_type st=READ_WRITE_STREAM,
50  compression_flags compressionFlags=compression_none)
51  : m_temp(fileName, true)
52  , m_status(STREAM_STATUS_INVALID)
53  {
54  try {
55  m_stream.open(m_temp, st == READ_STREAM ? access_read : access_read_write,
56  0, access_sequential, compressionFlags);
57  if (st == APPEND_STREAM) m_stream.seek(0, file_stream_base::end);
58  } catch (const stream_exception & e) {
59  log_fatal() << "Open failed: " << e.what() << std::endl;
60  return;
61  }
62  m_status = STREAM_STATUS_VALID;
63  }
64 
65  err new_substream(stream_type /*st*/,
66  stream_offset_type /*sub_begin*/,
67  stream_offset_type /*sub_end*/,
68  stream<T> ** /*sub_stream*/)
69  {
70  return BTE_ERROR;
71  }
72 
73  stream_status status() const {
74  return m_status;
75  }
76 
77  bool is_valid() const {
78  return m_status == STREAM_STATUS_VALID;
79  }
80 
81  bool operator!() const {
82  return !is_valid();
83  }
84 
85  err read_item(T **elt) {
86  if (!m_stream.can_read())
87  return END_OF_STREAM;
88 
89  *elt = &(const_cast<T &>(m_stream.read()));
90  return NO_ERROR;
91  }
92 
93  err write_item(const T &elt) {
94  try {
95  m_stream.write(elt);
96  } catch (const stream_exception & e) {
97  log_warning() << "write_item failed: " << e.what() << std::endl;
98  return BTE_ERROR;
99  }
100  return NO_ERROR;
101  }
102 
103  err read_array(T *mm_space, stream_offset_type *len) {
104  size_type l=(size_t)*len;
105  err e = read_array(mm_space, l);
106  *len = l;
107  return e;
108  }
109 
110  err read_array(T *mm_space, memory_size_type & len) {
111  size_type l = static_cast<size_type>(std::min(
112  static_cast<stream_size_type>(len),
113  static_cast<stream_size_type>(m_stream.size() - m_stream.offset())));
114  m_stream.read(mm_space, mm_space+l);
115  return (l == len)?NO_ERROR:END_OF_STREAM;
116  }
117 
118  err write_array(const T *mm_space, memory_size_type len) {
119  try {
120  m_stream.write(mm_space, mm_space+len);
121  } catch (const stream_exception & e) {
122  log_warning() << "write_item failed: " << e.what() << std::endl;
123  return BTE_ERROR;
124  }
125  return NO_ERROR;
126  }
127 
128  stream_offset_type stream_len(void) const {
129  return m_stream.size();
130  }
131 
132  std::string name() const {
133  return m_stream.path();
134  }
135 
136  err seek(stream_offset_type offset) {
137  try {
138  m_stream.seek(offset);
139  } catch(const stream_exception &e) {
140  TP_LOG_WARNING_ID("BTE error - seek failed: " << e.what());
141  tpie::unused(e);
142  return BTE_ERROR;
143  }
144  return NO_ERROR;
145  }
146 
147  stream_offset_type tell() const {
148  return m_stream.offset();
149  }
150 
151  err truncate(stream_offset_type offset) {
152  try {
153  m_stream.truncate(offset);
154  } catch(const stream_exception & e) {
155  TP_LOG_WARNING_ID("BTE error - truncate failed: " << e.what());
156  tpie::unused(e);
157  return BTE_ERROR;
158  }
159  return NO_ERROR;
160  }
161 
162  err main_memory_usage(size_type *usage,
163  stream_usage usage_type) const
164  {
165  switch (usage_type) {
167  *usage = sizeof(*this) + file_stream<T>::memory_usage(0.0);
168  return NO_ERROR;
172  *usage = memory_usage(1);
173  return NO_ERROR;
174  case STREAM_USAGE_BUFFER:
175  *usage = file_stream<T>::memory_usage(block_factor())
176  - file_stream<T>::memory_usage(0.0);
177  return NO_ERROR;
178  }
179  return BTE_ERROR;
180  }
181 
182  static memory_size_type memory_usage(memory_size_type count) {
183  return count*(file_stream<T>::memory_usage() + sizeof(stream<T>));
184  }
185 
186  size_t available_streams(void) {
187  return get_file_manager().available();
188  }
189 
190  memory_size_type chunk_size(void) const {
191  return get_block_size() / sizeof(T);
192  }
193 
194  void persist(persistence p) {
195  m_temp.set_persistent(p == PERSIST_PERSISTENT);
196  }
197 
198  persistence persist() const {
199  return m_temp.is_persistent() ? PERSIST_PERSISTENT : PERSIST_DELETE;
200  }
201 
202  std::string& sprint() {
203  static std::string buf;
204  std::stringstream ss;
205  ss << "STREAM " << name() << " " << static_cast<long>(stream_len());
206  ss >> buf;
207  return buf;
208  }
209 
210  file_stream<T>& underlying_stream() {
211  return m_stream;
212  }
213 
214 private:
215  stream(const stream<T>& other);
216  stream<T>& operator=(const stream<T>& other);
217 
218  temp_file m_temp;
219  file_stream<T> m_stream;
220 
221  stream_status m_status;
222 
223  static inline float block_factor() {
224 #ifndef STREAM_UFS_BLOCK_FACTOR
225  return 1.0;
226 #else
227 # ifdef WIN32
228  return static_cast<float>(STREAM_UFS_BLOCK_FACTOR)/32;
229 # else
230  return static_cast<float>(STREAM_UFS_BLOCK_FACTOR)/512;
231 # endif
232 #endif
233  }
234 };
235 
236 } // namespace ami
237 
238 } // namespace tpie
239 
240 #endif // TPIE_STREAM_H
Sequential access is intended.
Definition: cache_hint.h:36
Amount currently in use.
Definition: stream_usage.h:34
memory_size_type get_block_size()
Get the TPIE block size.
Maximum additional amount used by each substream created.
Definition: stream_usage.h:38
PERSIST_DELETE
Delete the stream from the disk when it is destructed.
Definition: persist.h:39
Max amount that will ever be used.
Definition: stream_usage.h:36
void unused(const T &x)
Declare that a variable is unused on purpose.
Definition: util.h:42
No written blocks should be compressed.
Definition: scheme.h:37
No error occurred.
Definition: err.h:47
Open a file for reading.
Definition: access_type.h:31
Overhead of the object without the buffer.
Definition: stream_usage.h:30
Compressed stream public API.
void set_persistent(bool p)
Set persistence.
Definition: tempname.h:230
stream_status
AMI stream status.
Definition: stream_old.h:61
file_manager & get_file_manager()
Return a reference to the file manager.
Class representing a reference to a temporary file.
Definition: tempname.h:202
size_t available() const noexcept
Return the amount of the resource still available to be assigned.
stream_type
AMI stream types passed to constructors.
Definition: stream_old.h:53
logstream & log_fatal()
Return logstream for writing fatal log messages.
Definition: tpie_log.h:142
AMI streams.
PERSIST_PERSISTENT
Do not delete the stream from the disk when it is destructed.
Definition: persist.h:41
bool is_persistent() const
Definition: tempname.h:222
Compressed stream.
Definition: predeclare.h:46
compression_flags
Possible values for the compressionFlags parameter to stream::open.
Definition: scheme.h:33
err
Legacy TPIE error codes.
Definition: err.h:45
An error occurred at the BTE level.
Definition: err.h:63
stream_usage
Definition: stream_usage.h:28
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:157
An attempt was made to read past the end of a stream or write past the end of a substream.
Definition: err.h:52
Max amount ever used by a buffer.
Definition: stream_usage.h:32
Open a file for reading or writing.
Definition: access_type.h:35