00001 #ifndef UTIL_STREAM_REWINDABLE_STREAM_H
00002 #define UTIL_STREAM_REWINDABLE_STREAM_H
00003
00004 #include "util/stream/chain.hh"
00005
00006 #include <boost/noncopyable.hpp>
00007
00008 #include <deque>
00009
00010 namespace util {
00011 namespace stream {
00012
00020 class RewindableStream : boost::noncopyable {
00021 public:
00026 RewindableStream();
00027
00028 ~RewindableStream() {
00029 Poison();
00030 }
00031
00039 void Init(const ChainPosition &position);
00040
00047 explicit RewindableStream(const ChainPosition &position)
00048 : in_(NULL) {
00049 Init(position);
00050 }
00051
00055 const void *Get() const {
00056 assert(!poisoned_);
00057 assert(current_);
00058 return current_;
00059 }
00060
00064 void *Get() {
00065 assert(!poisoned_);
00066 assert(current_);
00067 return current_;
00068 }
00069
00070 operator bool() const { return !poisoned_; }
00071
00072 bool operator!() const { return poisoned_; }
00073
00078 void Mark();
00079
00084 void Rewind();
00085
00090 RewindableStream& operator++();
00091
00097 void Poison();
00098
00099 private:
00100 void AppendBlock();
00101
00102 void Flush(std::deque<Block>::iterator to);
00103
00104 std::deque<Block> blocks_;
00105
00106 std::size_t blocks_it_;
00107
00108 std::size_t entry_size_;
00109 std::size_t block_size_;
00110 std::size_t block_count_;
00111
00112 uint8_t *marked_, *current_;
00113 const uint8_t *block_end_;
00114
00115 PCQueue<Block> *in_, *out_;
00116
00117
00118 bool hit_poison_;
00119
00120 bool poisoned_;
00121
00122 WorkerProgress progress_;
00123 };
00124
00125 inline Chain &operator>>(Chain &chain, RewindableStream &stream) {
00126 stream.Init(chain.Add());
00127 return chain;
00128 }
00129
00130 }
00131 }
00132 #endif