Singular Update Queue

Implement a work queue and a single thread working off the queue. Multiple
concurrent clients can submit state changes to the queue, but a single
thread applies those state changes. This can be naturally implemented with
goroutines and channels in languages like Go.

Figure 1: Single Thread Backed By A Work Queue

A typical Java implementation looks like the following:

Figure 2: SingularUpdateQueue in Java

A SingularUpdateQueue has a queue and a function to be applied to the work items in the queue.
It extends java.lang.Thread, so that it has its own single thread of execution.

public class SingularUpdateQueue<Req, Res> extends Thread implements Logging {
    private ArrayBlockingQueue<RequestWrapper<Req, Res>> workQueue
            = new ArrayBlockingQueue<RequestWrapper<Req, Res>>(100);
    private Function<Req, Res> handler;
    private volatile boolean isRunning = false;

Clients submit requests to the queue on their own threads. The queue
wraps each request in a simple wrapper to combine it with a future,
returning the future to the client so that the client can react once the
request is eventually completed.

class SingularUpdateQueue…

  public CompletableFuture<Res> submit(Req request) {
      try {
          var requestWrapper = new RequestWrapper<Req, Res>(request);
          workQueue.put(requestWrapper); // blocks while the queue is full
          return requestWrapper.getFuture();
      } catch (InterruptedException e) {
          throw new RuntimeException(e);
      }
  }

class RequestWrapper<Req, Res> {
    private final CompletableFuture<Res> future;
    private final Req request;

    public RequestWrapper(Req request) {
        this.request = request;
        this.future = new CompletableFuture<Res>();
    }

    public CompletableFuture<Res> getFuture() { return future; }
    public Req getRequest()                   { return request; }
}

The elements in the queue are processed by the single dedicated thread
that SingularUpdateQueue inherits from Thread. The queue allows multiple concurrent producers to add tasks for execution.
The queue implementation should be thread safe, and should not add a lot of overhead under contention.
The execution thread picks up requests from the queue and processes them one at a time.
The CompletableFuture is completed with the response of the task.

class SingularUpdateQueue…

  public void run() {
       isRunning = true;
       while(isRunning) {
           Optional<RequestWrapper<Req, Res>> item = take();
           item.ifPresent(requestWrapper -> {
               try {
                   Res response = handler.apply(requestWrapper.getRequest());
                   requestWrapper.complete(response);

               } catch (Exception e) {
                   requestWrapper.completeExceptionally(e);
               }
           });
       }
  }

class RequestWrapper…

  public void complete(Res response) {
      future.complete(response);
  }

  public void completeExceptionally(Exception e) {
      future.completeExceptionally(e);
  }

It’s useful to note that we can use a timeout while reading items from the queue, instead of blocking indefinitely. Without a timeout, an empty queue would block the execution thread forever,
and the thread would never notice that isRunning has been set to false.
So we use the poll method with a timeout, instead of the take method, which blocks indefinitely. This gives us the ability to shut down the thread of execution cleanly.

class SingularUpdateQueue…

  private Optional<RequestWrapper<Req, Res>> take() {
      try {
          return Optional.ofNullable(workQueue.poll(300, TimeUnit.MILLISECONDS));

      } catch (InterruptedException e) {
          return Optional.empty();
      }
  }

  public void shutdown() {
      this.isRunning = false;
  }
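
To see the pieces working together, here is a compact, self-contained sketch of the same idea. It is not the article's exact class: the names MiniUpdateQueue, Work and MiniUpdateQueueDemo are made up for illustration, and isRunning is initialised to true (rather than being set inside run) as one possible way of not losing a shutdown that arrives before the thread starts. The demo lets four client threads increment a plain, unsynchronized counter through the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Illustrative, compact variant of the fragments above; names are not from the article.
class MiniUpdateQueue<Req, Res> extends Thread {
    private static final class Work<Req, Res> {
        final Req request;
        final CompletableFuture<Res> future = new CompletableFuture<>();
        Work(Req request) { this.request = request; }
    }

    private final ArrayBlockingQueue<Work<Req, Res>> workQueue = new ArrayBlockingQueue<>(100);
    private final Function<Req, Res> handler;
    private volatile boolean isRunning = true;

    MiniUpdateQueue(Function<Req, Res> handler) { this.handler = handler; }

    // Called on client threads; blocks while the queue is full.
    CompletableFuture<Res> submit(Req request) {
        Work<Req, Res> work = new Work<>(request);
        try {
            workQueue.put(work);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return work.future;
    }

    @Override
    public void run() {
        while (isRunning) {
            Work<Req, Res> work;
            try {
                work = workQueue.poll(300, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                return; // treat interruption as a request to stop
            }
            if (work == null) continue; // timed out: loop to re-check isRunning
            try {
                work.future.complete(handler.apply(work.request));
            } catch (Exception e) {
                work.future.completeExceptionally(e);
            }
        }
    }

    void shutdown() { isRunning = false; }
}

class MiniUpdateQueueDemo {
    // Four client threads submit 1000 increments each; returns the final counter.
    static int run() {
        int[] counter = {0}; // unsynchronized on purpose: only the queue thread writes it
        MiniUpdateQueue<Integer, Integer> queue =
                new MiniUpdateQueue<>(delta -> counter[0] += delta);
        queue.start();
        List<Thread> clients = new ArrayList<>();
        for (int t = 0; t < 4; t++) {
            Thread client = new Thread(() -> {
                CompletableFuture<Integer> last = null;
                for (int i = 0; i < 1000; i++) last = queue.submit(1);
                last.join(); // FIFO order: the last future completes after all earlier ones
            });
            clients.add(client);
            client.start();
        }
        try {
            for (Thread client : clients) client.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        queue.shutdown();
        return counter[0];
    }
}
```

Calling MiniUpdateQueueDemo.run() should return 4000: no increment is lost even though the counter has no lock, because only the queue's own thread ever modifies it.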

For example, a server processing requests from multiple clients and updating a write ahead log
can use a SingularUpdateQueue as follows.

Figure 3: A SingularUpdateQueue to update write ahead log

A client of the SingularUpdateQueue would set it up by specifying its
parameterized types and the function to run when processing a message
from the queue. For this example, we’re using a consumer of requests for a
write ahead log. There is a single instance of this consumer, which will
control access to the log data structure. The consumer needs to put each
request into a log and then return a response. The response message can
only be sent after the message has been put into the log. We use a
SingularUpdateQueue to ensure there’s a reliable ordering for these actions.
public class WalRequestConsumer implements Consumer<Message<RequestOrResponse>> {

    private final SingularUpdateQueue<Message<RequestOrResponse>, Message<RequestOrResponse>> walWriterQueue;
    private final WriteAheadLog wal;

    public WalRequestConsumer(Config config) {
        this.wal = WriteAheadLog.openWAL(config);
        walWriterQueue = new SingularUpdateQueue<>((message) -> {
            // ... the message is written to wal here, before the response is built ...
            return responseMessage(message);
        });
    }

    private void startHandling() { this.walWriterQueue.start(); }

The consumer’s accept method takes messages, puts them on the queue
and, after each message is processed, sends a response. This method is run
on the caller’s thread, allowing many callers to invoke
accept at the same time.

public void accept(Message<RequestOrResponse> message) {
    CompletableFuture<Message<RequestOrResponse>> future = walWriterQueue.submit(message);
    future.whenComplete((responseMessage, error) -> {
        // ... send responseMessage (or report the error) back to the client ...
    });
}
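
The flow of accept (write to the log on one thread, respond only afterwards) can be tried out in isolation with the rough sketch below. It makes several simplifying assumptions: a single-thread ExecutorService stands in for the SingularUpdateQueue, an in-memory list stands in for the write ahead log, and the class name WalFlowDemo and the "ack:" response format are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class WalFlowDemo {
    // Appends five messages to the log and returns the responses built afterwards.
    static List<String> run() {
        // Stand-in for SingularUpdateQueue: exactly one thread owns the log.
        ExecutorService walWriter = Executors.newSingleThreadExecutor();
        // Separate threads for the follow-up stage that builds and sends responses.
        ExecutorService responders = Executors.newFixedThreadPool(2);
        // In-memory stand-in for the write ahead log; only walWriter touches it.
        List<String> log = new ArrayList<>();
        try {
            List<CompletableFuture<String>> responses = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                String message = "m" + i;
                responses.add(CompletableFuture
                        .supplyAsync(() -> {
                            log.add(message);      // step 1: append to the log
                            return log.size() - 1; // index of the new entry
                        }, walWriter)
                        // step 2: runs only after step 1 has completed
                        .thenApplyAsync(index -> "ack:" + message + "@" + index, responders));
            }
            List<String> acks = new ArrayList<>();
            for (CompletableFuture<String> response : responses) {
                acks.add(response.join());
            }
            return acks;
        } finally {
            walWriter.shutdown();
            responders.shutdown();
        }
    }
}
```

Because every append runs on the one walWriter thread, entries land in the log in submission order, and each response is produced only once its entry has an index.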

Choice of the queue

The choice of the queue data structure is an important one. On the JVM, there are various data structures available to choose from:

  • ArrayBlockingQueue (used in the Kafka request queue)
    As the name suggests, this is an array-backed blocking queue.
    It is used when a fixed, bounded queue is needed. Once the queue fills up, the producer blocks. This provides blocking backpressure
    and is helpful when we have slow consumers and fast producers.

  • ConcurrentLinkedQueue along with ForkJoinPool (used in the Akka Actors mailbox implementation)
    ConcurrentLinkedQueue can be used when we do not have consumers waiting for the producer, but there is some coordinator which schedules consumers
    only after tasks are queued onto the ConcurrentLinkedQueue.

  • LinkedBlockingDeque (used by Zookeeper and the Kafka response queue)
    This is mostly used when unbounded queuing needs to be done, without blocking the producer. We need to be careful with this choice, as the queue might grow quickly if no backpressure techniques are implemented
    and can go on to consume all the memory.

  • RingBuffer (used in LMAX Disruptor)
    As discussed in LMAX Disruptor, sometimes task processing is latency sensitive,
    so much so that copying tasks between processing stages with ArrayBlockingQueue can add latencies which are not acceptable.
    RingBuffer can be used in these cases to pass tasks between stages.
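
The ConcurrentLinkedQueue option is the least obvious of these, so here is a minimal sketch of how a coordinator might schedule a consumer only after tasks are queued. It is written in the spirit of an actor mailbox but is not Akka's implementation; Mailbox, MailboxDemo and the scheduled flag are hypothetical names chosen for this example.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical mailbox: messages are queued without blocking, and a drain task
// is scheduled on the pool only when there is something to process.
class Mailbox<T> {
    private final ConcurrentLinkedQueue<T> messages = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Executor executor;
    private final Consumer<T> handler;

    Mailbox(Executor executor, Consumer<T> handler) {
        this.executor = executor;
        this.handler = handler;
    }

    void send(T message) {
        messages.add(message); // never blocks: the queue is unbounded
        scheduleIfNeeded();
    }

    private void scheduleIfNeeded() {
        // The flag guarantees that at most one drain task runs at a time.
        if (!messages.isEmpty() && scheduled.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        try {
            T message;
            while ((message = messages.poll()) != null) {
                handler.accept(message);
            }
        } finally {
            scheduled.set(false);
            scheduleIfNeeded(); // a message may have arrived after the last poll
        }
    }
}

class MailboxDemo {
    // Four senders post 1000 increments each; returns the final counter.
    static int run() {
        int senders = 4;
        int perSender = 1000;
        int[] counter = {0}; // modified only inside the handler, one message at a time
        CountDownLatch done = new CountDownLatch(senders * perSender);
        ForkJoinPool pool = new ForkJoinPool(2);
        Mailbox<Integer> mailbox = new Mailbox<>(pool, delta -> {
            counter[0] += delta;
            done.countDown();
        });
        for (int s = 0; s < senders; s++) {
            new Thread(() -> {
                for (int i = 0; i < perSender; i++) mailbox.send(1);
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return counter[0];
    }
}
```

No thread is parked waiting on an empty queue here. The trade-off is that the queue is unbounded, so this variant offers no backpressure by itself.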

Using Channels and Lightweight Threads

This can be a natural fit for languages or libraries which support lightweight threads along with the concept of channels (e.g. Go, Kotlin).
All the requests are passed to a single channel to be processed.
There is a single goroutine which processes all the messages to update state.
The response is then written to a separate channel, and processed by a separate goroutine to send it back to clients.
As seen in the following code, the requests to update a key value are passed onto a single shared request channel.

func (s *server) putKv(w http.ResponseWriter, r *http.Request) {
  kv, err := s.readRequest(r, w)
  if err != nil {
    // ... error handling is not shown ...
    return
  }

  request := &requestResponse{
    request:         kv,
    responseChannel: make(chan string),
  }

  s.requestChannel <- request
  response := s.waitForResponse(request)
  // ... response is written back to w here ...
}

The requests are processed in a single goroutine to update all the state.

func (s *server) Start() error {
  go s.serveHttp()

  go s.singularUpdateQueue()

  return nil
}

func (s *server) singularUpdateQueue() {
  for {
    select {
    case e := <-s.requestChannel:
      e.responseChannel <- buildResponse(e)
    }
  }
}


Backpressure

Backpressure can be an important concern when a work queue is used to communicate between threads.
If the consumer is slow and the producer is fast, the queue might fill up quickly.
Unless some precautions are taken, it might run out of memory with a large number of tasks filling up the queue.
Generally, the queue is kept bounded, with the sender blocking if the queue is full.
For example, java.util.concurrent.ArrayBlockingQueue has several methods to add elements. The put method blocks the producer if the queue is full.
The add method throws IllegalStateException if the queue is full, but doesn’t block the producer; the offer method returns false instead of throwing.
It’s important to know the semantics of the methods available for adding tasks to the queue.
In the case of ArrayBlockingQueue, the put method should be used to block the sender and provide backpressure.
Frameworks like reactive-streams can help implement a more sophisticated backpressure mechanism from the consumer to the producer.
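
The difference between these insertion methods is easy to check with a queue of capacity one. The sketch below is only a demonstration harness (BackpressureDemo is a made-up name, and the sleep durations are arbitrary): it fills the queue and then records what add, offer, a timed offer and put each do.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

class BackpressureDemo {
    // Returns a short description of what each insertion method did on a full queue.
    static String run() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder result = new StringBuilder();
        try {
            queue.put("first"); // succeeds immediately: the queue has room

            // add: fails fast with an exception when the queue is full.
            try {
                queue.add("second");
            } catch (IllegalStateException e) {
                result.append("add:threw ");
            }

            // offer: reports failure through its return value, without blocking.
            result.append("offer:").append(queue.offer("second")).append(' ');

            // timed offer: waits up to the timeout for space, then gives up.
            result.append("timedOffer:")
                  .append(queue.offer("second", 50, TimeUnit.MILLISECONDS))
                  .append(' ');

            // put: blocks until a consumer makes room.
            Thread consumer = new Thread(() -> {
                try {
                    Thread.sleep(100); // simulate a slow consumer
                    queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            long start = System.nanoTime();
            queue.put("second");
            long blockedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            consumer.join();
            result.append("put:blocked=").append(blockedMillis >= 50);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return result.toString();
    }
}
```

Only put slows the producer down to the consumer's pace. With add or offer, the producer has to decide for itself what to do with the rejected task.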

Other Considerations

  • Task Chaining
    Most of the time, processing involves chaining multiple tasks together.
    The results of a SingularUpdateQueue execution need to be passed to other stages.
    For example, as shown in the WalRequestConsumer above, after the records are written to the write ahead log,
    the response needs to be sent over the socket connection. This can be done by running the completion callback of the future returned by SingularUpdateQueue
    on a separate thread. The callback can submit the task to another SingularUpdateQueue as well.

  • Making External Service Calls
    Sometimes, as part of the task execution in the SingularUpdateQueue,
    external service calls need to be made, and the state of the SingularUpdateQueue is updated by the response of the service call.
    It’s important in this scenario that no blocking network calls are made, because they would block the only thread which is processing all the tasks.
    The calls should be made asynchronously. Care must be taken not to access the SingularUpdateQueue state in the future callback of the asynchronous service call,
    because the callback can run on a separate thread, defeating the purpose of doing all state changes in the SingularUpdateQueue on a single thread.
    The result of the call should be added to the work queue, like any other event or request.
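
The last point is easy to get wrong, so here is a small sketch of it. Everything in it is an assumption made for illustration: the work queue holds Runnable events rather than typed requests, lookupRemote is a hypothetical stand-in for a non-blocking service client, and an unbounded LinkedBlockingQueue is used only to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class AsyncCallDemo {
    // Events processed one at a time by the single update thread.
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    // State owned by the update thread; no other thread reads or writes it.
    private final Map<String, String> state = new HashMap<>();
    // Completed with true if the state change ran on the update thread.
    private final CompletableFuture<Boolean> updatedOnQueueThread = new CompletableFuture<>();
    private Thread updateThread;

    // Hypothetical stand-in for a non-blocking client of a remote service.
    private CompletableFuture<String> lookupRemote(String key) {
        return CompletableFuture.supplyAsync(() -> "value-for-" + key);
    }

    // Runs on the update thread: starts the call but does not wait for it.
    private void handleRequest(String key) {
        lookupRemote(key).thenAccept(value ->
                // This callback may run on another thread, so it does not touch
                // state; it only enqueues the result as a new event.
                workQueue.add(() -> applyResult(key, value)));
    }

    // Runs on the update thread again, so the state change stays single-threaded.
    private void applyResult(String key, String value) {
        state.put(key, value);
        updatedOnQueueThread.complete(Thread.currentThread() == updateThread);
    }

    private void processEvents() {
        try {
            // The demo stops once the result has been applied.
            while (!updatedOnQueueThread.isDone()) {
                workQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the service result was applied on the update thread.
    static boolean run() {
        AsyncCallDemo demo = new AsyncCallDemo();
        demo.updateThread = new Thread(demo::processEvents, "update-thread");
        demo.updateThread.start();
        demo.workQueue.add(() -> demo.handleRequest("k1"));
        return demo.updatedOnQueueThread.join();
    }
}
```

The update thread is never blocked on the remote call, and the map is still only modified from that one thread.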
