Moses statistical machine translation system

Multi-Threading

Modern server machines have multiple CPUs, each with multiple cores. Utilizing all these cores requires either running multiple processes on the same machine or writing programs that use multiple threads.

Since many aspects of a machine translation system (training, tuning, using) lend themselves very easily to parallel processing, Moses increasingly uses multi-threading in its components. At the time of writing, the following components support parallel execution when the switch "--threads NUM" is added, where NUM is the maximum number of threads to run at the same time:

  • the decoder binary moses
  • the minimum error rate training tuner mert
  • the hierarchical rule extractor extract-rules

Multi-threading in Moses is based on the Boost C++ libraries and on two Moses helper classes, ThreadPool and OutputCollector, which make the kind of multi-threading that is typical for Moses more convenient.

We will explain how to implement multi-threaded processing using a simple example.

Tasks

The part of the program that is to be run in parallel threads is called a task. It must be placed in a class of its own, derived from Moses::Task and implementing the function Run().

 #include <iostream>
 #include "ThreadPool.h"

 class ExampleTask : public Moses::Task
 {
 public:
   ExampleTask() {}
   ~ExampleTask() {}

   void Run() {
     std::cout << "Hello World." << std::endl;
   }
 };

Such a class can now be instantiated and run:

 ExampleTask *task = new ExampleTask();
 task->Run();
 delete task;

This will print "Hello World.", and is otherwise not very exciting.
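The reason for deriving from a common base class is that code which runs tasks, such as a thread pool or a serial fallback, only needs the base-class interface and never the concrete task type. The minimal sketch below illustrates this with a hypothetical stand-in base class, SketchTask, which mirrors only the Run() function used on this page (it is not the actual Moses header). The HelloTask variant writes to a caller-supplied stream instead of std::cout so that its output can be captured.

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for Moses::Task (assumption: only Run() is mirrored).
class SketchTask {
public:
  virtual ~SketchTask() {}
  virtual void Run() = 0;
};

// The "Hello World." task, printing to a caller-supplied stream.
class HelloTask : public SketchTask {
public:
  explicit HelloTask(std::ostream& out) : m_out(out) {}
  void Run() override { m_out << "Hello World." << std::endl; }

private:
  std::ostream& m_out;
};

// Runs tasks one after the other through the base-class interface,
// the way a serial fallback would; it never needs the concrete type.
void RunAll(const std::vector<SketchTask*>& tasks) {
  for (SketchTask* task : tasks) {
    task->Run();  // virtual dispatch to the derived class's Run()
  }
}
```

For example, running two HelloTask objects that share one std::ostringstream leaves "Hello World." in it twice.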

Let's make the task a bit more interesting. Our new task stores a message, waits for a random amount of time, and then prints out the message:

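 // member variable: std::string m_message;
 ExampleTask(std::string message) : m_message(message) {}

 void Run() {
   // length of pause
   int r = rand() % 10;

   // pause (busy-wait; volatile keeps the loop from being optimized away)
   volatile unsigned long j = 0;
   for (long i = 0; i < 100000000L * r; i++) { j = j + i; }

   // write message (and length of pause)
   std::cout << m_message << " (" << r << ")" << std::endl;
 }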

We can now create multiple instances of this task, and execute each:

 // set up tasks
 srand(time(NULL));
 ExampleTask *task0 = new ExampleTask("zero");
 ExampleTask *task1 = new ExampleTask("one");
 ExampleTask *task2 = new ExampleTask("two");

 // serial execution
 task0->Run();
 task1->Run();
 task2->Run();

This will print out three lines (the random numbers in parentheses will vary):

 zero (2)
 one (4)
 two (5)

Okay, where is the multi-threading? Here it comes.

ThreadPool

Instead of running the tasks one after the other, we submit them to a thread pool. Once submitted, they are handed to worker threads and executed in parallel with the main thread.

 // set up thread pool
 int thread_count = 10;
 Moses::ThreadPool pool(thread_count);

 // submit tasks
 pool.Submit(task0);
 pool.Submit(task1);
 pool.Submit(task2);

 // wait for all threads to finish
 pool.Stop(true);

That's all too easy to be true, right? Yes, it is.

Since the three tasks are running in parallel, there is no telling when each of them prints its message. Not only could the lines be printed in a different order than the tasks were submitted, but the threads may even write over each other, mixing up parts of their lines.

This is the catch with multi-threading: any access to shared (non-local) data structures must be handled very carefully. Ideally, tasks change only their own data (member variables of the task class), and the results are read out once all tasks are done (after pool.Stop(true)). This is in fact what happens in multi-threaded mert.
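To make the "local data only" pattern concrete, here is a minimal sketch of a thread pool built on C++11 std::thread, std::mutex, and std::condition_variable rather than Boost. The names MiniThreadPool, SketchTask, and SumTask are hypothetical; Submit() and Stop(bool) only mimic the calls used on this page and are not the Moses ThreadPool implementation. Unlike the Moses pool, this sketch never deletes tasks, so the caller owns them.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for Moses::Task: the pool only ever calls Run().
class SketchTask {
public:
  virtual ~SketchTask() {}
  virtual void Run() = 0;
};

// Minimal pool: worker threads pull tasks from a shared queue.
class MiniThreadPool {
public:
  explicit MiniThreadPool(std::size_t numThreads) {
    for (std::size_t i = 0; i < numThreads; ++i)
      m_workers.emplace_back([this] { WorkerLoop(); });
  }
  ~MiniThreadPool() { Stop(true); }

  void Submit(SketchTask* task) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_queue.push_back(task);
    }
    m_cond.notify_one();  // wake one idle worker
  }

  // Blocks until all workers have exited. With processRemaining == true the
  // queue is drained first; otherwise tasks not yet started are discarded.
  void Stop(bool processRemaining) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_stopping = true;
      if (!processRemaining) m_queue.clear();
    }
    m_cond.notify_all();
    for (std::thread& worker : m_workers)
      if (worker.joinable()) worker.join();
  }

private:
  void WorkerLoop() {
    for (;;) {
      SketchTask* task = nullptr;
      {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return m_stopping || !m_queue.empty(); });
        if (m_queue.empty()) return;  // stopping and nothing left to run
        task = m_queue.front();
        m_queue.pop_front();
      }
      task->Run();  // executed outside the lock, in parallel with other workers
    }
  }

  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::deque<SketchTask*> m_queue;
  bool m_stopping = false;
  std::vector<std::thread> m_workers;
};

// Each task writes only to its own members, so no locking is needed.
class SumTask : public SketchTask {
public:
  explicit SumTask(long n) : m_n(n), m_result(0) {}
  void Run() override {
    for (long i = 1; i <= m_n; ++i) m_result += i;
  }
  long Result() const { return m_result; }

private:
  long m_n;
  long m_result;
};
```

Reading Result() after Stop(true) is safe without a lock because joining a std::thread synchronizes with the end of that thread, so everything written in Run() is visible to the main thread afterwards.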

In our case, as in the decoder, we want to output text line by line (the decoder outputs translations and possibly additional information such as n-best lists).

OutputCollector

The Moses code offers the class OutputCollector, which buffers output until it is safe to print it. By default it writes to STDOUT, but it can also write to a file. In fact, it handles two streams, regular output (default STDOUT) and debugging output (default STDERR), each of which can be redirected to a different file.

 Moses::OutputCollector* outputCollector = new Moses::OutputCollector();

A task that holds a pointer to the output collector (here m_collector) can then send its output to it with the function Write, for example:

 m_collector->Write(id, "Hello World!");

The id is the sequential number of the sentence, starting at 0. It lets the output collector keep track of what can be written out and what needs to be buffered: the collector will not write the output for sentence 1 until it has received the output for sentence 0.
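The ordering rule can be sketched in a few lines. The class below, OrderedCollector, is a hypothetical illustration of the idea and not the Moses OutputCollector source: it buffers text per id in a std::map guarded by a std::mutex, and after each Write it flushes every consecutive id starting from the next one expected.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical sketch: output for id n is written only after ids 0..n-1.
class OrderedCollector {
public:
  explicit OrderedCollector(std::ostream& out) : m_out(out), m_next(0) {}

  void Write(int id, const std::string& text) {
    std::lock_guard<std::mutex> lock(m_mutex);  // Write may be called from many threads
    m_buffer[id] = text;
    // Flush every consecutive id, starting at m_next, that has already arrived.
    auto it = m_buffer.find(m_next);
    while (it != m_buffer.end()) {
      m_out << it->second << std::flush;
      m_buffer.erase(it);
      ++m_next;
      it = m_buffer.find(m_next);
    }
  }

private:
  std::ostream& m_out;
  int m_next;                           // next id allowed to be written
  std::map<int, std::string> m_buffer;  // out-of-order output waiting its turn
  std::mutex m_mutex;
};
```

If ids 2 and 1 arrive first, nothing is written; as soon as id 0 arrives, all three lines are written in the order 0, 1, 2.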

Not Deleting Tasks after Execution

By default, Task objects are deleted by the thread pool after execution. However, you may want to keep them around. This happens, for instance, in mert, where each Task finds an optimized weight setting that is processed afterwards. In this case, add the following lines to your Task definition, and delete the objects yourself once you no longer need them:

 virtual bool DeleteAfterExecution() {
   return false;
 }
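The effect of this hook can be sketched as follows. The base class SketchTask and the function ExecuteTask are hypothetical; ExecuteTask shows what a pool worker might plausibly do after running each task, not the Moses source. A destructor counter makes the difference between the two behaviours observable.

```cpp
#include <cassert>

// Hypothetical stand-in for Moses::Task, including the deletion hook.
class SketchTask {
public:
  virtual ~SketchTask() {}
  virtual void Run() = 0;
  virtual bool DeleteAfterExecution() { return true; }  // default: pool deletes the task
};

// What a pool worker might do with each task it dequeues (an assumption).
void ExecuteTask(SketchTask* task) {
  task->Run();
  if (task->DeleteAfterExecution()) {
    delete task;
  }
}

// Counts destructor calls so the deletion behaviour can be observed.
int g_destroyed = 0;

class DefaultTask : public SketchTask {
public:
  ~DefaultTask() override { ++g_destroyed; }
  void Run() override {}
};

// A task whose result is still needed after Run(), in the spirit of mert.
class KeptTask : public SketchTask {
public:
  explicit KeptTask(double weight) : m_weight(weight) {}
  ~KeptTask() override { ++g_destroyed; }
  void Run() override { m_weight *= 0.5; }  // placeholder for real optimization work
  bool DeleteAfterExecution() override { return false; }
  double Weight() const { return m_weight; }

private:
  double m_weight;
};
```

After ExecuteTask, a DefaultTask is gone, while a KeptTask can still be queried and must be deleted by the caller.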

Limit the Size of the Task Queue

By default, when a task is submitted to the ThreadPool by calling its Submit() function, it is added to an internal queue, and the main process resumes immediately. This means that if a million tasks are submitted, the queue fills up with a million Task instances, which may consume a lot of memory.

To restrict the number of tasks in the queue, call, for example, pool.SetQueueLimit(1000) to limit it to 1000 queued Task instances. When the queue is full, Submit() blocks until a worker thread takes a task off the queue.
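A blocking Submit() of this kind is commonly built from a mutex and two condition variables. The sketch below is a hypothetical BoundedQueue in C++11, not the Moses code: Push() plays the role of Submit() and waits while limit items are queued, and Pop() is what a worker thread would call. It assumes limit is at least 1; the helper ProduceAndConsume is likewise made up for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical bounded FIFO queue; limit must be at least 1.
template <typename T>
class BoundedQueue {
public:
  explicit BoundedQueue(std::size_t limit) : m_limit(limit) {}

  void Push(T item) {  // blocks while the queue is full, like Submit()
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notFull.wait(lock, [this] { return m_items.size() < m_limit; });
    m_items.push_back(std::move(item));
    m_notEmpty.notify_one();
  }

  T Pop() {  // blocks while the queue is empty
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notEmpty.wait(lock, [this] { return !m_items.empty(); });
    T item = std::move(m_items.front());
    m_items.pop_front();
    m_notFull.notify_one();
    return item;
  }

private:
  std::size_t m_limit;
  std::deque<T> m_items;
  std::mutex m_mutex;
  std::condition_variable m_notFull;
  std::condition_variable m_notEmpty;
};

// One producer thread pushes 0..n-1 through a queue of the given limit while
// the calling thread pops; returns true if all items arrived in order.
bool ProduceAndConsume(int n, std::size_t limit) {
  BoundedQueue<int> queue(limit);
  std::thread producer([&queue, n] {
    for (int i = 0; i < n; ++i) queue.Push(i);  // waits whenever `limit` items are queued
  });
  bool inOrder = true;
  for (int i = 0; i < n; ++i)
    if (queue.Pop() != i) inOrder = false;
  producer.join();
  return inOrder;
}
```

However many items are submitted, at most limit of them are held in memory at once; the producer simply waits whenever the consumer falls behind.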

Example

Below is the complete example.

Note:

  • The task class now has two more member variables, which are set upon instantiation: the sequence id m_id (a sequential number starting at 0) and a pointer to the output collector m_collector.
  • Always implement a fallback to non-threaded compilation (#ifdef WITH_THREADS .. #else .. #endif).
  • Output is written to a file named output-file.txt (lines 43-45) instead of STDOUT.

 01: #include <iostream>
 02: #include <fstream>
 03: #include <ostream>
 04: #include <cstdlib>
 05: #include <ctime>
 06: #include <sstream>
 07: #include "ThreadPool.h"
 08: #include "OutputCollector.h"
 09: using namespace std;
 10:
 11: class ExampleTask : public Moses::Task
 12: {
 13: private:
 14:   unsigned int m_id;
 15:   string m_message;
 16:   Moses::OutputCollector* m_collector;
 17: public:
 18:   ExampleTask(unsigned int id, string message, Moses::OutputCollector* collector):
 19:     m_id(id),
 20:     m_message(message),
 21:     m_collector(collector) {}
 22: 
 23:   ~ExampleTask() {}
 24: 
 25:   void Run() {
 26:     // length of pause
 27:     int r = rand()%10;
 28: 
 29:     // pause (busy-wait; volatile keeps the loop from being optimized away)
 30:     volatile unsigned long j = 0;
 31:     for (long i = 0; i < 100000000L * r; i++) { j = j + i; }
 32: 
 33:     // write message (and length of pause)
 34:     ostringstream out;
 35:     out << m_message << " (" << r << ")" << endl;
 36:     m_collector->Write(m_id, out.str());
 37:   }
 38: };
 39: 
 40: int main ()
 41: {
 42:   // output into file
 43:   string outfile = "output-file.txt";
 44:   std::ofstream *outputStream = new ofstream(outfile.c_str());
 45:   Moses::OutputCollector* outputCollector = new Moses::OutputCollector(outputStream);
 46: 
 47:   // set up tasks
 48:   srand(time(NULL));
 49:   ExampleTask *task0 = new ExampleTask(0,"zero",outputCollector);
 50:   ExampleTask *task1 = new ExampleTask(1,"one",outputCollector);
 51:   ExampleTask *task2 = new ExampleTask(2,"two",outputCollector);
 52: 
 53: #ifdef WITH_THREADS
 54:   // set up thread pool
 55:   int thread_count = 10;
 56:   Moses::ThreadPool pool(thread_count);
 57: 
 58:   // submit tasks (the pool deletes them after execution)
 59:   pool.Submit(task0);
 60:   pool.Submit(task1);
 61:   pool.Submit(task2);
 62: 
 63:   // wait for all threads to finish
 64:   pool.Stop(true);
 65: #else
 66:   // fallback: serial execution
 67:   task0->Run();
 68:   task1->Run();
 69:   task2->Run();
 70:   delete task0;
 71:   delete task1;
 72:   delete task2;
 73: #endif
 74: 
 75:   // release the collector and close the output file
 76:   delete outputCollector;
 77:   delete outputStream;
 78: }

To compile this example (saved as test.cpp), either copy ThreadPool.h, ThreadPool.cpp, and OutputCollector.h into your code directory or add the moses/src directory to your include path, and then compile as follows:

 g++ -c ThreadPool.cpp -DWITH_THREADS -DBOOST_HAS_PTHREADS
 g++ -c test.cpp -DWITH_THREADS -DBOOST_HAS_PTHREADS
 g++ -o test test.o ThreadPool.o -pthread -lboost_thread-mt

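Make sure that the Boost libraries are in your compile and link paths. Depending on your Boost installation, the thread library may be named boost_thread rather than boost_thread-mt, and some Boost versions also require -lboost_system.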

When you run this example, you will notice that, whatever the lengths of the pauses, the lines in output-file.txt always appear in the correct order (i.e., zero, one, two).

Page last modified on April 17, 2012, at 11:49 PM