TPIE

2362a60
tpie::file_stream< T > Class Template Reference

Compressed stream. More...

#include <tpie/compressed/stream.h>

Inherits tpie::compressed_stream_base.

Public Types

typedef T item_type
 
typedef
file_stream_base::offset_type 
offset_type
 
typedef std::shared_ptr
< compressor_buffer >
buffer_t
 

Public Member Functions

 file_stream (double blockFactor=1.0)
 
void describe (std::ostream &out)
 For debugging: Describe the internal stream state in a string. More...
 
std::string describe ()
 For debugging: Describe the internal stream state in a string. More...
 
virtual void post_open () override
 
void seek (stream_offset_type offset, offset_type whence=beginning)
 Precondition: is_open(). Precondition: offset == 0. More...
 
void truncate (stream_size_type offset)
 Truncate to given size. More...
 
void truncate (const stream_position &pos)
 Truncate to given stream position. More...
 
stream_position get_position ()
 Store the current stream position such that it may be found later on. More...
 
void set_position (const stream_position &pos)
 Seek to a position that was previously recalled with get_position. More...
 
const T & read ()
 Reads next item from stream if can_read() == true. More...
 
const T & peek ()
 Peeks next item from stream if can_read() == true. More...
 
void skip ()
 
void skip_back ()
 
template<typename IT >
void read (IT const a, IT const b)
 Precondition: is_open(). More...
 
bool can_read ()
 Check if the next call to read() will succeed or not. More...
 
bool can_read_back ()
 Check if the next call to read_back() will succeed or not. More...
 
const T & read_back ()
 
void write (const T &item)
 
template<typename IT >
void write (IT const a, IT const b)
 
bool is_readable () const throw ()
 
bool is_writable () const throw ()
 
memory_size_type block_size () const
 
memory_size_type block_items () const
 
template<typename TT >
void read_user_data (TT &data)
 
memory_size_type read_user_data (void *data, memory_size_type count)
 
template<typename TT >
void write_user_data (const TT &data)
 
void write_user_data (const void *data, memory_size_type count)
 
memory_size_type user_data_size () const
 
memory_size_type max_user_data_size () const
 
const std::string & path () const
 
void open (const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
 Deprecated interface for opening a named stream. More...
 
void open (memory_size_type userDataSize, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
 Deprecated interface for opening an unnamed temporary stream. More...
 
void open (temp_file &file, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
 Deprecated interface for opening a temporary stream. More...
 
void open (const std::string &path, compression_flags compressionFlags)
 Deprecated interface for opening a named stream. More...
 
void open (compression_flags compressionFlags)
 Deprecated interface for opening an unnamed temporary stream. More...
 
void open (temp_file &file, compression_flags compressionFlags)
 Deprecated interface for opening a temporary stream. More...
 
void open (const std::string &path, open::type openFlags=open::defaults, memory_size_type userDataSize=0)
 Open and possibly create a stream. More...
 
void open (open::type openFlags=open::defaults, memory_size_type userDataSize=0)
 Open and create an unnamed temporary stream. More...
 
void open (temp_file &file, open::type openFlags=open::defaults, memory_size_type userDataSize=0)
 Open and possibly create a temporary stream. More...
 
void close ()
 
bool is_open () const
 
stream_size_type size () const
 
stream_size_type file_size () const
 
stream_size_type offset () const
 

Static Public Member Functions

static memory_size_type memory_usage (double blockFactor=1.0)
 
static memory_size_type block_size (double blockFactor) throw ()
 
static double calculate_block_factor (memory_size_type blockSize) throw ()
 
static memory_size_type block_memory_usage (double blockFactor)
 

Static Public Attributes

static const
file_stream_base::offset_type 
beginning = file_stream_base::beginning
 
static const
file_stream_base::offset_type 
end = file_stream_base::end
 
static const
file_stream_base::offset_type 
current = file_stream_base::current
 

Protected Member Functions

void open_inner (const std::string &path, open::type openFlags, memory_size_type userDataSize)
 
compressor_thread & compressor ()
 
void finish_requests (compressor_thread_lock &l)
 
stream_size_type last_block_read_offset (compressor_thread_lock &l)
 Blocks to take the compressor lock. More...
 
stream_size_type current_file_size (compressor_thread_lock &l)
 Blocks to take the compressor lock. More...
 
bool use_compression ()
 
void uncache_read_writes ()
 Reset cheap read/write counts to zero so that the next read/write operation will check stream state properly. More...
 

Protected Attributes

bool m_bufferDirty
 Whether the current block must be written out to disk before being ejected. More...
 
memory_size_type m_blockItems
 Number of items in a logical block. More...
 
memory_size_type m_blockSize
 Size (in bytes) of a logical (uncompressed) block. More...
 
bool m_canRead
 Whether we are open for reading. More...
 
bool m_canWrite
 Whether we are open for writing. More...
 
bool m_open
 Whether we are open. More...
 
memory_size_type m_itemSize
 Size of a single item. More...
 
memory_size_type m_cachedReads
 Number of cheap, unchecked reads we can do next. More...
 
memory_size_type m_cachedWrites
 Number of cheap, unchecked writes we can do next. More...
 
tpie::unique_ptr< temp_file > m_ownedTempFile
 The anonymous temporary file we have opened (when appropriate). More...
 
temp_file * m_tempFile
 The temporary file we have opened (when appropriate). More...
 
file_accessor::byte_stream_accessor
< default_raw_file_accessor >
m_byteStreamAccessor
 File accessor. More...
 
stream_size_type m_size
 Number of logical items in the stream. More...
 
stream_buffers m_buffers
 Buffer manager for this entire stream. More...
 
buffer_t m_buffer
 Buffer holding the items of the block currently being read/written. More...
 
stream_size_type m_streamBlocks
 The number of blocks written to the file. More...
 
stream_size_type m_lastBlockReadOffset
 When use_compression() is true: Read offset of the last block in the stream. More...
 
stream_size_type m_currentFileSize
 
compressor_response m_response
 Response from compressor thread; protected by compressor thread mutex. More...
 
bool m_updateReadOffsetFromWrite = false
 When use_compression() is true: Indicates whether m_response is the response to a write request. More...
 
stream_size_type m_lastWriteBlockNumber
 
seek_state::type m_seekState
 
stream_size_type m_readOffset
 Position relating to the currently loaded buffer. More...
 
stream_size_type m_offset
 Offset of next item to read/write, relative to beginning of stream. More...
 
stream_position m_nextPosition
 If seekState is position, seek to this position before reading/writing. More...
 
stream_size_type m_nextReadOffset
 

Detailed Description

template<typename T>
class tpie::file_stream< T >

Compressed stream.

We assume that T is trivially copyable and that its copy constructor and assignment operator never throw.

As a rule of thumb, when a tpie::stream_exception is thrown from a method, the stream is left in the state it was in prior to the method call. When a tpie::exception is thrown, the stream may have changed. In particular, the stream may have been closed, and it is up to the caller (if the exception is caught) to ensure that the stream is reopened as necessary.

Several methods claim the nothrow guarantee even though the implementation has throw statements. In this case, there are two reasons an exception may be thrown: A tpie::exception is thrown if some invariant in the stream has been violated, and this is a bug we must fix in the compressed stream. A tpie::stream_exception is thrown if the user has violated a precondition (for instance by passing an invalid parameter).

Definition at line 46 of file predeclare.h.

Member Function Documentation

template<typename T>
bool tpie::file_stream< T >::can_read ( )
inline

Check if the next call to read() will succeed or not.

Definition at line 1010 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::describe(), tpie::pipelining::bits::input_t< dest_t >::go(), tpie::pipelining::bits::buffer_output_t< dest_t >::go(), and tpie::ami::merge_sorted_runs().

1010  {
1011  if (m_cachedReads > 0)
1012  return true;
1013 
1014  if (!this->m_open)
1015  return false;
1016 
1017  return offset() < size();
1018  }
memory_size_type m_cachedReads
Number of cheap, unchecked reads we can do next.
Definition: stream.h:392
bool m_open
Whether we are open.
Definition: stream.h:388
template<typename T>
bool tpie::file_stream< T >::can_read_back ( )
inline

Check if the next call to read_back() will succeed or not.

Definition at line 1023 of file stream.h.

1023  {
1024  if (!this->m_open)
1025  return false;
1026 
1027  return offset() > 0;
1028  }
bool m_open
Whether we are open.
Definition: stream.h:388
stream_size_type tpie::compressed_stream_base::current_file_size ( compressor_thread_lock & l)
protected inherited

Blocks to take the compressor lock.

Precondition: use_compression()

TODO: Should probably investigate when this reports a useful value.

Referenced by tpie::file_stream< tpie::stream_position >::get_position().

template<typename T>
void tpie::file_stream< T >::describe ( std::ostream &  out)
inline

For debugging: Describe the internal stream state in a string.

Definition at line 556 of file stream.h.

556  {
557  if (!this->is_open()) {
558  out << "[Closed stream]";
559  return;
560  }
561 
562  out << "[(" << m_byteStreamAccessor.path() << ") item " << offset()
563  << " of " << size();
564  out << " (block " << block_number()
565  << " @ byte " << m_readOffset
566  << ", item " << block_item_index()
567  << ")";
568 
569  if (use_compression()) {
570  out << ", compressed";
571  } else {
572  out << ", uncompressed";
573  }
574 
575  switch (m_seekState) {
576  case seek_state::none:
577  break;
578  case seek_state::beginning:
579  out << ", seeking to beginning";
580  break;
581  case seek_state::end:
582  out << ", seeking to end";
583  break;
584  case seek_state::position:
585  out << ", seeking to position " << m_nextPosition.offset();
586  out << " (block " << block_number(m_nextPosition.offset())
587  << " @ byte " << m_nextPosition.read_offset()
588  << ", item " << block_item_index(m_nextPosition.offset())
589  << ")";
590  break;
591  }
592 
593  if (m_bufferDirty)
594  out << " dirty";
595 
596  if (m_seekState == seek_state::none) {
597  if (can_read()) out << ", can read";
598  else out << ", cannot read";
599  }
600 
601  out << ", " << m_streamBlocks << " blocks";
602  if (m_lastBlockReadOffset != std::numeric_limits<stream_size_type>::max())
603  out << ", last block at " << m_lastBlockReadOffset;
604  if (m_currentFileSize != std::numeric_limits<stream_size_type>::max())
605  out << ", current file size " << m_currentFileSize;
606 
607  out << ']';
608  }
stream_position m_nextPosition
If seekState is position, seek to this position before reading/writing.
Definition: stream.h:456
bool m_bufferDirty
Whether the current block must be written out to disk before being ejected.
Definition: stream.h:378
stream_size_type m_readOffset
Position relating to the currently loaded buffer.
Definition: stream.h:438
bool can_read()
Check if the next call to read() will succeed or not.
Definition: stream.h:1010
file_accessor::byte_stream_accessor< default_raw_file_accessor > m_byteStreamAccessor
File accessor.
Definition: stream.h:401
stream_size_type offset() const
The stream offset of the item pointed to.
stream_size_type m_lastBlockReadOffset
When use_compression() is true: Read offset of the last block in the stream.
Definition: stream.h:416
stream_size_type m_streamBlocks
The number of blocks written to the file.
Definition: stream.h:411
template<typename T>
std::string tpie::file_stream< T >::describe ( )
inline

For debugging: Describe the internal stream state in a string.

Definition at line 613 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::describe().

613  {
614  std::stringstream ss;
615  describe(ss);
616  return ss.str();
617  }
std::string describe()
For debugging: Describe the internal stream state in a string.
Definition: stream.h:613
template<typename T>
stream_position tpie::file_stream< T >::get_position ( )
inline

Store the current stream position such that it may be found later on.

The stream_position object is invalidated if the stream is later truncated to a point before the stored position.

The stream_position objects are plain old data, so they may themselves be written to streams.

Blocks to take the compressor lock.

Definition at line 841 of file stream.h.

841  {
842  tp_assert(is_open(), "get_position: !is_open");
843  if (!use_compression()) return stream_position(0, offset());
844  switch (m_seekState) {
845  case seek_state::position:
846  // We just set_position, so we can just return what we got.
847  return m_nextPosition;
848  case seek_state::beginning:
849  return stream_position(0, 0);
850  case seek_state::none:
851  if (buffer_block_number() != m_streamBlocks) {
852  if (m_nextItem == m_bufferEnd)
853  return stream_position(m_nextReadOffset, m_offset);
854  else
855  return stream_position(m_readOffset, m_offset);
856  }
857  // We are in a new block at the end of the stream.
858  if (m_nextItem == m_bufferEnd) {
859  tp_assert(m_bufferDirty, "At end of buffer, but bufferDirty is false?");
860  // Make sure the position we get is not at the end of a block
861  compressor_thread_lock lock(compressor());
863  flush_block(lock);
864  get_buffer(lock, m_streamBlocks);
865  m_nextItem = m_bufferBegin;
866  }
867  break;
868  case seek_state::end:
869  // Figure out the size of the file below.
870  break;
871  }
872 
873  stream_size_type readOffset;
874  stream_size_type blockNumber = block_number(offset());
875  compressor_thread_lock l(compressor());
876  if (size() % m_blockItems == 0)
877  readOffset = current_file_size(l);
878  else if (blockNumber == m_streamBlocks)
879  readOffset = current_file_size(l);
880  else if (blockNumber == m_streamBlocks - 1)
881  readOffset = last_block_read_offset(l);
882  else {
883  tp_assert(false, "get_position: Invalid block_number");
884  readOffset = 1111111111111111111ull; // avoid compiler warning
885  }
886  return stream_position(readOffset, offset());
887  }
stream_position m_nextPosition
If seekState is position, seek to this position before reading/writing.
Definition: stream.h:456
bool m_bufferDirty
Whether the current block must be written out to disk before being ejected.
Definition: stream.h:378
stream_size_type m_readOffset
Position relating to the currently loaded buffer.
Definition: stream.h:438
stream_size_type m_offset
Offset of next item to read/write, relative to beginning of stream.
Definition: stream.h:453
memory_size_type m_blockItems
Number of items in a logical block.
Definition: stream.h:380
stream_size_type current_file_size(compressor_thread_lock &l)
Blocks to take the compressor lock.
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
stream_size_type last_block_read_offset(compressor_thread_lock &l)
Blocks to take the compressor lock.
bool m_updateReadOffsetFromWrite
When use_compression() is true: Indicates whether m_response is the response to a write request...
Definition: stream.h:426
stream_size_type m_streamBlocks
The number of blocks written to the file.
Definition: stream.h:411
stream_size_type tpie::compressed_stream_base::last_block_read_offset ( compressor_thread_lock & l)
protected inherited

Blocks to take the compressor lock.

Precondition: use_compression()

TODO: Should probably investigate when this reports a useful value.

Referenced by tpie::file_stream< tpie::stream_position >::get_position().

void tpie::compressed_stream_base::open ( const std::string &  path,
access_type  accessType,
memory_size_type  userDataSize = 0,
cache_hint  cacheHint = access_sequential,
compression_flags  compressionFlags = compression_none 
)
inline inherited

Deprecated interface for opening a named stream.

If compressionFlags is compression_none and the file does not already exist, no compression will be used when writing. If compressionFlags is compression_normal and the file does not already exist, compression will be used when writing. If the file already exists, the compression flags of the existing file are used instead.

Definition at line 204 of file stream.h.

Referenced by tpie::pipelining::bits::named_output_t< T >::begin(), tpie::compressed_stream_base::open(), and tpie::pipelining::bits::named_pull_input_t< T >::propagate().

209  {
210  open(path, open::translate(accessType, cacheHint, compressionFlags), userDataSize);
211  }
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
Definition: stream.h:204
void tpie::compressed_stream_base::open ( memory_size_type  userDataSize,
cache_hint  cacheHint = access_sequential,
compression_flags  compressionFlags = compression_none 
)
inline inherited

Deprecated interface for opening an unnamed temporary stream.

Definition at line 216 of file stream.h.

References tpie::access_read_write, and tpie::compressed_stream_base::open().

218  {
219  open(open::translate(access_read_write, cacheHint, compressionFlags), userDataSize);
220  }
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
Definition: stream.h:204
Open a file for reading or writing.
Definition: access_type.h:35
void tpie::compressed_stream_base::open ( temp_file & file,
access_type  accessType,
memory_size_type  userDataSize = 0,
cache_hint  cacheHint = access_sequential,
compression_flags  compressionFlags = compression_none 
)
inline inherited

Deprecated interface for opening a temporary stream.

Definition at line 225 of file stream.h.

References tpie::compressed_stream_base::open().

229  {
230  open(file, open::translate(accessType, cacheHint, compressionFlags), userDataSize);
231  }
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
Definition: stream.h:204
void tpie::compressed_stream_base::open ( const std::string &  path,
compression_flags  compressionFlags 
)
inline inherited

Deprecated interface for opening a named stream.

Definition at line 236 of file stream.h.

References tpie::access_read_write, tpie::access_sequential, and tpie::compressed_stream_base::open().

236  {
237  const memory_size_type userDataSize = 0;
238  open(path, open::translate(access_read_write, access_sequential, compressionFlags), userDataSize);
239  }
Sequential access is intended.
Definition: cache_hint.h:36
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
Definition: stream.h:204
Open a file for reading or writing.
Definition: access_type.h:35
void tpie::compressed_stream_base::open ( compression_flags  compressionFlags)
inline inherited

Deprecated interface for opening an unnamed temporary stream.

Definition at line 244 of file stream.h.

References tpie::access_read_write, tpie::access_sequential, and tpie::compressed_stream_base::open().

244  {
245  const memory_size_type userDataSize = 0;
246  open(open::translate(access_read_write, access_sequential, compressionFlags), userDataSize);
247  }
Sequential access is intended.
Definition: cache_hint.h:36
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
Definition: stream.h:204
Open a file for reading or writing.
Definition: access_type.h:35
void tpie::compressed_stream_base::open ( temp_file & file,
compression_flags  compressionFlags 
)
inline inherited

Deprecated interface for opening a temporary stream.

Definition at line 252 of file stream.h.

References tpie::access_read_write, tpie::access_sequential, and tpie::compressed_stream_base::open().

252  {
253  const memory_size_type userDataSize = 0;
254  open(file, open::translate(access_read_write, access_sequential, compressionFlags), userDataSize);
255  }
Sequential access is intended.
Definition: cache_hint.h:36
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
Definition: stream.h:204
Open a file for reading or writing.
Definition: access_type.h:35
void tpie::compressed_stream_base::open ( const std::string &  path,
open::type  openFlags = open::defaults,
memory_size_type  userDataSize = 0 
)
inherited

Open and possibly create a stream.

The stream is created if it does not exist and opened for reading and writing, but this can be changed with open::read_only or open::write_only; see below.

The flags supplied to openFlags should be a combination of the following from open::type, OR'ed together:

open::read_only Open for reading only, and fail if the stream does not exist.

open::write_only Open for writing only, and truncate the stream if it exists.

open::access_normal By default, POSIX_FADV_SEQUENTIAL is passed to the open syscall to indicate that the OS should optimize for sequential access; specifying open::access_normal disables that hint.

open::access_random Pass POSIX_FADV_RANDOM to the open syscall to make the OS optimize for random access.

open::compression_normal Create the stream in compression mode if it does not already exist, and compress written blocks according to available resources (for instance CPU time and memory).

open::compression_all Create the stream in compression mode if it does not already exist, and compress all written blocks using the preferred compression scheme, which can be set using tpie::the_compressor_thread().set_preferred_compression().

Parameters
path: The path to the file to open.
openFlags: A bit-wise combination of the flags; see above.
userDataSize: Required user data capacity in stream header.
void tpie::compressed_stream_base::open ( open::type  openFlags = open::defaults,
memory_size_type  userDataSize = 0 
)
inherited

Open and create an unnamed temporary stream.

Parameters
openFlags: A bit-wise combination of the flags; see above.
userDataSize: Required user data capacity in stream header.
void tpie::compressed_stream_base::open ( temp_file & file,
open::type  openFlags = open::defaults,
memory_size_type  userDataSize = 0 
)
inherited

Open and possibly create a temporary stream.

Parameters
file: The temporary file to open.
openFlags: A bit-wise combination of the flags; see above.
userDataSize: Required user data capacity in stream header.
template<typename T>
const T& tpie::file_stream< T >::peek ( )
inline

Peeks next item from stream if can_read() == true.

If can_read() == false, throws an end_of_stream_exception.

Blocks to take the compressor lock.

If a stream_exception is thrown, the stream is left in the state it was in before the call to peek().

Definition at line 970 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::read().

970  {
971  if (m_cachedReads > 0) {
972  return *m_nextItem;
973  }
974  if (m_seekState != seek_state::none) perform_seek();
975  if (m_offset == m_size) throw end_of_stream_exception();
976  if (m_nextItem == m_bufferEnd) {
977  compressor_thread_lock l(compressor());
978  if (this->m_bufferDirty) {
980  flush_block(l);
981  }
982  // At this point, block_number() == buffer_block_number() + 1
983  read_next_block(l, block_number());
984  }
985  return *m_nextItem;
986  }
bool m_bufferDirty
Whether the current block must be written out to disk before being ejected.
Definition: stream.h:378
stream_size_type m_size
Number of logical items in the stream.
Definition: stream.h:403
stream_size_type m_offset
Offset of next item to read/write, relative to beginning of stream.
Definition: stream.h:453
memory_size_type m_cachedReads
Number of cheap, unchecked reads we can do next.
Definition: stream.h:392
bool m_updateReadOffsetFromWrite
When use_compression() is true: Indicates whether m_response is the response to a write request...
Definition: stream.h:426
template<typename T>
const T& tpie::file_stream< T >::read ( )
inline

Reads next item from stream if can_read() == true.

If can_read() == false, throws an end_of_stream_exception.

Blocks to take the compressor lock.

If a stream_exception is thrown, the stream is left in the state it was in before the call to read().

Definition at line 947 of file stream.h.

Referenced by tpie::pipelining::bits::input_t< dest_t >::go(), tpie::pipelining::bits::buffer_output_t< dest_t >::go(), tpie::ami::merge_sorted_runs(), tpie::file_stream< tpie::stream_position >::read(), tpie::sort_manager< T, I, M >::sort(), and tpie::ami::Internal_Sorter_Obj< T, Compare >::sort().

947  {
948  if (m_cachedReads > 0) {
949  --m_cachedReads;
950  ++m_offset;
951  return *m_nextItem++;
952  }
953  const T & res = peek();
954  ++m_offset;
955  ++m_nextItem;
956  cache_read_writes();
957  return res;
958  }
stream_size_type m_offset
Offset of next item to read/write, relative to beginning of stream.
Definition: stream.h:453
const T & peek()
Peeks next item from stream if can_read() == true.
Definition: stream.h:970
memory_size_type m_cachedReads
Number of cheap, unchecked reads we can do next.
Definition: stream.h:392
template<typename T>
template<typename IT >
void tpie::file_stream< T >::read ( IT const  a,
IT const  b 
)
inline

Precondition: is_open().

Reads min(b-a, size()-offset()) items into the range [a, b). If fewer than b-a items are read, throws an end_of_stream_exception.

Definition at line 1003 of file stream.h.

1003  {
1004  for (IT i = a; i != b; ++i) *i = read();
1005  }
const T & read()
Reads next item from stream if can_read() == true.
Definition: stream.h:947
template<typename T>
void tpie::file_stream< T >::seek ( stream_offset_type  offset,
offset_type  whence = beginning 
)
inline

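Precondition: is_open(). Precondition: offset == 0 when the stream uses compression (random seeks are not supported on compressed streams; uncompressed streams accept any offset that resolves to a position within the stream).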

Definition at line 627 of file stream.h.

Referenced by tpie::pipelining::bits::buffer_output_t< dest_t >::go(), tpie::pipelining::bits::input_t< dest_t >::propagate(), tpie::pipelining::bits::pull_input_t< T >::propagate(), tpie::pipelining::bits::pull_reverse_input_t< T >::propagate(), tpie::file_stream< tpie::stream_position >::set_position(), tpie::sort_manager< T, I, M >::sort(), and tpie::ami::Internal_Sorter_Obj< T, Compare >::sort().

627  {
628  tp_assert(is_open(), "seek: !is_open");
631  if (!use_compression()) {
632  // Handle uncompressed case by delegating to set_position.
633  switch (whence) {
634  case beginning:
635  break;
636  case end:
637  offset += size();
638  break;
639  case current:
640  offset += this->offset();
641  break;
642  }
643  set_position(stream_position(0, offset));
644  return;
645  }
646  // Otherwise, we are in a compressed stream.
647  if (offset != 0) throw stream_exception("Random seeks are not supported");
648  switch (whence) {
649  case beginning:
650  if (m_buffer.get() != 0 && buffer_block_number() == 0) {
651  // We are already reading or writing the first block.
652  m_nextItem = m_bufferBegin;
653  m_offset = m_readOffset = 0;
654  m_seekState = seek_state::none;
655  } else {
656  // We need to load the first block on the next I/O.
657  m_seekState = seek_state::beginning;
658  }
659  return;
660  case end:
661  if (m_buffer.get() == 0) {
662  m_seekState = seek_state::end;
663  } else if (m_offset == size()) {
664  // no-op
665  m_seekState = seek_state::none;
666  } else if (// We are in the last block, and it has NOT YET been written to disk, or
667  buffer_block_number() == m_streamBlocks ||
668  // we are in the last block, and it has ALREADY been written to disk.
669  buffer_block_number()+1 == m_streamBlocks)
670  {
671  // If the last block is full,
672  // block_item_index() reports 0 when it should report m_blockItems.
673  // Compute blockItemIndex manually to handle this edge case.
674  stream_size_type blockItemIndex =
675  size() - buffer_block_number() * m_blockItems;
676  memory_size_type cast = static_cast<memory_size_type>(blockItemIndex);
677  tp_assert(blockItemIndex == cast, "seek: blockItemIndex out of bounds");
678  m_nextItem = m_bufferBegin + cast;
679 
680  m_offset = size();
681  m_seekState = seek_state::none;
682  } else {
683  m_seekState = seek_state::end;
684  }
685  return;
686  case current:
687  return;
688  }
689  tp_assert(false, "seek: Unknown whence");
690  }
void set_position(const stream_position &pos)
Seek to a position that was previously recalled with get_position.
Definition: stream.h:893
stream_size_type m_readOffset
Position relating to the currently loaded buffer.
Definition: stream.h:438
buffer_t m_buffer
Buffer holding the items of the block currently being read/written.
Definition: stream.h:407
void uncache_read_writes()
Reset cheap read/write counts to zero so that the next read/write operation will check stream state p...
Definition: stream.h:345
stream_size_type m_offset
Offset of next item to read/write, relative to beginning of stream.
Definition: stream.h:453
memory_size_type m_blockItems
Number of items in a logical block.
Definition: stream.h:380
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
bool m_updateReadOffsetFromWrite
When use_compression() is true: Indicates whether m_response is the response to a write request...
Definition: stream.h:426
stream_size_type m_streamBlocks
The number of blocks written to the file.
Definition: stream.h:411
template<typename T>
void tpie::file_stream< T >::set_position ( const stream_position & pos)
inline

Seek to a position that was previously recalled with get_position.

Definition at line 893 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::seek().

893  {
895 
896  // If the code is correct, short circuiting is not necessary;
897  // if the code is not correct, short circuiting might mask faults.
898  /*
899  if (pos == m_position) {
900  m_seekState = seek_state::none;
901  return;
902  }
903  */
904 
905  if (pos == stream_position::end()) {
906  seek(0, end);
907  return;
908  }
909 
910  if (!use_compression() && pos.read_offset() != 0)
911  throw stream_exception("set_position: Invalid position, read_offset != 0");
912 
913  if (pos.offset() > size())
914  throw stream_exception("set_position: Invalid position, offset > size");
915 
916  if (m_buffer.get() != 0
917  && block_number(pos.offset()) == buffer_block_number())
918  {
919  if (pos.read_offset() != m_readOffset) {
920  // We don't always know the read offset of the current block
921  // in m_readOffset, so let's assume that
922  // pos.read_offset() is correct.
923  }
924 
925  m_readOffset = pos.read_offset();
926  m_offset = pos.offset();
927  m_nextItem = m_bufferBegin + block_item_index();
928  m_seekState = seek_state::none;
929  return;
930  }
931 
932  m_nextPosition = pos;
933  m_seekState = seek_state::position;
935  }
stream_position m_nextPosition
If seekState is position, seek to this position before reading/writing.
Definition: stream.h:456
stream_size_type m_readOffset
Position relating to the currently loaded buffer.
Definition: stream.h:438
buffer_t m_buffer
Buffer holding the items of the block currently being read/written.
Definition: stream.h:407
void uncache_read_writes()
Reset cheap read/write counts to zero so that the next read/write operation will check stream state p...
Definition: stream.h:345
void seek(stream_offset_type offset, offset_type whence=beginning)
Precondition: is_open(). Precondition: offset == 0.
Definition: stream.h:627
stream_size_type m_offset
Offset of next item to read/write, relative to beginning of stream.
Definition: stream.h:453
static stream_position end()
Special-value constructor returning a pointer to the end.
bool m_updateReadOffsetFromWrite
When use_compression() is true: Indicates whether m_response is the response to a write request...
Definition: stream.h:426
template<typename T>
void tpie::file_stream< T >::truncate ( stream_size_type  offset)
inline

Truncate to given size.

Precondition: compression is disabled, or offset is either size() or 0. Blocks until the compressor lock has been taken.

Definition at line 698 of file stream.h.

Referenced by tpie::ami::Internal_Sorter_Obj< T, Compare >::sort().

698  {
699  tp_assert(is_open(), "truncate: !is_open");
701  if (offset == size())
702  return;
703  else if (offset == 0)
704  truncate_zero();
705  else if (!use_compression())
706  truncate_uncompressed(offset);
707  else
708  throw stream_exception("Arbitrary truncate is not supported");
709 
710  if (m_tempFile) m_tempFile->update_recorded_size(m_size);
711  }
stream_size_type m_size
Number of logical items in the stream.
Definition: stream.h:403
void uncache_read_writes()
Reset cheap read/write counts to zero so that the next read/write operation will check stream state p...
Definition: stream.h:345
temp_file * m_tempFile
The temporary file we have opened (when appropriate).
Definition: stream.h:399
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
template<typename T>
void tpie::file_stream< T >::truncate ( const stream_position & pos)
inline

Truncate to given stream position.

Definition at line 716 of file stream.h.

716  {
717  tp_assert(is_open(), "truncate: !is_open");
719  if (pos.offset() == size())
720  return;
721  else if (pos.offset() == 0)
722  truncate_zero();
723  else if (!use_compression())
724  truncate_uncompressed(pos.offset());
725  else
726  truncate_compressed(pos);
727 
728  if (m_tempFile) m_tempFile->update_recorded_size(m_size);
729  }
stream_size_type m_size
Number of logical items in the stream.
Definition: stream.h:403
void uncache_read_writes()
Reset cheap read/write counts to zero so that the next read/write operation will check stream state p...
Definition: stream.h:345
temp_file * m_tempFile
The temporary file we have opened (when appropriate).
Definition: stream.h:399
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
void tpie::compressed_stream_base::uncache_read_writes ( )
inline protected inherited

Reset cheap read/write counts to zero so that the next read/write operation will check stream state properly.

Definition at line 345 of file stream.h.

References tpie::compressed_stream_base::m_cachedReads, and tpie::compressed_stream_base::m_cachedWrites.

Referenced by tpie::file_stream< tpie::stream_position >::seek(), tpie::file_stream< tpie::stream_position >::set_position(), and tpie::file_stream< tpie::stream_position >::truncate().

345  {
347  }
memory_size_type m_cachedWrites
Number of cheap, unchecked writes we can do next.
Definition: stream.h:394
memory_size_type m_cachedReads
Number of cheap, unchecked reads we can do next.
Definition: stream.h:392

Member Data Documentation

memory_size_type tpie::compressed_stream_base::m_blockItems
protected inherited

Number of items in a logical block.

Definition at line 380 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::get_position(), and tpie::file_stream< tpie::stream_position >::seek().

memory_size_type tpie::compressed_stream_base::m_blockSize
protected inherited

Size (in bytes) of a logical (uncompressed) block.

Definition at line 382 of file stream.h.

buffer_t tpie::compressed_stream_base::m_buffer
protected inherited

Buffer holding the items of the block currently being read/written.

Definition at line 407 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::seek(), and tpie::file_stream< tpie::stream_position >::set_position().

bool tpie::compressed_stream_base::m_bufferDirty
protected inherited

Whether the current block must be written out to disk before being ejected.

Invariants: If m_bufferDirty is true and use_compression() is true, block_number() is either m_streamBlocks or m_streamBlocks - 1. If block_number() is m_streamBlocks, m_bufferDirty is true.

Definition at line 378 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::describe(), tpie::file_stream< tpie::stream_position >::get_position(), and tpie::file_stream< tpie::stream_position >::peek().

stream_buffers tpie::compressed_stream_base::m_buffers
protected inherited

Buffer manager for this entire stream.

Definition at line 405 of file stream.h.

file_accessor::byte_stream_accessor<default_raw_file_accessor> tpie::compressed_stream_base::m_byteStreamAccessor
protected inherited

File accessor.

Definition at line 401 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::describe().

memory_size_type tpie::compressed_stream_base::m_cachedReads
protectedinherited
memory_size_type tpie::compressed_stream_base::m_cachedWrites
protected inherited

Number of cheap, unchecked writes we can do next.

Definition at line 394 of file stream.h.

Referenced by tpie::compressed_stream_base::uncache_read_writes().

bool tpie::compressed_stream_base::m_canRead
protected inherited

Whether we are open for reading.

Definition at line 384 of file stream.h.

bool tpie::compressed_stream_base::m_canWrite
protected inherited

Whether we are open for writing.

Definition at line 386 of file stream.h.

memory_size_type tpie::compressed_stream_base::m_itemSize
protected inherited

Size of a single item.

itemSize * blockItems == blockSize.

Definition at line 390 of file stream.h.

stream_size_type tpie::compressed_stream_base::m_lastBlockReadOffset
protected inherited

When use_compression() is true: Read offset of the last block in the stream.

Necessary to support seeking to the end.

Definition at line 416 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::describe().

stream_position tpie::compressed_stream_base::m_nextPosition
protected inherited

If seekState is position, seek to this position before reading/writing.

Definition at line 456 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::describe(), tpie::file_stream< tpie::stream_position >::get_position(), and tpie::file_stream< tpie::stream_position >::set_position().

stream_size_type tpie::compressed_stream_base::m_offset
protected inherited

Offset of next item to read/write, relative to beginning of stream.

Invariants:

block_number() in [0, m_streamBlocks]; offset in [0, size]; block_item_index() in [0, m_blockSize); offset == block_number() * m_blockItems + block_item_index().

block_item_index() <= offset.

If block_number() == m_streamBlocks, we are in a block that has not yet been written to disk.

Definition at line 453 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::get_position(), tpie::file_stream< tpie::stream_position >::peek(), tpie::file_stream< tpie::stream_position >::read(), tpie::file_stream< tpie::stream_position >::seek(), and tpie::file_stream< tpie::stream_position >::set_position().

bool tpie::compressed_stream_base::m_open
protected inherited
tpie::unique_ptr<temp_file> tpie::compressed_stream_base::m_ownedTempFile
protected inherited

The anonymous temporary file we have opened (when appropriate).

Definition at line 396 of file stream.h.

stream_size_type tpie::compressed_stream_base::m_readOffset
protected inherited

Position relating to the currently loaded buffer.

readOffset is only valid during reading. Invariants:

If use_compression() == false, readOffset == 0. If offset == 0, then readOffset == block_item_index() == block_number() == 0.

Definition at line 438 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::describe(), tpie::file_stream< tpie::stream_position >::get_position(), tpie::file_stream< tpie::stream_position >::seek(), and tpie::file_stream< tpie::stream_position >::set_position().

compressor_response tpie::compressed_stream_base::m_response
protected inherited

Response from compressor thread; protected by compressor thread mutex.

Definition at line 420 of file stream.h.

stream_size_type tpie::compressed_stream_base::m_size
protected inherited

Number of logical items in the stream.

Definition at line 403 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::peek(), and tpie::file_stream< tpie::stream_position >::truncate().

stream_size_type tpie::compressed_stream_base::m_streamBlocks
protected inherited

The number of blocks written to the file.

We must always have (m_streamBlocks+1) * m_blockItems <= m_size.

Definition at line 411 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::describe(), tpie::file_stream< tpie::stream_position >::get_position(), and tpie::file_stream< tpie::stream_position >::seek().

temp_file* tpie::compressed_stream_base::m_tempFile
protected inherited

The temporary file we have opened (when appropriate).

When m_ownedTempFile.get() != 0, m_tempFile == m_ownedTempFile.get().

Definition at line 399 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::truncate().

bool tpie::compressed_stream_base::m_updateReadOffsetFromWrite = false
protected inherited

When use_compression() is true: Indicates whether m_response is the response to a write request.

Used for knowing where to read next in read/read_back.

Definition at line 426 of file stream.h.

Referenced by tpie::file_stream< tpie::stream_position >::get_position(), tpie::file_stream< tpie::stream_position >::peek(), tpie::file_stream< tpie::stream_position >::seek(), and tpie::file_stream< tpie::stream_position >::set_position().


The documentation for this class was generated from the following files: