00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
#include <stdint.h>

#include "ThrowingFwrite.h"
#include "BlockHashIndex.h"
#include "CmphStringVectorAdapter.h"
#include "util/exception.hh"
#include "util/string_stream.hh"
00027
00028 #ifdef HAVE_CMPH
00029 #include "cmph.h"
00030 #endif
00031
00032 namespace Moses
00033 {
00034 #ifdef WITH_THREADS
00035 BlockHashIndex::BlockHashIndex(size_t orderBits, size_t fingerPrintBits,
00036 size_t threadsNum)
00037 : m_orderBits(orderBits), m_fingerPrintBits(fingerPrintBits),
00038 m_fileHandle(0), m_fileHandleStart(0), m_landmarks(true), m_size(0),
00039 m_lastSaved(-1), m_lastDropped(-1), m_numLoadedRanges(0),
00040 m_threadPool(threadsNum)
00041 {
00042 #ifndef HAVE_CMPH
00043 std::cerr << "minphr: CMPH support not compiled in." << std::endl;
00044 exit(1);
00045 #endif
00046 }
00047 #else
00048 BlockHashIndex::BlockHashIndex(size_t orderBits, size_t fingerPrintBits)
00049 : m_orderBits(orderBits), m_fingerPrintBits(fingerPrintBits),
00050 m_fileHandle(0), m_fileHandleStart(0), m_size(0),
00051 m_lastSaved(-1), m_lastDropped(-1), m_numLoadedRanges(0)
00052 {
00053 #ifndef HAVE_CMPH
00054 std::cerr << "minphr: CMPH support not compiled in." << std::endl;
00055 exit(1);
00056 #endif
00057 }
00058 #endif
00059
00060 BlockHashIndex::~BlockHashIndex()
00061 {
00062 #ifdef HAVE_CMPH
00063 for(std::vector<void*>::iterator it = m_hashes.begin();
00064 it != m_hashes.end(); it++)
00065 if(*it != 0)
00066 cmph_destroy((cmph_t*)*it);
00067
00068 for(std::vector<PairedPackedArray<>*>::iterator it = m_arrays.begin();
00069 it != m_arrays.end(); it++)
00070 if(*it != 0)
00071 delete *it;
00072 #endif
00073 }
00074
00075 size_t BlockHashIndex::GetHash(const char* key)
00076 {
00077 std::string keyStr(key);
00078 size_t i = std::distance(m_landmarks.begin(),
00079 std::upper_bound(m_landmarks.begin(),
00080 m_landmarks.end(), keyStr)) - 1;
00081
00082 if(i == 0ul-1)
00083 return GetSize();
00084
00085 size_t pos = GetHash(i, key);
00086 if(pos != GetSize())
00087 return (1ul << m_orderBits) * i + pos;
00088 else
00089 return GetSize();
00090 }
00091
00092 size_t BlockHashIndex::GetFprint(const char* key) const
00093 {
00094 size_t hash;
00095 MurmurHash3_x86_32(key, std::strlen(key), 100000, &hash);
00096 hash &= (1ul << m_fingerPrintBits) - 1;
00097 return hash;
00098 }
00099
00100 size_t BlockHashIndex::GetHash(size_t i, const char* key)
00101 {
00102
00103
00104
00105
00106
00107 #ifdef HAVE_CMPH
00108 size_t idx = cmph_search((cmph_t*)m_hashes[i], key, (cmph_uint32) strlen(key));
00109 #else
00110 assert(0);
00111 size_t idx = 0;
00112 #endif
00113
00114 std::pair<size_t, size_t> orderPrint = m_arrays[i]->Get(idx, m_orderBits, m_fingerPrintBits);
00115 m_clocks[i] = clock();
00116
00117 if(GetFprint(key) == orderPrint.second)
00118 return orderPrint.first;
00119 else
00120 return GetSize();
00121 }
00122
00123 size_t BlockHashIndex::GetHash(std::string key)
00124 {
00125 return GetHash(key.c_str());
00126 }
00127
00128 size_t BlockHashIndex::operator[](std::string key)
00129 {
00130 return GetHash(key);
00131 }
00132
00133 size_t BlockHashIndex::operator[](char* key)
00134 {
00135 return GetHash(key);
00136 }
00137
00138 size_t BlockHashIndex::Save(std::string filename)
00139 {
00140 std::FILE* mphf = std::fopen(filename.c_str(), "w");
00141 size_t size = Save(mphf);
00142 std::fclose(mphf);
00143 return size;
00144 }
00145
00146 void BlockHashIndex::BeginSave(std::FILE * mphf)
00147 {
00148 m_fileHandle = mphf;
00149 ThrowingFwrite(&m_orderBits, sizeof(size_t), 1, m_fileHandle);
00150 ThrowingFwrite(&m_fingerPrintBits, sizeof(size_t), 1, m_fileHandle);
00151
00152 m_fileHandleStart = std::ftell(m_fileHandle);
00153
00154 size_t relIndexPos = 0;
00155 ThrowingFwrite(&relIndexPos, sizeof(size_t), 1, m_fileHandle);
00156 }
00157
00158 void BlockHashIndex::SaveRange(size_t i)
00159 {
00160 #ifdef HAVE_CMPH
00161 if(m_seekIndex.size() <= i)
00162 m_seekIndex.resize(i+1);
00163 m_seekIndex[i] = std::ftell(m_fileHandle) - m_fileHandleStart;
00164 cmph_dump((cmph_t*)m_hashes[i], m_fileHandle);
00165 m_arrays[i]->Save(m_fileHandle);
00166 #endif
00167 }
00168
00169 void BlockHashIndex::SaveLastRange()
00170 {
00171 #ifdef WITH_THREADS
00172 boost::mutex::scoped_lock lock(m_mutex);
00173 #endif
00174
00175 while(!m_queue.empty() && m_lastSaved + 1 == -m_queue.top()) {
00176 size_t current = -m_queue.top();
00177 m_queue.pop();
00178 SaveRange(current);
00179 m_lastSaved = current;
00180 }
00181 }
00182
00183 void BlockHashIndex::DropRange(size_t i)
00184 {
00185 #ifdef HAVE_CMPH
00186 if(m_hashes[i] != 0) {
00187 cmph_destroy((cmph_t*)m_hashes[i]);
00188 m_hashes[i] = 0;
00189 }
00190 if(m_arrays[i] != 0) {
00191 delete m_arrays[i];
00192 m_arrays[i] = 0;
00193 m_clocks[i] = 0;
00194 }
00195 m_numLoadedRanges--;
00196 #endif
00197 }
00198
00199 void BlockHashIndex::DropLastRange()
00200 {
00201 #ifdef WITH_THREADS
00202 boost::mutex::scoped_lock lock(m_mutex);
00203 #endif
00204
00205 while(m_lastDropped != m_lastSaved)
00206 DropRange(++m_lastDropped);
00207 }
00208
00209 #ifdef WITH_THREADS
00210 void BlockHashIndex::WaitAll()
00211 {
00212 m_threadPool.Stop(true);
00213 }
00214 #endif
00215
00216 size_t BlockHashIndex::FinalizeSave()
00217 {
00218 #ifdef WITH_THREADS
00219 m_threadPool.Stop(true);
00220 #endif
00221
00222 SaveLastRange();
00223
00224 size_t relIndexPos = std::ftell(m_fileHandle) - m_fileHandleStart;
00225
00226 std::fseek(m_fileHandle, m_fileHandleStart, SEEK_SET);
00227 ThrowingFwrite(&relIndexPos, sizeof(size_t), 1, m_fileHandle);
00228
00229 std::fseek(m_fileHandle, m_fileHandleStart + relIndexPos, SEEK_SET);
00230 m_landmarks.save(m_fileHandle);
00231
00232 size_t seekIndexSize = m_seekIndex.size();
00233 ThrowingFwrite(&seekIndexSize, sizeof(size_t), 1, m_fileHandle);
00234 ThrowingFwrite(&m_seekIndex[0], sizeof(size_t), seekIndexSize, m_fileHandle);
00235
00236 ThrowingFwrite(&m_size, sizeof(size_t), 1, m_fileHandle);
00237
00238 size_t fileHandleStop = std::ftell(m_fileHandle);
00239 return fileHandleStop - m_fileHandleStart + sizeof(m_orderBits)
00240 + sizeof(m_fingerPrintBits);
00241 }
00242
00243 size_t BlockHashIndex::Save(std::FILE * mphf)
00244 {
00245 m_queue = std::priority_queue<int>();
00246 BeginSave(mphf);
00247 for(size_t i = 0; i < m_hashes.size(); i++)
00248 SaveRange(i);
00249 return FinalizeSave();
00250 }
00251
00252 size_t BlockHashIndex::LoadIndex(std::FILE* mphf)
00253 {
00254 m_fileHandle = mphf;
00255
00256 size_t beginning = std::ftell(mphf);
00257
00258 size_t read = 0;
00259 read += std::fread(&m_orderBits, sizeof(size_t), 1, mphf);
00260 read += std::fread(&m_fingerPrintBits, sizeof(size_t), 1, mphf);
00261 m_fileHandleStart = std::ftell(m_fileHandle);
00262
00263 size_t relIndexPos;
00264 read += std::fread(&relIndexPos, sizeof(size_t), 1, mphf);
00265 std::fseek(m_fileHandle, m_fileHandleStart + relIndexPos, SEEK_SET);
00266
00267 m_landmarks.load(mphf);
00268
00269 size_t seekIndexSize;
00270 read += std::fread(&seekIndexSize, sizeof(size_t), 1, m_fileHandle);
00271 m_seekIndex.resize(seekIndexSize);
00272 read += std::fread(&m_seekIndex[0], sizeof(size_t), seekIndexSize, m_fileHandle);
00273 m_hashes.resize(seekIndexSize, 0);
00274 m_clocks.resize(seekIndexSize, 0);
00275 m_arrays.resize(seekIndexSize, 0);
00276
00277 read += std::fread(&m_size, sizeof(size_t), 1, m_fileHandle);
00278
00279 size_t end = std::ftell(mphf);
00280
00281 return end - beginning;
00282 }
00283
00284 void BlockHashIndex::LoadRange(size_t i)
00285 {
00286 #ifdef HAVE_CMPH
00287 std::fseek(m_fileHandle, m_fileHandleStart + m_seekIndex[i], SEEK_SET);
00288 cmph_t* hash = cmph_load(m_fileHandle);
00289 m_arrays[i] = new PairedPackedArray<>(0, m_orderBits,
00290 m_fingerPrintBits);
00291 m_arrays[i]->Load(m_fileHandle);
00292
00293 m_hashes[i] = (void*)hash;
00294 m_clocks[i] = clock();
00295
00296 m_numLoadedRanges++;
00297 #endif
00298 }
00299
00300 size_t BlockHashIndex::Load(std::string filename)
00301 {
00302 std::FILE* mphf = std::fopen(filename.c_str(), "r");
00303 size_t size = Load(mphf);
00304 std::fclose(mphf);
00305 return size;
00306 }
00307
00308 size_t BlockHashIndex::Load(std::FILE * mphf)
00309 {
00310 size_t byteSize = LoadIndex(mphf);
00311 size_t end = std::ftell(mphf);
00312
00313 for(size_t i = 0; i < m_seekIndex.size(); i++)
00314 LoadRange(i);
00315 std::fseek(m_fileHandle, end, SEEK_SET);
00316 return byteSize;
00317 }
00318
00319 size_t BlockHashIndex::GetSize() const
00320 {
00321 return m_size;
00322 }
00323
00324 void BlockHashIndex::KeepNLastRanges(float ratio, float tolerance)
00325 {
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344 }
00345
00346 void BlockHashIndex::CalcHash(size_t current, void* source_void)
00347 {
00348 #ifdef HAVE_CMPH
00349 cmph_io_adapter_t* source = (cmph_io_adapter_t*) source_void;
00350 cmph_config_t *config = cmph_config_new(source);
00351 cmph_config_set_algo(config, CMPH_CHD);
00352
00353 cmph_t* hash = cmph_new(config);
00354 PairedPackedArray<> *pv =
00355 new PairedPackedArray<>(source->nkeys, m_orderBits, m_fingerPrintBits);
00356
00357 size_t i = 0;
00358
00359 source->rewind(source->data);
00360
00361 std::string lastKey = "";
00362 while(i < source->nkeys) {
00363 unsigned keylen;
00364 char* key;
00365 source->read(source->data, &key, &keylen);
00366 std::string temp(key, keylen);
00367 source->dispose(source->data, key, keylen);
00368
00369 if(lastKey > temp) {
00370 if(source->nkeys != 2 || temp != "###DUMMY_KEY###") {
00371 util::StringStream strme;
00372 strme << "ERROR: Input file does not appear to be sorted with LC_ALL=C sort\n";
00373 strme << "1: " << lastKey << "\n";
00374 strme << "2: " << temp << "\n";
00375 UTIL_THROW2(strme.str());
00376 }
00377 }
00378 lastKey = temp;
00379
00380 size_t fprint = GetFprint(temp.c_str());
00381 size_t idx = cmph_search(hash, temp.c_str(),
00382 (cmph_uint32) temp.size());
00383
00384 pv->Set(idx, i, fprint, m_orderBits, m_fingerPrintBits);
00385 i++;
00386 }
00387
00388 cmph_config_destroy(config);
00389
00390 #ifdef WITH_THREADS
00391 boost::mutex::scoped_lock lock(m_mutex);
00392 #endif
00393
00394 if(m_hashes.size() <= current) {
00395 m_hashes.resize(current + 1, 0);
00396 m_arrays.resize(current + 1, 0);
00397 m_clocks.resize(current + 1, 0);
00398 }
00399
00400 m_hashes[current] = (void*)hash;
00401 m_arrays[current] = pv;
00402 m_clocks[current] = clock();
00403 m_queue.push(-current);
00404 #endif
00405 }
00406
00407 #ifdef HAVE_CMPH
00408 void* BlockHashIndex::vectorAdapter(std::vector<std::string>& v)
00409 {
00410 return (void*)CmphVectorAdapter(v);
00411 }
00412
00413 void* BlockHashIndex::vectorAdapter(StringVector<unsigned, size_t, std::allocator>& sv)
00414 {
00415 return (void*)CmphStringVectorAdapter(sv);
00416 }
00417
00418 void* BlockHashIndex::vectorAdapter(StringVector<unsigned, size_t, MmapAllocator>& sv)
00419 {
00420 return (void*)CmphStringVectorAdapter(sv);
00421 }
00422 #endif
00423
00424 }