00001 #include "util/parallel_read.hh"
00002
00003 #include "util/file.hh"
00004
00005 #ifdef WITH_THREADS
00006 #include "util/thread_pool.hh"
00007
00008 namespace util {
00009 namespace {
00010
00011 class Reader {
00012 public:
00013 explicit Reader(int fd) : fd_(fd) {}
00014
00015 struct Request {
00016 void *to;
00017 std::size_t size;
00018 uint64_t offset;
00019
00020 bool operator==(const Request &other) const {
00021 return (to == other.to) && (size == other.size) && (offset == other.offset);
00022 }
00023 };
00024
00025 void operator()(const Request &request) {
00026 util::ErsatzPRead(fd_, request.to, request.size, request.offset);
00027 }
00028
00029 private:
00030 int fd_;
00031 };
00032
00033 }
00034
00035 void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
00036 Reader::Request poison;
00037 poison.to = NULL;
00038 poison.size = 0;
00039 poison.offset = 0;
00040 unsigned threads = boost::thread::hardware_concurrency();
00041 if (!threads) threads = 2;
00042 ThreadPool<Reader> pool(2 , threads, fd, poison);
00043 const std::size_t kBatch = 1ULL << 25;
00044 Reader::Request request;
00045 request.to = to;
00046 request.size = kBatch;
00047 request.offset = offset;
00048 for (; amount > kBatch; amount -= kBatch) {
00049 pool.Produce(request);
00050 request.to = reinterpret_cast<uint8_t*>(request.to) + kBatch;
00051 request.offset += kBatch;
00052 }
00053 request.size = amount;
00054 if (request.size) {
00055 pool.Produce(request);
00056 }
00057 }
00058
00059 }
00060
00061 #else // WITH_THREADS
00062
00063 namespace util {
00064 void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
00065 util::ErsatzPRead(fd, to, amount, offset);
00066 }
00067 }
00068
00069 #endif