00001 #include "util/stream/line_input.hh"
00002
00003 #include "util/exception.hh"
00004 #include "util/file.hh"
00005 #include "util/read_compressed.hh"
00006 #include "util/stream/chain.hh"
00007
00008 #include <algorithm>
00009 #include <vector>
00010
00011 namespace util { namespace stream {
00012
00013 void LineInput::Run(const ChainPosition &position) {
00014 ReadCompressed reader(fd_);
00015
00016 std::vector<char> carry;
00017
00018 for (Link block(position); ; ++block) {
00019 char *to = static_cast<char*>(block->Get());
00020 char *begin = to;
00021 char *end = to + position.GetChain().BlockSize();
00022 std::copy(carry.begin(), carry.end(), to);
00023 to += carry.size();
00024 while (to != end) {
00025 std::size_t got = reader.Read(to, end - to);
00026 if (!got) {
00027
00028 block->SetValidSize(to - begin);
00029 ++block;
00030 block.Poison();
00031 return;
00032 }
00033 to += got;
00034 }
00035
00036
00037 char *newline;
00038 for (newline = to - 1; ; --newline) {
00039 UTIL_THROW_IF(newline < begin, Exception, "Did not find a newline in " << position.GetChain().BlockSize() << " bytes of input of " << NameFromFD(fd_) << ". Is this a text file?");
00040 if (*newline == '\n') break;
00041 }
00042
00043
00044 carry.clear();
00045 carry.resize(to - (newline + 1));
00046 std::copy(newline + 1, to, &*carry.begin());
00047
00048 block->SetValidSize(newline + 1 - begin);
00049 }
00050 }
00051
00052 }}