Using IOCP for Worker Threads

Previous: Introduction.

The worker threads

We have a queue and we can post work units to it; now we need workers. These can be implemented by subclassing TThread and overriding Execute. A minimal implementation could be:

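procedure TWorkerThread.Execute;

   // Runs one anonymous work unit, then drops the reference taken in QueueWork
   procedure ExecuteAnonymousFunction(p : PAnonymousWorkUnit);
   begin
      try
         p^();
      finally
         p^._Release;
      end;
   end;

var
   // Types match the Winapi.Windows declaration of GetQueuedCompletionStatus
   lpNumberOfBytesTransferred : DWORD;   // used as the "command"
   lpCompletionKey : ULONG_PTR;          // unused here
   lpOverlapped : POverlapped;           // carries the work unit reference
begin
   while not Terminated do begin
      // Waits in the kernel until something is posted; returns False on
      // failure, e.g. once the IOCP handle has been closed
      if not GetQueuedCompletionStatus(FIOCP,
                                       lpNumberOfBytesTransferred,
                                       lpCompletionKey,
                                       lpOverlapped, INFINITE) then Break;
      if lpNumberOfBytesTransferred=1 then
         ExecuteAnonymousFunction(PAnonymousWorkUnit(@lpOverlapped))
      else Break;   // any other command: this worker leaves the pool
   end;
end;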

The worker thread is just a loop around GetQueuedCompletionStatus calls, which have the thread wait (in the kernel) for work units and then execute them if the command (lpNumberOfBytesTransferred) is 1.

ExecuteAnonymousFunction is where our anonymous work units are executed, and where the reference count we incremented in QueueWork is decremented.
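QueueWork itself belongs to the previous part, but the reference-count hand-off is easier to follow with both ends side by side. Below is a minimal, self-contained sketch of that round trip, assuming TAnonymousWorkUnit is declared as reference to procedure (consistent with the PAnonymousWorkUnit cast in Execute). This QueueWork variant, which takes the port as a parameter, is illustrative rather than the original code, and the main thread plays the worker's role to keep it short.

```pascal
program QueueWorkSketch;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils;

type
  // Assumed declaration, matching the PAnonymousWorkUnit cast in Execute
  TAnonymousWorkUnit = reference to procedure;
  PAnonymousWorkUnit = ^TAnonymousWorkUnit;

var
  counter: Integer = 0;

// Hypothetical stand-alone QueueWork that takes the port as a parameter
procedure QueueWork(IOCP: THandle; const workUnit: TAnonymousWorkUnit);
var
  lpOverlapped: Pointer;
begin
  lpOverlapped := nil;
  // Assigning through a typed pointer calls _AddRef, so the closure stays
  // alive while only a raw pointer to it sits in the queue
  PAnonymousWorkUnit(@lpOverlapped)^ := workUnit;
  // Command 1 = "run the anonymous work unit carried in lpOverlapped"
  if not PostQueuedCompletionStatus(IOCP, 1, 0, lpOverlapped) then begin
    PAnonymousWorkUnit(@lpOverlapped)^ := nil; // undo the _AddRef
    RaiseLastOSError;
  end;
end;

var
  IOCP: THandle;
  bytes: DWORD;
  key: ULONG_PTR;
  ov: POverlapped;
begin
  IOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  Win32Check(IOCP <> 0);
  QueueWork(IOCP, procedure begin Inc(counter); end);
  // Dequeue on the main thread, the same way TWorkerThread.Execute does
  Win32Check(GetQueuedCompletionStatus(IOCP, bytes, key, ov, INFINITE));
  if bytes = 1 then
    try
      PAnonymousWorkUnit(@ov)^();
    finally
      PAnonymousWorkUnit(@ov)^ := nil; // same effect as p^._Release in Execute
    end;
  Writeln(counter); // prints 1
  CloseHandle(IOCP);
end.
```

Assigning through the typed pointer, rather than casting the closure to a Pointer, lets the compiler perform the _AddRef, so the queuing side needs no manual reference counting.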

You may have noticed that if the command is not 1, the thread exits. This lets you shrink the worker pool by one thread simply by posting a zero command to the queue:

PostQueuedCompletionStatus(FIOCP, 0, 0, nil);

The check on the result of GetQueuedCompletionStatus also means that all worker threads will terminate automatically if you close the IOCP handle or the file handle, or when the application terminates.

So basically, to grow the worker pool, you just create new TWorkerThread instances in a fire-and-forget fashion (with FreeOnTerminate set to True, so each thread frees itself when it exits), and to shrink it, you post a zero to the queue. To clean up, you just call CloseHandle on FIOCP and FFileHandle. You don’t have to keep track of the worker threads.
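To make that life cycle concrete, here is a minimal console sketch. It assumes a TWorkerThread whose constructor takes the port handle and sets FreeOnTerminate, and it creates the port with CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0), since no file handle is actually required. GrowPool, ShrinkPool and the GLiveWorkers counter are hypothetical names; the counter only exists so the demo can observe the fire-and-forget threads leaving. Note that a worker blocked in GetQueuedCompletionStatus only returns when the port is closed on Windows Vista and later; on Windows XP and Server 2003 it keeps waiting.

```pascal
program WorkerPoolSketch;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils, System.Classes, System.SyncObjs;

type
  TAnonymousWorkUnit = reference to procedure;
  PAnonymousWorkUnit = ^TAnonymousWorkUnit;

  TWorkerThread = class(TThread)
  private
    FIOCP: THandle;
  protected
    procedure Execute; override;
  public
    constructor Create(IOCP: THandle);
  end;

var
  GLiveWorkers: Integer = 0; // hypothetical, only lets the demo observe exits

constructor TWorkerThread.Create(IOCP: THandle);
begin
  inherited Create(False); // the thread only starts once the constructor returns
  FIOCP := IOCP;
  FreeOnTerminate := True; // fire and forget: the thread frees itself on exit
  TInterlocked.Increment(GLiveWorkers);
end;

procedure TWorkerThread.Execute;
var
  command: DWORD;
  key: ULONG_PTR;
  ov: POverlapped;
begin
  try
    while not Terminated do begin
      // Returns False once the port has been closed (Windows Vista and later)
      if not GetQueuedCompletionStatus(FIOCP, command, key, ov, INFINITE) then Break;
      if command <> 1 then Break; // e.g. a zero posted by ShrinkPool
      try
        PAnonymousWorkUnit(@ov)^();
      finally
        PAnonymousWorkUnit(@ov)^ := nil; // release the reference taken when queuing
      end;
    end;
  finally
    TInterlocked.Decrement(GLiveWorkers);
  end;
end;

// Hypothetical helpers: no references to the threads are kept anywhere
procedure GrowPool(IOCP: THandle; count: Integer);
var
  i: Integer;
begin
  for i := 1 to count do
    TWorkerThread.Create(IOCP);
end;

procedure ShrinkPool(IOCP: THandle; count: Integer);
var
  i: Integer;
begin
  // Each zero command makes exactly one worker leave its loop
  for i := 1 to count do
    Win32Check(PostQueuedCompletionStatus(IOCP, 0, 0, nil));
end;

procedure WaitForLiveWorkers(expected: Integer);
begin
  // Demo-only polling on the hypothetical counter
  while GLiveWorkers <> expected do
    Sleep(10);
end;

var
  IOCP: THandle;
begin
  IOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  Win32Check(IOCP <> 0);
  GrowPool(IOCP, 4);
  ShrinkPool(IOCP, 2);
  WaitForLiveWorkers(2);
  CloseHandle(IOCP); // the two remaining workers fail out of GetQueuedCompletionStatus
  WaitForLiveWorkers(0);
  Writeln('workers left: ', GLiveWorkers); // prints "workers left: 0"
end.
```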

Going beyond bare-bones

The previous code gets the work done, but to go beyond it there are a few more things you might want to do:

  • use NameThreadForDebugging to identify your worker threads more easily in the debugger
  • wrap Execute with CoInitialize/CoUninitialize calls if you’re going to use COM in your work units
  • add support for other forms of work units, such as TNotifyEvent
  • trap work unit exceptions and either log or re-raise them in the main thread
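Here is one way the four suggestions above could be combined, as a sketch under a few assumptions. The command value 2, the TNotifyEventWorkUnit record that carries the event and its Sender through lpOverlapped, and QueueNotifyEvent are all hypothetical (dwsIOCPWorkerThreadPool has its own implementation, not reproduced here), and the anonymous work units of command 1 are left out for brevity. Exceptions are trapped per work unit and only written with OutputDebugString, which corresponds to the first of the two options below.

```pascal
program ExtendedWorkerSketch;
{$APPTYPE CONSOLE}
uses
  Winapi.Windows, Winapi.ActiveX, System.SysUtils, System.Classes, System.SyncObjs;

const
  CMD_NOTIFY_EVENT = 2; // hypothetical command value for TNotifyEvent work units

type
  // Hypothetical heap record carrying the event and its Sender through lpOverlapped
  PNotifyEventWorkUnit = ^TNotifyEventWorkUnit;
  TNotifyEventWorkUnit = record
    Event: TNotifyEvent;
    Sender: TObject;
  end;

  TWorkerThread = class(TThread)
  private
    FIOCP: THandle;
  protected
    procedure Execute; override;
  end;

  TDemo = class
    Hits: Integer;
    procedure OnWork(Sender: TObject);
  end;

procedure TDemo.OnWork(Sender: TObject);
begin
  if TInterlocked.Increment(Hits) = 3 then
    raise Exception.Create('demo failure'); // on purpose: the worker must survive it
end;

procedure QueueNotifyEvent(IOCP: THandle; const event: TNotifyEvent; sender: TObject);
var
  p: PNotifyEventWorkUnit;
begin
  New(p);
  p.Event := event;
  p.Sender := sender;
  if not PostQueuedCompletionStatus(IOCP, CMD_NOTIFY_EVENT, 0, POverlapped(p)) then begin
    Dispose(p);
    RaiseLastOSError;
  end;
end;

procedure TWorkerThread.Execute;
var
  command: DWORD;
  key: ULONG_PTR;
  ov: POverlapped;
  comInitialized: Boolean;
begin
  NameThreadForDebugging('IOCP worker'); // readable name in the debugger's thread list
  comInitialized := Succeeded(CoInitialize(nil)); // only needed if work units use COM
  try
    while not Terminated do begin
      if not GetQueuedCompletionStatus(FIOCP, command, key, ov, INFINITE) then Break;
      if command <> CMD_NOTIFY_EVENT then Break;
      try
        try
          PNotifyEventWorkUnit(ov).Event(PNotifyEventWorkUnit(ov).Sender);
        finally
          Dispose(PNotifyEventWorkUnit(ov)); // allocated by QueueNotifyEvent
        end;
      except
        // Trapped so one failing unit does not end the thread; only logged here
        on E: Exception do
          OutputDebugString(PChar('Work unit failed: ' + E.Message));
      end;
    end;
  finally
    if comInitialized then CoUninitialize;
  end;
end;

var
  IOCP: THandle;
  demo: TDemo;
  workers: array[0..1] of TWorkerThread;
  i: Integer;
begin
  IOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  Win32Check(IOCP <> 0);
  demo := TDemo.Create;
  for i := 0 to High(workers) do begin
    workers[i] := TWorkerThread.Create(True); // suspended, so FIOCP is set first
    workers[i].FIOCP := IOCP;
    workers[i].Start;
  end;
  for i := 1 to 10 do QueueNotifyEvent(IOCP, demo.OnWork, nil);
  for i := 0 to High(workers) do PostQueuedCompletionStatus(IOCP, 0, 0, nil); // FIFO: after the ten units
  for i := 0 to High(workers) do begin workers[i].WaitFor; workers[i].Free; end;
  Writeln(demo.Hits); // prints 10
  demo.Free;
  CloseHandle(IOCP);
end.
```

Completion packets are dequeued in FIFO order, which is why the zero commands posted last only reach the workers after all ten work units have been handed out.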

The last point, i.e. what to do with exceptions in threads, can be a thorny problem though. Ideally you want to surface them so they don’t go unnoticed, but there is little you can smartly do with a thread exception once it has been resurfaced in the main thread.

I’ve found it boils down to just two options in practice:

  1. you log the exception, notify the user, and then hope for the best (i.e. that the exception wasn’t serious)
  2. you resurface an unexpected exception in the main thread, and then terminate the app ASAP, because something could have gone horribly wrong and you don’t want it to result in widespread data corruption

Choose your poison 🙂
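Both options can be expressed as a policy applied at the point where a work unit’s exception is trapped. The sketch below is hypothetical: RunTrapped, LogWorkerException and TWorkerExceptionPolicy are made-up names, and the worker is a plain TThread.CreateAnonymousThread rather than an IOCP worker, to keep it short. Option 2 uses TThread.Queue to hand the failure to the main thread, which reports it and calls Halt.

```pascal
program WorkerExceptionSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

type
  // Hypothetical switch between the two options described above
  TWorkerExceptionPolicy = (wepLogAndContinue, wepResurfaceAndHalt);

var
  GPolicy: TWorkerExceptionPolicy = wepLogAndContinue;
  GLoggedCount: Integer = 0;

procedure LogWorkerException(E: Exception);
begin
  // Stand-in for a real logger
  TInterlocked.Increment(GLoggedCount);
  Writeln(ErrOutput, 'Work unit failed: ', E.ClassName, ': ', E.Message);
end;

// Runs one work unit on the calling (worker) thread and applies GPolicy
procedure RunTrapped(const workUnit: TProc);
var
  msg: string;
begin
  try
    workUnit();
  except
    on E: Exception do begin
      LogWorkerException(E); // option 1 stops here
      if GPolicy = wepResurfaceAndHalt then begin
        // Copied because E is freed when this handler exits
        msg := E.ClassName + ': ' + E.Message;
        // Option 2: the main thread reports the failure and ends the process
        // before a possibly corrupted state can do more damage
        TThread.Queue(nil,
          procedure
          begin
            Writeln(ErrOutput, 'Fatal worker exception: ', msg);
            Halt(1);
          end);
      end;
    end;
  end;
end;

var
  worker: TThread;
begin
  worker := TThread.CreateAnonymousThread(
    procedure
    begin
      RunTrapped(procedure begin raise Exception.Create('demo failure'); end);
    end);
  worker.FreeOnTerminate := False; // keep the object so it can be waited on
  worker.Start;
  worker.WaitFor;
  worker.Free;
  Writeln('logged: ', GLoggedCount); // prints "logged: 1"
  // Runs anything queued by option 2 (nothing with the default policy)
  CheckSynchronize;
end.
```

In a GUI application the main message loop runs queued procedures on its own; a console application only runs them when it calls CheckSynchronize.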

Ready-to-use implementation

You can find a ready-to-use implementation of an IOCP thread pool in the DWScript utility units, more precisely in dwsIOCPWorkerThreadPool. It supports several other types of work units (a simple procedure, a TNotifyEvent…) as well as several counters to monitor the state of the queue.

If you test it, you’ll find that IOCP is a very efficient mechanism: it can handle hundreds of thousands of work units per second, and queues with millions of units. This efficiency means you can break your processing down into smaller, simpler work units, which can help extract parallelism or just simplify coding and design.
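As an illustration of breaking work down into small units, this sketch sums 1..1,000,000 as 1,000 chunks queued on an IOCP and shared among four workers. It reuses the work-unit encoding of Execute above; QueueWork, StartWorker, MakeChunk and the globals are hypothetical names, and the chunk size is arbitrary. A TCountdownEvent (System.SyncObjs) tells the main thread when every chunk has finished.

```pascal
program ChunkedWorkSketch;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils, System.Classes, System.SyncObjs;

type
  TAnonymousWorkUnit = reference to procedure;
  PAnonymousWorkUnit = ^TAnonymousWorkUnit;

const
  N = 1000000;   // sum 1..N
  CHUNK = 1000;  // numbers per work unit (arbitrary)
  WORKERS = 4;

var
  GTotal: Int64 = 0;
  GDone: TCountdownEvent;

procedure QueueWork(IOCP: THandle; const workUnit: TAnonymousWorkUnit);
var
  lpOverlapped: Pointer;
begin
  lpOverlapped := nil;
  PAnonymousWorkUnit(@lpOverlapped)^ := workUnit; // _AddRef; the worker releases it
  if not PostQueuedCompletionStatus(IOCP, 1, 0, lpOverlapped) then begin
    PAnonymousWorkUnit(@lpOverlapped)^ := nil;
    RaiseLastOSError;
  end;
end;

// Minimal worker: runs command-1 units until another command arrives
function StartWorker(IOCP: THandle): TThread;
begin
  Result := TThread.CreateAnonymousThread(
    procedure
    var
      command: DWORD;
      key: ULONG_PTR;
      ov: POverlapped;
    begin
      while GetQueuedCompletionStatus(IOCP, command, key, ov, INFINITE) do begin
        if command <> 1 then Break;
        try
          PAnonymousWorkUnit(@ov)^();
        finally
          PAnonymousWorkUnit(@ov)^ := nil;
        end;
      end;
    end);
  Result.FreeOnTerminate := False; // so the main thread can WaitFor it
  Result.Start;
end;

// One call per chunk, so each closure captures its own first/last
function MakeChunk(first, last: Integer): TAnonymousWorkUnit;
begin
  Result :=
    procedure
    var
      i: Integer;
      partial: Int64;
    begin
      partial := 0;
      for i := first to last do
        Inc(partial, i);
      TInterlocked.Add(GTotal, partial);
      GDone.Signal;
    end;
end;

var
  IOCP: THandle;
  workers: array[0..WORKERS - 1] of TThread;
  i: Integer;
begin
  IOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  Win32Check(IOCP <> 0);
  GDone := TCountdownEvent.Create(N div CHUNK);
  for i := 0 to WORKERS - 1 do
    workers[i] := StartWorker(IOCP);
  for i := 0 to N div CHUNK - 1 do
    QueueWork(IOCP, MakeChunk(i * CHUNK + 1, (i + 1) * CHUNK));
  GDone.WaitFor;   // every chunk has signalled
  Writeln(GTotal); // prints 500000500000
  for i := 0 to WORKERS - 1 do
    PostQueuedCompletionStatus(IOCP, 0, 0, nil); // one zero command per worker
  for i := 0 to WORKERS - 1 do begin
    workers[i].WaitFor;
    workers[i].Free;
  end;
  GDone.Free;
  CloseHandle(IOCP);
end.
```

Each closure is built inside MakeChunk rather than directly in the for loop: Delphi anonymous methods capture variables, not values, so closures created in the loop body would all share the final value of the loop variable.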

11 thoughts on “Using IOCP for Worker Threads”

  1. This is exactly what is used in the TSynThreadPool object of our SynCrtSock unit, for the “pure WinSock” HTTP server (in case http.sys kernel mode is not a solution for you).

    Resulting performance is pretty good, but you are limited to a fixed number of threads, so for an HTTP server with a lot of active connections, it is less stable than the http.sys kernel mode.

    You can take a look at this unit, which is part of our mORMot framework, at http://synopse.info/fossil/finfo?name=SynCrtSock.pas

  2. “I’ve found it boils down to just two options in practice:”

    I see a third. If the threads are isolated, for instance sessions in a data or web server, you can just catch the exception and only terminate / finish the corrupted thread. No need to terminate the app. But it all depends on the isolation level.

    I am also curious what benefit this approach for a thread pool has over more widespread task pools, like the one in OmniThreadLibrary or the one I use in my code. There, a free thread executes the task given to it, and that is basically one cycle of the Execute procedure inside the thread.

  3. You do NOT need a file on disk in order to create an IOCP queue. You can pass INVALID_HANDLE_VALUE, or any object that supports IOCP (many objects do, not just files). The documentation you linked to says as much.

  4. @A. Bouchez: you can easily adjust the number of threads dynamically. If you maintain an active threads counter, you can use it to periodically bump up or down the number of threads depending on load. To remove a thread from the pool, the simplest approach I’ve found is to support another “command”, f.i. if lpNumberOfBytesTransferred is 2 the worker thread terminates and removes itself from the pool.

    @Remy: thanks I had overlooked that part, makes it even simpler! I’ll amend the article.

  5. @Iztok: when an unexpected exception occurs, it could be anything, including something that’ll break your thread isolation (threads share the same memory space, so a bug can result in threads messing up each other, even though they shouldn’t).

    The main advantage of IOCP over user-mode code is performance: it involves very few API calls, and is implemented kernel-side. An IOCP queue can easily handle hundreds of thousands of work units per second on my old quad core f.i., and you can queue millions of work units without problems.

  6. @chapa This is something I haven’t had time to investigate yet (and didn’t have an appropriate OS for dev), but yes, I anticipate adding them; I just need to migrate my main dev rig to Win8 or 2012 first.

  7. Please let me know once you are into this. I will rebuild dwsockets to work with the latest DWS Web Server release, so it can be used as a base if you find the code useful.
    I would be glad to help with development in any other way in this matter.

  8. Polling for completion using Sleep in the TIOCPWorkerThreadPool.Shutdown method is not nice.
    I would suggest using TCountdownEvent, waiting for the object to be signalled in the Shutdown method 😉

  9. When you create IOCP you use CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    The last param is NumberOfConcurrentThreads and it is zero.
    But MSDN says: If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.

    So my question is: if zero means the maximum number of concurrent threads equals the processor count, what is the meaning of WorkerCount in dwsIOCPWorkerThreadPool? If I create 100 threads but have 4 CPUs, will only 4 threads actually run?

  10. @Petar Good catch about the last parameter, fixed it.

    I don’t think the signal would be nicer, as that would add the event overhead for every work unit, along with special code to reset the signal. Calls to Shutdown will typically happen just once when the app terminates, and Shutdown only makes sense for non-GUI apps (GUI apps will want to monitor the shutdown progress to give feedback and generally avoid becoming unresponsive).
