00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef UTIL_STREAM_SORT__
00019 #define UTIL_STREAM_SORT__
00020
00021 #include "util/stream/chain.hh"
00022 #include "util/stream/config.hh"
00023 #include "util/stream/io.hh"
00024 #include "util/stream/stream.hh"
00025 #include "util/stream/timer.hh"
00026
00027 #include "util/file.hh"
00028 #include "util/scoped.hh"
00029 #include "util/sized_iterator.hh"
00030
00031 #include <algorithm>
00032 #include <iostream>
00033 #include <queue>
00034 #include <string>
00035
00036 namespace util {
00037 namespace stream {
00038
/**
 * Combine policy that never merges two entries.
 *
 * Sorting and merging code asks a combiner whether the incoming entry can be
 * folded into the previously written one; answering false means every entry is
 * emitted on its own.
 */
struct NeverCombine {
  template <class Compare> bool operator()(const void * /*previous*/, const void * /*incoming*/, const Compare & /*compare*/) const {
    return false;
  }
};
00044
00045
00046 class Offsets {
00047 public:
00048 explicit Offsets(int fd) : log_(fd) {
00049 Reset();
00050 }
00051
00052 int File() const { return log_; }
00053
00054 void Append(uint64_t length) {
00055 if (!length) return;
00056 ++block_count_;
00057 if (length == cur_.length) {
00058 ++cur_.run;
00059 return;
00060 }
00061 WriteOrThrow(log_, &cur_, sizeof(Entry));
00062 cur_.length = length;
00063 cur_.run = 1;
00064 }
00065
00066 void FinishedAppending() {
00067 WriteOrThrow(log_, &cur_, sizeof(Entry));
00068 SeekOrThrow(log_, sizeof(Entry));
00069 cur_.run = 0;
00070 if (block_count_) {
00071 ReadOrThrow(log_, &cur_, sizeof(Entry));
00072 assert(cur_.length);
00073 assert(cur_.run);
00074 }
00075 }
00076
00077 uint64_t RemainingBlocks() const { return block_count_; }
00078
00079 uint64_t TotalOffset() const { return output_sum_; }
00080
00081 uint64_t PeekSize() const {
00082 return cur_.length;
00083 }
00084
00085 uint64_t NextSize() {
00086 assert(block_count_);
00087 uint64_t ret = cur_.length;
00088 output_sum_ += ret;
00089
00090 --cur_.run;
00091 --block_count_;
00092 if (!cur_.run && block_count_) {
00093 ReadOrThrow(log_, &cur_, sizeof(Entry));
00094 assert(cur_.length);
00095 assert(cur_.run);
00096 }
00097 return ret;
00098 }
00099
00100 void Reset() {
00101 SeekOrThrow(log_, 0);
00102 ResizeOrThrow(log_, 0);
00103 cur_.length = 0;
00104 cur_.run = 0;
00105 block_count_ = 0;
00106 output_sum_ = 0;
00107 }
00108
00109 private:
00110 int log_;
00111
00112 struct Entry {
00113 uint64_t length;
00114 uint64_t run;
00115 };
00116 Entry cur_;
00117
00118 uint64_t block_count_;
00119
00120 uint64_t output_sum_;
00121 };
00122
00123
00124 template <class Compare> class MergeQueue {
00125 public:
00126 MergeQueue(int fd, std::size_t buffer_size, std::size_t entry_size, const Compare &compare)
00127 : queue_(Greater(compare)), in_(fd), buffer_size_(buffer_size), entry_size_(entry_size) {}
00128
00129 void Push(void *base, uint64_t offset, uint64_t amount) {
00130 queue_.push(Entry(base, in_, offset, amount, buffer_size_));
00131 }
00132
00133 const void *Top() const {
00134 return queue_.top().Current();
00135 }
00136
00137 void Pop() {
00138 Entry top(queue_.top());
00139 queue_.pop();
00140 if (top.Increment(in_, buffer_size_, entry_size_))
00141 queue_.push(top);
00142 }
00143
00144 std::size_t Size() const {
00145 return queue_.size();
00146 }
00147
00148 bool Empty() const {
00149 return queue_.empty();
00150 }
00151
00152 private:
00153
00154 class Entry {
00155 public:
00156 Entry() {}
00157
00158 Entry(void *base, int fd, uint64_t offset, uint64_t amount, std::size_t buf_size) {
00159 offset_ = offset;
00160 remaining_ = amount;
00161 buffer_end_ = static_cast<uint8_t*>(base) + buf_size;
00162 Read(fd, buf_size);
00163 }
00164
00165 bool Increment(int fd, std::size_t buf_size, std::size_t entry_size) {
00166 current_ += entry_size;
00167 if (current_ != buffer_end_) return true;
00168 return Read(fd, buf_size);
00169 }
00170
00171 const void *Current() const { return current_; }
00172
00173 private:
00174 bool Read(int fd, std::size_t buf_size) {
00175 current_ = buffer_end_ - buf_size;
00176 std::size_t amount;
00177 if (static_cast<uint64_t>(buf_size) < remaining_) {
00178 amount = buf_size;
00179 } else if (!remaining_) {
00180 return false;
00181 } else {
00182 amount = remaining_;
00183 buffer_end_ = current_ + remaining_;
00184 }
00185 PReadOrThrow(fd, current_, amount, offset_);
00186 offset_ += amount;
00187 assert(current_ <= buffer_end_);
00188 remaining_ -= amount;
00189 return true;
00190 }
00191
00192
00193 uint8_t *current_, *buffer_end_;
00194
00195 uint64_t remaining_, offset_;
00196 };
00197
00198
00199 class Greater : public std::binary_function<const Entry &, const Entry &, bool> {
00200 public:
00201 explicit Greater(const Compare &compare) : compare_(compare) {}
00202
00203 bool operator()(const Entry &first, const Entry &second) const {
00204 return compare_(second.Current(), first.Current());
00205 }
00206
00207 private:
00208 const Compare compare_;
00209 };
00210
00211 typedef std::priority_queue<Entry, std::vector<Entry>, Greater> Queue;
00212 Queue queue_;
00213
00214 const int in_;
00215 const std::size_t buffer_size_;
00216 const std::size_t entry_size_;
00217 };
00218
00219
00220
00221
00222
00223
00224
/**
 * Chain worker that merges sorted stripes of a file into a chain.
 *
 * Stripe sizes come from in_offsets, which is consumed as stripes are merged.
 * Stripes are merged in groups that fit in total_memory. If out_offsets is
 * non-NULL, the byte size of each merged group written is appended to it so a
 * later pass can merge those groups in turn.
 */
template <class Compare, class Combine> class MergingReader {
  public:
    /**
     * @param in           descriptor holding the stripes (read with PReadOrThrow).
     * @param in_offsets   sizes of the stripes in @p in, in file order.
     * @param out_offsets  if non-NULL, receives the size of each merged group.
     * @param buffer_size  minimum read buffer per stripe, in bytes.
     * @param total_memory bytes allocated for all stripe buffers during a merge.
     */
    MergingReader(int in, Offsets *in_offsets, Offsets *out_offsets, std::size_t buffer_size, std::size_t total_memory, const Compare &compare, const Combine &combine) :
      compare_(compare), combine_(combine),
      in_(in),
      in_offsets_(in_offsets), out_offsets_(out_offsets),
      buffer_size_(buffer_size), total_memory_(total_memory) {}

    void Run(const ChainPosition &position) {
      Run(position, false);
    }

    /**
     * Merge all remaining stripes into the chain, then poison it.
     * @param assert_one abort if the stripes do not all fit in one merge group.
     */
    void Run(const ChainPosition &position, bool assert_one) {
      // No stripes: just end the stream.
      if (!in_offsets_->RemainingBlocks()) {
        Link l(position);
        l.Poison();
        return;
      }

      // A single stripe needs no merging; copy it through block by block.
      if (in_offsets_->RemainingBlocks() == 1) {

        uint64_t offset = in_offsets_->TotalOffset();
        uint64_t amount = in_offsets_->NextSize();
        ReadSingle(offset, amount, position);
        if (out_offsets_) out_offsets_->Append(amount);
        return;
      }

      Stream str(position);
      scoped_malloc buffer(MallocOrThrow(total_memory_));
      uint8_t *const buffer_end = static_cast<uint8_t*>(buffer.get()) + total_memory_;

      const std::size_t entry_size = position.GetChain().EntrySize();

      // Each iteration merges as many stripes as fit in the buffer into one
      // output group.
      while (in_offsets_->RemainingBlocks()) {
        // Split memory evenly over the remaining stripes, but give each at
        // least buffer_size_; round down to whole entries.
        uint64_t per_buffer = static_cast<uint64_t>(std::max<std::size_t>(
          buffer_size_,
          static_cast<std::size_t>((static_cast<uint64_t>(total_memory_) / in_offsets_->RemainingBlocks()))));
        per_buffer -= per_buffer % entry_size;
        assert(per_buffer);

        // Queue stripes while the next one's window, min(per_buffer, its
        // size), still fits before buffer_end.
        MergeQueue<Compare> queue(in_, per_buffer, entry_size, compare_);
        for (uint8_t *buf = static_cast<uint8_t*>(buffer.get());
             in_offsets_->RemainingBlocks() && (buf + std::min(per_buffer, in_offsets_->PeekSize()) <= buffer_end);) {
          uint64_t offset = in_offsets_->TotalOffset();
          uint64_t size = in_offsets_->NextSize();
          queue.Push(buf, offset, size);
          buf += static_cast<std::size_t>(std::min<uint64_t>(size, per_buffer));
        }

        // A group of one stripe with more stripes pending would make no progress.
        if (queue.Size() < 2 && in_offsets_->RemainingBlocks()) {
          std::cerr << "Bug in sort implementation: not merging at least two stripes." << std::endl;
          abort();
        }
        if (assert_one && in_offsets_->RemainingBlocks()) {
          std::cerr << "Bug in sort implementation: should only be one merge group for lazy sort" << std::endl;
          abort();
        }

        // Number of entries written for this group.
        uint64_t written = 0;

        // Start with the smallest entry. Each following entry is appended as a
        // new output entry unless combine_ returns true for it against the
        // entry currently being built.
        memcpy(str.Get(), queue.Top(), entry_size);
        for (queue.Pop(); !queue.Empty(); queue.Pop()) {
          if (!combine_(str.Get(), queue.Top(), compare_)) {
            ++written; ++str;
            memcpy(str.Get(), queue.Top(), entry_size);
          }
        }
        // Count and advance past the entry still being built.
        ++written; ++str;
        if (out_offsets_)
          out_offsets_->Append(written * entry_size);
      }
      str.Poison();
    }

  private:
    // Copy [offset, offset + size) of in_ into successive chain blocks of
    // BlockSize() bytes (the last one possibly shorter), then poison the chain.
    void ReadSingle(uint64_t offset, const uint64_t size, const ChainPosition &position) {

      const uint64_t end = offset + size;
      const uint64_t block_size = position.GetChain().BlockSize();
      Link l(position);
      for (; offset + block_size < end; ++l, offset += block_size) {
        PReadOrThrow(in_, l->Get(), block_size, offset);
        l->SetValidSize(block_size);
      }
      PReadOrThrow(in_, l->Get(), end - offset, offset);
      l->SetValidSize(end - offset);
      (++l).Poison();
      return;
    }

    Compare compare_;
    Combine combine_;

    int in_;

  protected:
    // Protected so that a derived reader (OwningMergingReader) can point it at its own copy.
    Offsets *in_offsets_;

  private:
    Offsets *out_offsets_;

    std::size_t buffer_size_;
    std::size_t total_memory_;
};
00333
00334
00335 template <class Compare, class Combine> class OwningMergingReader : public MergingReader<Compare, Combine> {
00336 private:
00337 typedef MergingReader<Compare, Combine> P;
00338 public:
00339 OwningMergingReader(int data, const Offsets &offsets, std::size_t buffer, std::size_t lazy, const Compare &compare, const Combine &combine)
00340 : P(data, NULL, NULL, buffer, lazy, compare, combine),
00341 data_(data),
00342 offsets_(offsets) {}
00343
00344 void Run(const ChainPosition &position) {
00345 P::in_offsets_ = &offsets_;
00346 scoped_fd data(data_);
00347 scoped_fd offsets_file(offsets_.File());
00348 P::Run(position, true);
00349 }
00350
00351 private:
00352 int data_;
00353 Offsets offsets_;
00354 };
00355
00356
00357 template <class Compare> class BlockSorter {
00358 public:
00359 BlockSorter(Offsets &offsets, const Compare &compare) :
00360 offsets_(&offsets), compare_(compare) {}
00361
00362 void Run(const ChainPosition &position) {
00363 const std::size_t entry_size = position.GetChain().EntrySize();
00364 for (Link link(position); link; ++link) {
00365
00366 offsets_->Append(link->ValidSize());
00367 void *end = static_cast<uint8_t*>(link->Get()) + link->ValidSize();
00368 #if defined(_WIN32) || defined(_WIN64)
00369 std::stable_sort
00370 #else
00371 std::sort
00372 #endif
00373 (SizedIt(link->Get(), entry_size),
00374 SizedIt(end, entry_size),
00375 compare_);
00376 }
00377 offsets_->FinishedAppending();
00378 }
00379
00380 private:
00381 Offsets *offsets_;
00382 SizedCompare<Compare> compare_;
00383 };
00384
00385 class BadSortConfig : public Exception {
00386 public:
00387 BadSortConfig() throw() {}
00388 ~BadSortConfig() throw() {}
00389 };
00390
00391 template <class Compare, class Combine = NeverCombine> class Sort {
00392 public:
00393 Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine())
00394 : config_(config),
00395 data_(MakeTemp(config.temp_prefix)),
00396 offsets_file_(MakeTemp(config.temp_prefix)), offsets_(offsets_file_.get()),
00397 compare_(compare), combine_(combine),
00398 entry_size_(in.EntrySize()) {
00399 UTIL_THROW_IF(!entry_size_, BadSortConfig, "Sorting entries of size 0");
00400
00401 config_.buffer_size -= config_.buffer_size % entry_size_;
00402 UTIL_THROW_IF(!config_.buffer_size, BadSortConfig, "Sort buffer too small");
00403 UTIL_THROW_IF(config_.total_memory < config_.buffer_size * 4, BadSortConfig, "Sorting memory " << config_.total_memory << " is too small for four buffers (two read and two write).");
00404 in >> BlockSorter<Compare>(offsets_, compare_) >> WriteAndRecycle(data_.get());
00405 }
00406
00407 uint64_t Size() const {
00408 return SizeOrThrow(data_.get());
00409 }
00410
00411
00412
00413 std::size_t Merge(std::size_t lazy_memory) {
00414 if (offsets_.RemainingBlocks() <= 1) return 0;
00415 const uint64_t lazy_arity = std::max<uint64_t>(1, lazy_memory / config_.buffer_size);
00416 uint64_t size = Size();
00417
00418
00419
00420
00421 if (offsets_.RemainingBlocks() <= lazy_arity || size <= static_cast<uint64_t>(lazy_memory))
00422 return std::min<std::size_t>(size, offsets_.RemainingBlocks() * config_.buffer_size);
00423
00424 scoped_fd data2(MakeTemp(config_.temp_prefix));
00425 int fd_in = data_.get(), fd_out = data2.get();
00426 scoped_fd offsets2_file(MakeTemp(config_.temp_prefix));
00427 Offsets offsets2(offsets2_file.get());
00428 Offsets *offsets_in = &offsets_, *offsets_out = &offsets2;
00429
00430
00431 ChainConfig chain_config;
00432 chain_config.entry_size = entry_size_;
00433 chain_config.block_count = 2;
00434 chain_config.total_memory = config_.buffer_size * 2;
00435 Chain chain(chain_config);
00436
00437 while (offsets_in->RemainingBlocks() > lazy_arity) {
00438 if (size <= static_cast<uint64_t>(lazy_memory)) break;
00439 std::size_t reading_memory = config_.total_memory - 2 * config_.buffer_size;
00440 if (size < static_cast<uint64_t>(reading_memory)) {
00441 reading_memory = static_cast<std::size_t>(size);
00442 }
00443 SeekOrThrow(fd_in, 0);
00444 chain >>
00445 MergingReader<Compare, Combine>(
00446 fd_in,
00447 offsets_in, offsets_out,
00448 config_.buffer_size,
00449 reading_memory,
00450 compare_, combine_) >>
00451 WriteAndRecycle(fd_out);
00452 chain.Wait();
00453 offsets_out->FinishedAppending();
00454 ResizeOrThrow(fd_in, 0);
00455 offsets_in->Reset();
00456 std::swap(fd_in, fd_out);
00457 std::swap(offsets_in, offsets_out);
00458 size = SizeOrThrow(fd_in);
00459 }
00460
00461 SeekOrThrow(fd_in, 0);
00462 if (fd_in == data2.get()) {
00463 data_.reset(data2.release());
00464 offsets_file_.reset(offsets2_file.release());
00465 offsets_ = offsets2;
00466 }
00467 if (offsets_.RemainingBlocks() <= 1) return 0;
00468
00469 return std::min(size, offsets_.RemainingBlocks() * static_cast<uint64_t>(config_.buffer_size));
00470 }
00471
00472
00473
00474 void Output(Chain &out, std::size_t lazy_memory) {
00475 Merge(lazy_memory);
00476 out.SetProgressTarget(Size());
00477 out >> OwningMergingReader<Compare, Combine>(data_.get(), offsets_, config_.buffer_size, lazy_memory, compare_, combine_);
00478 data_.release();
00479 offsets_file_.release();
00480 }
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504 std::size_t DefaultLazy() {
00505 float arity = static_cast<float>(config_.total_memory / config_.buffer_size);
00506 return static_cast<std::size_t>(static_cast<float>(config_.total_memory) * (arity - 1.0) / arity);
00507 }
00508
00509
00510 void Output(Chain &out) {
00511 Output(out, DefaultLazy());
00512 }
00513
00514
00515 int StealCompleted() {
00516
00517 Merge(0);
00518 SeekOrThrow(data_.get(), 0);
00519 offsets_file_.reset();
00520 return data_.release();
00521 }
00522
00523 private:
00524 SortConfig config_;
00525
00526 scoped_fd data_;
00527
00528 scoped_fd offsets_file_;
00529 Offsets offsets_;
00530
00531 const Compare compare_;
00532 const Combine combine_;
00533 const std::size_t entry_size_;
00534 };
00535
00536
00537 template <class Compare, class Combine> uint64_t BlockingSort(Chain &chain, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = NeverCombine()) {
00538 Sort<Compare, Combine> sorter(chain, config, compare, combine);
00539 chain.Wait(true);
00540 uint64_t size = sorter.Size();
00541 sorter.Output(chain);
00542 return size;
00543 }
00544
00545 }
00546 }
00547
00548 #endif // UTIL_STREAM_SORT__