00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef UTIL_STREAM_SORT_H
00019 #define UTIL_STREAM_SORT_H
00020
00021 #include "util/stream/chain.hh"
00022 #include "util/stream/config.hh"
00023 #include "util/stream/io.hh"
00024 #include "util/stream/stream.hh"
00025 #include "util/stream/timer.hh"
00026
00027 #include "util/file.hh"
00028 #include "util/fixed_array.hh"
00029 #include "util/scoped.hh"
00030 #include "util/sized_iterator.hh"
00031
00032 #include <algorithm>
00033 #include <iostream>
00034 #include <queue>
00035 #include <string>
00036
00037 namespace util {
00038 namespace stream {
00039
// Combine policy that never merges two entries.
//
// MergingReader calls combine(dst, src, compare) for each entry it pops; a
// false result makes it advance the output and copy src as a separate entry.
// Because this functor always answers false, every input entry is kept.
// The arguments are accepted only to satisfy that call signature.
struct NeverCombine {
  template <class Compare>
  bool operator()(const void * /*dst*/, const void * /*src*/, const Compare & /*compare*/) const {
    const bool combined = false;
    return combined;
  }
};
00045
00046
00047 class Offsets {
00048 public:
00049 explicit Offsets(int fd) : log_(fd) {
00050 Reset();
00051 }
00052
00053 int File() const { return log_; }
00054
00055 void Append(uint64_t length) {
00056 if (!length) return;
00057 ++block_count_;
00058 if (length == cur_.length) {
00059 ++cur_.run;
00060 return;
00061 }
00062 WriteOrThrow(log_, &cur_, sizeof(Entry));
00063 cur_.length = length;
00064 cur_.run = 1;
00065 }
00066
00067 void FinishedAppending() {
00068 WriteOrThrow(log_, &cur_, sizeof(Entry));
00069 SeekOrThrow(log_, sizeof(Entry));
00070 cur_.run = 0;
00071 if (block_count_) {
00072 ReadOrThrow(log_, &cur_, sizeof(Entry));
00073 assert(cur_.length);
00074 assert(cur_.run);
00075 }
00076 }
00077
00078 uint64_t RemainingBlocks() const { return block_count_; }
00079
00080 uint64_t TotalOffset() const { return output_sum_; }
00081
00082 uint64_t PeekSize() const {
00083 return cur_.length;
00084 }
00085
00086 uint64_t NextSize() {
00087 assert(block_count_);
00088 uint64_t ret = cur_.length;
00089 output_sum_ += ret;
00090
00091 --cur_.run;
00092 --block_count_;
00093 if (!cur_.run && block_count_) {
00094 ReadOrThrow(log_, &cur_, sizeof(Entry));
00095 assert(cur_.length);
00096 assert(cur_.run);
00097 }
00098 return ret;
00099 }
00100
00101 void Reset() {
00102 SeekOrThrow(log_, 0);
00103 ResizeOrThrow(log_, 0);
00104 cur_.length = 0;
00105 cur_.run = 0;
00106 block_count_ = 0;
00107 output_sum_ = 0;
00108 }
00109
00110 private:
00111 int log_;
00112
00113 struct Entry {
00114 uint64_t length;
00115 uint64_t run;
00116 };
00117 Entry cur_;
00118
00119 uint64_t block_count_;
00120
00121 uint64_t output_sum_;
00122 };
00123
00124
00125 template <class Compare> class MergeQueue {
00126 public:
00127 MergeQueue(int fd, std::size_t buffer_size, std::size_t entry_size, const Compare &compare)
00128 : queue_(Greater(compare)), in_(fd), buffer_size_(buffer_size), entry_size_(entry_size) {}
00129
00130 void Push(void *base, uint64_t offset, uint64_t amount) {
00131 queue_.push(Entry(base, in_, offset, amount, buffer_size_));
00132 }
00133
00134 const void *Top() const {
00135 return queue_.top().Current();
00136 }
00137
00138 void Pop() {
00139 Entry top(queue_.top());
00140 queue_.pop();
00141 if (top.Increment(in_, buffer_size_, entry_size_))
00142 queue_.push(top);
00143 }
00144
00145 std::size_t Size() const {
00146 return queue_.size();
00147 }
00148
00149 bool Empty() const {
00150 return queue_.empty();
00151 }
00152
00153 private:
00154
00155 class Entry {
00156 public:
00157 Entry() {}
00158
00159 Entry(void *base, int fd, uint64_t offset, uint64_t amount, std::size_t buf_size) {
00160 offset_ = offset;
00161 remaining_ = amount;
00162 buffer_end_ = static_cast<uint8_t*>(base) + buf_size;
00163 Read(fd, buf_size);
00164 }
00165
00166 bool Increment(int fd, std::size_t buf_size, std::size_t entry_size) {
00167 current_ += entry_size;
00168 if (current_ != buffer_end_) return true;
00169 return Read(fd, buf_size);
00170 }
00171
00172 const void *Current() const { return current_; }
00173
00174 private:
00175 bool Read(int fd, std::size_t buf_size) {
00176 current_ = buffer_end_ - buf_size;
00177 std::size_t amount;
00178 if (static_cast<uint64_t>(buf_size) < remaining_) {
00179 amount = buf_size;
00180 } else if (!remaining_) {
00181 return false;
00182 } else {
00183 amount = remaining_;
00184 buffer_end_ = current_ + remaining_;
00185 }
00186 ErsatzPRead(fd, current_, amount, offset_);
00187 offset_ += amount;
00188 assert(current_ <= buffer_end_);
00189 remaining_ -= amount;
00190 return true;
00191 }
00192
00193
00194 uint8_t *current_, *buffer_end_;
00195
00196 uint64_t remaining_, offset_;
00197 };
00198
00199
00200 class Greater : public std::binary_function<const Entry &, const Entry &, bool> {
00201 public:
00202 explicit Greater(const Compare &compare) : compare_(compare) {}
00203
00204 bool operator()(const Entry &first, const Entry &second) const {
00205 return compare_(second.Current(), first.Current());
00206 }
00207
00208 private:
00209 const Compare compare_;
00210 };
00211
00212 typedef std::priority_queue<Entry, std::vector<Entry>, Greater> Queue;
00213 Queue queue_;
00214
00215 const int in_;
00216 const std::size_t buffer_size_;
00217 const std::size_t entry_size_;
00218 };
00219
00220
00221
00222
00223
00224
00225
00226 template <class Compare, class Combine> class MergingReader {
00227 public:
00228 MergingReader(int in, Offsets *in_offsets, Offsets *out_offsets, std::size_t buffer_size, std::size_t total_memory, const Compare &compare, const Combine &combine) :
00229 compare_(compare), combine_(combine),
00230 in_(in),
00231 in_offsets_(in_offsets), out_offsets_(out_offsets),
00232 buffer_size_(buffer_size), total_memory_(total_memory) {}
00233
00234 void Run(const ChainPosition &position) {
00235 Run(position, false);
00236 }
00237
00238 void Run(const ChainPosition &position, bool assert_one) {
00239
00240 if (!in_offsets_->RemainingBlocks()) {
00241 Link l(position);
00242 l.Poison();
00243 return;
00244 }
00245
00246 if (in_offsets_->RemainingBlocks() == 1) {
00247
00248 uint64_t offset = in_offsets_->TotalOffset();
00249 uint64_t amount = in_offsets_->NextSize();
00250 ReadSingle(offset, amount, position);
00251 if (out_offsets_) out_offsets_->Append(amount);
00252 return;
00253 }
00254
00255 Stream str(position);
00256 scoped_malloc buffer(MallocOrThrow(total_memory_));
00257 uint8_t *const buffer_end = static_cast<uint8_t*>(buffer.get()) + total_memory_;
00258
00259 const std::size_t entry_size = position.GetChain().EntrySize();
00260
00261 while (in_offsets_->RemainingBlocks()) {
00262
00263 uint64_t per_buffer = static_cast<uint64_t>(std::max<std::size_t>(
00264 buffer_size_,
00265 static_cast<std::size_t>((static_cast<uint64_t>(total_memory_) / in_offsets_->RemainingBlocks()))));
00266 per_buffer -= per_buffer % entry_size;
00267 assert(per_buffer);
00268
00269
00270 MergeQueue<Compare> queue(in_, per_buffer, entry_size, compare_);
00271 for (uint8_t *buf = static_cast<uint8_t*>(buffer.get());
00272 in_offsets_->RemainingBlocks() && (buf + std::min(per_buffer, in_offsets_->PeekSize()) <= buffer_end);) {
00273 uint64_t offset = in_offsets_->TotalOffset();
00274 uint64_t size = in_offsets_->NextSize();
00275 queue.Push(buf, offset, size);
00276 buf += static_cast<std::size_t>(std::min<uint64_t>(size, per_buffer));
00277 }
00278
00279 if (queue.Size() < 2 && in_offsets_->RemainingBlocks()) {
00280 std::cerr << "Bug in sort implementation: not merging at least two stripes." << std::endl;
00281 abort();
00282 }
00283 if (assert_one && in_offsets_->RemainingBlocks()) {
00284 std::cerr << "Bug in sort implementation: should only be one merge group for lazy sort" << std::endl;
00285 abort();
00286 }
00287
00288 uint64_t written = 0;
00289
00290 memcpy(str.Get(), queue.Top(), entry_size);
00291 for (queue.Pop(); !queue.Empty(); queue.Pop()) {
00292 if (!combine_(str.Get(), queue.Top(), compare_)) {
00293 ++written; ++str;
00294 memcpy(str.Get(), queue.Top(), entry_size);
00295 }
00296 }
00297 ++written; ++str;
00298 if (out_offsets_)
00299 out_offsets_->Append(written * entry_size);
00300 }
00301 str.Poison();
00302 }
00303
00304 private:
00305 void ReadSingle(uint64_t offset, const uint64_t size, const ChainPosition &position) {
00306
00307 const uint64_t end = offset + size;
00308 const uint64_t block_size = position.GetChain().BlockSize();
00309 Link l(position);
00310 for (; offset + block_size < end; ++l, offset += block_size) {
00311 ErsatzPRead(in_, l->Get(), block_size, offset);
00312 l->SetValidSize(block_size);
00313 }
00314 ErsatzPRead(in_, l->Get(), end - offset, offset);
00315 l->SetValidSize(end - offset);
00316 (++l).Poison();
00317 return;
00318 }
00319
00320 Compare compare_;
00321 Combine combine_;
00322
00323 int in_;
00324
00325 protected:
00326 Offsets *in_offsets_;
00327
00328 private:
00329 Offsets *out_offsets_;
00330
00331 std::size_t buffer_size_;
00332 std::size_t total_memory_;
00333 };
00334
00335
00336 template <class Compare, class Combine> class OwningMergingReader : public MergingReader<Compare, Combine> {
00337 private:
00338 typedef MergingReader<Compare, Combine> P;
00339 public:
00340 OwningMergingReader(int data, const Offsets &offsets, std::size_t buffer, std::size_t lazy, const Compare &compare, const Combine &combine)
00341 : P(data, NULL, NULL, buffer, lazy, compare, combine),
00342 data_(data),
00343 offsets_(offsets) {}
00344
00345 void Run(const ChainPosition &position) {
00346 P::in_offsets_ = &offsets_;
00347 scoped_fd data(data_);
00348 scoped_fd offsets_file(offsets_.File());
00349 P::Run(position, true);
00350 }
00351
00352 private:
00353 int data_;
00354 Offsets offsets_;
00355 };
00356
00357
00358 template <class Compare> class BlockSorter {
00359 public:
00360 BlockSorter(Offsets &offsets, const Compare &compare) :
00361 offsets_(&offsets), compare_(compare) {}
00362
00363 void Run(const ChainPosition &position) {
00364 const std::size_t entry_size = position.GetChain().EntrySize();
00365 for (Link link(position); link; ++link) {
00366
00367 offsets_->Append(link->ValidSize());
00368 void *end = static_cast<uint8_t*>(link->Get()) + link->ValidSize();
00369 #if defined(_WIN32) || defined(_WIN64)
00370 std::stable_sort
00371 #else
00372 std::sort
00373 #endif
00374 (SizedIt(link->Get(), entry_size),
00375 SizedIt(end, entry_size),
00376 compare_);
00377 }
00378 offsets_->FinishedAppending();
00379 }
00380
00381 private:
00382 Offsets *offsets_;
00383 SizedCompare<Compare> compare_;
00384 };
00385
00386 class BadSortConfig : public Exception {
00387 public:
00388 BadSortConfig() throw() {}
00389 ~BadSortConfig() throw() {}
00390 };
00391
00393 template <class Compare, class Combine = NeverCombine> class Sort {
00394 public:
00396 Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine())
00397 : config_(config),
00398 data_(MakeTemp(config.temp_prefix)),
00399 offsets_file_(MakeTemp(config.temp_prefix)), offsets_(offsets_file_.get()),
00400 compare_(compare), combine_(combine),
00401 entry_size_(in.EntrySize()) {
00402 UTIL_THROW_IF(!entry_size_, BadSortConfig, "Sorting entries of size 0");
00403
00404 config_.buffer_size -= config_.buffer_size % entry_size_;
00405 UTIL_THROW_IF(!config_.buffer_size, BadSortConfig, "Sort buffer too small");
00406 UTIL_THROW_IF(config_.total_memory < config_.buffer_size * 4, BadSortConfig, "Sorting memory " << config_.total_memory << " is too small for four buffers (two read and two write).");
00407 in >> BlockSorter<Compare>(offsets_, compare_) >> WriteAndRecycle(data_.get());
00408 }
00409
00410 uint64_t Size() const {
00411 return SizeOrThrow(data_.get());
00412 }
00413
00414
00415
00416 std::size_t Merge(std::size_t lazy_memory) {
00417 if (offsets_.RemainingBlocks() <= 1) return 0;
00418 const uint64_t lazy_arity = std::max<uint64_t>(1, lazy_memory / config_.buffer_size);
00419 uint64_t size = Size();
00420
00421
00422
00423
00424 if (offsets_.RemainingBlocks() <= lazy_arity || size <= static_cast<uint64_t>(lazy_memory))
00425 return std::min<std::size_t>(size, offsets_.RemainingBlocks() * config_.buffer_size);
00426
00427 scoped_fd data2(MakeTemp(config_.temp_prefix));
00428 int fd_in = data_.get(), fd_out = data2.get();
00429 scoped_fd offsets2_file(MakeTemp(config_.temp_prefix));
00430 Offsets offsets2(offsets2_file.get());
00431 Offsets *offsets_in = &offsets_, *offsets_out = &offsets2;
00432
00433
00434 ChainConfig chain_config;
00435 chain_config.entry_size = entry_size_;
00436 chain_config.block_count = 2;
00437 chain_config.total_memory = config_.buffer_size * 2;
00438 Chain chain(chain_config);
00439
00440 while (offsets_in->RemainingBlocks() > lazy_arity) {
00441 if (size <= static_cast<uint64_t>(lazy_memory)) break;
00442 std::size_t reading_memory = config_.total_memory - 2 * config_.buffer_size;
00443 if (size < static_cast<uint64_t>(reading_memory)) {
00444 reading_memory = static_cast<std::size_t>(size);
00445 }
00446 SeekOrThrow(fd_in, 0);
00447 chain >>
00448 MergingReader<Compare, Combine>(
00449 fd_in,
00450 offsets_in, offsets_out,
00451 config_.buffer_size,
00452 reading_memory,
00453 compare_, combine_) >>
00454 WriteAndRecycle(fd_out);
00455 chain.Wait();
00456 offsets_out->FinishedAppending();
00457 ResizeOrThrow(fd_in, 0);
00458 offsets_in->Reset();
00459 std::swap(fd_in, fd_out);
00460 std::swap(offsets_in, offsets_out);
00461 size = SizeOrThrow(fd_in);
00462 }
00463
00464 SeekOrThrow(fd_in, 0);
00465 if (fd_in == data2.get()) {
00466 data_.reset(data2.release());
00467 offsets_file_.reset(offsets2_file.release());
00468 offsets_ = offsets2;
00469 }
00470 if (offsets_.RemainingBlocks() <= 1) return 0;
00471
00472 return std::min(size, offsets_.RemainingBlocks() * static_cast<uint64_t>(config_.buffer_size));
00473 }
00474
00475
00476
00477 void Output(Chain &out, std::size_t lazy_memory) {
00478 Merge(lazy_memory);
00479 out.SetProgressTarget(Size());
00480 out >> OwningMergingReader<Compare, Combine>(data_.get(), offsets_, config_.buffer_size, lazy_memory, compare_, combine_);
00481 data_.release();
00482 offsets_file_.release();
00483 }
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507 std::size_t DefaultLazy() {
00508 float arity = static_cast<float>(config_.total_memory / config_.buffer_size);
00509 return static_cast<std::size_t>(static_cast<float>(config_.total_memory) * (arity - 1.0) / arity);
00510 }
00511
00512
00513 void Output(Chain &out) {
00514 Output(out, DefaultLazy());
00515 }
00516
00517
00518 int StealCompleted() {
00519
00520 Merge(0);
00521 SeekOrThrow(data_.get(), 0);
00522 offsets_file_.reset();
00523 return data_.release();
00524 }
00525
00526 private:
00527 SortConfig config_;
00528
00529 scoped_fd data_;
00530
00531 scoped_fd offsets_file_;
00532 Offsets offsets_;
00533
00534 const Compare compare_;
00535 const Combine combine_;
00536 const std::size_t entry_size_;
00537 };
00538
00539
00540 template <class Compare, class Combine> uint64_t BlockingSort(Chain &chain, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = NeverCombine()) {
00541 Sort<Compare, Combine> sorter(chain, config, compare, combine);
00542 chain.Wait(true);
00543 uint64_t size = sorter.Size();
00544 sorter.Output(chain);
00545 return size;
00546 }
00547
00557 template <class Compare, class Combine = NeverCombine> class Sorts : public FixedArray<Sort<Compare, Combine> > {
00558 private:
00559 typedef Sort<Compare, Combine> S;
00560 typedef FixedArray<S> P;
00561
00562 public:
00570 Sorts() {}
00571
00578 explicit Sorts(std::size_t number) : FixedArray<Sort<Compare, Combine> >(number) {}
00579
00590 void push_back(util::stream::Chain &chain, const util::stream::SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine()) {
00591 new (P::end()) S(chain, config, compare, combine);
00592 P::Constructed();
00593 }
00594 };
00595
00596 }
00597 }
00598
00599 #endif // UTIL_STREAM_SORT_H