node.js cluster: Not just for HTTP

16th December 2016

Overview

The node.js cluster API is not only for scaling HTTP, even if that is the impression the documentation gives. The cluster module is a straightforward wrapper over child_process that makes interfacing with child processes far easier. This post will refer to child processes as workers.

The cluster module is flexible and offers several options for scaling out your software beyond simple load balancing. You can use IPC (interprocess communication) to hand specific work off to workers, leveraging multiple processor cores and giving you something closer to a threading architecture.

Preface

In one of my latest projects, I was looking for an elegant solution for scaling out a queue consumer. This consumer is different from your average consumer in that it needs to maintain state. Part of my requirements were that it could not rely on a key-value server such as redis or memcached and that it had to be pure JavaScript, so the obvious initial solutions were out.

The use case I had was that IoT data comes in through a host device, and each payload contains readings from child devices communicating over a radio. To make a long story short, the data must be aggregated per child device; however, it needs to look up host device information at varying times.

While this was the technique I used, it is not the only way you might leverage cluster. Many use cases apply, such as load balancing HTTP (the common case) while also creating worker processes that need to communicate with the parent. Hell, you could even build a pipeline with this model, though that might be better reserved for a message queue where you get delivery guarantees, durability and much more (I'll be posting on message queues in the near future).

Introduction

The node.js cluster module allows us to spin off workers (child processes) to scale a workload. Since you are inside a cluster, the master can communicate with all children and the children can communicate with the parent, otherwise known as IPC (interprocess communication).

This is technically nothing that child_process cannot already do for you; however, a few baked-in extras make it much simpler to manage.

In the simplest case, communicating with the master process is nothing more than a process.send() call, which asynchronously sends the message to the master where it can be consumed via worker.on('message', callback), just as an EventEmitter handles any other event.
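
To make that concrete, here is a minimal sketch (not from the project code) of a master and worker exchanging messages over IPC:

const cluster = require('cluster');

if (cluster.isMaster) {
  // fork a single worker and listen for anything it sends back
  const worker = cluster.fork();
  worker.on('message', (message) => console.log('from worker:', message));
  worker.send({ cmd: 'ping' });
} else {
  // in the worker, process.send() talks back to the master
  process.on('message', (message) => {
    process.send({ cmd: 'pong', received: message.cmd });
  });
}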

In the following examples I will be leveraging the STOMP protocol with a few wrappers and additional libraries. These are not that important so you can ignore a few technicalities. What is important to note is how the cluster module works and how it communicates between the master and children.

Building the Master

The cluster module documentation shows its examples in a single JavaScript file. For simplicity I will do the same (although I generally advocate separating the processing out by abstraction for better maintenance and testability).

const Stomp = require('./lib/Stomp');  
const cluster = require('cluster');  
const stompConfig = require('./config/stomp');  
const jumphash = require('@ceejbot/jumphash');  
const yargs = require('yargs');

const argv = yargs  
  .usage('Usage: $0 -b [buckets]')
  .demand(['b'])
  .argv;

// cluster module will round-robin any incoming network connections
// that are opened (as they are shared by default), in this particular
// case I don't have that issue but let's be safe and set it to none.
cluster.schedulingPolicy = cluster.SCHED_NONE;

if (cluster.isMaster) {  
  // ...
}

So first off, you'll notice that the cluster module lets us know whether we are in a child process or the master process by exposing a property called isMaster. This is helpful, as we do not need to maintain that state ourselves.

In this case, you'll see that I am leveraging a jumphash module, as well as yargs for processing command line arguments, to determine how many children I am using and to consistently hash work to specific child processes. Consistent hashing is a larger topic, but what it gives me is the ability to hash a key and spread the work out in a repeatable way. It can create hot spots if you are not careful about what you hash on, but if you always need the same key to go to the same worker, it will aid you.
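
For illustration, here is a tiny sketch (assuming the jumphash(buffer, buckets) signature used below) showing that the same nodeId always lands in the same bucket:

const jumphash = require('@ceejbot/jumphash');

// encode a sample nodeId the same way the master code below does
const buf = Buffer.allocUnsafe(8);
buf.writeDoubleLE(12345, 0);

// prints the same bucket (a number in [0, 4)) on every run
console.log(jumphash(buf, 4));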

Let's see below how we can spread the work between the workers in a consistent fashion, so that messages with the same nodeId always land on the same child process.

// ... continued from above
if (cluster.isMaster) {  
  const buckets = argv.b;
  const workerIndex = [];

  for (let i = 0; i < buckets; i++) {
    const worker = cluster.fork();
    workerIndex.push(worker.id);
  }

  const stomp = new Stomp(stompConfig);
  stomp.connect().then(client => {
    client.subscribe({
      destination: '/queue/MyQueue'
    }, (err, message) => {
      message.readString('utf-8').then(body => {
        body = JSON.parse(body);

        const buf = Buffer.allocUnsafe(8);
        buf.writeDoubleLE(body.nodeId, 0);

        cluster.workers[
          workerIndex[jumphash(buf, buckets)]
        ].send(body);
      });
    });
  });

  return;
}

There is a lot going on here, but that is mainly implementation detail. Let's walk through it to make it a bit easier to understand.

First, I am taking a parameter (-b) through yargs that provides the number of workers to create (child processes to fork). From there, I am using a STOMP client to fetch a message and parse the JSON inside it. That JSON has a property called nodeId, which we run through the jumphash module to consistently hash into a number corresponding to a worker index. As a side note, since worker IDs are not zero-based array indices, I build the workerIndex array as the workers are forked.

Now that we have our worker, we can send the message over. The first thing to notice is that it is unnecessary to run the data through JSON.stringify before sending it to the worker; node.js serializes it over the IPC channel for you (keep in mind only JSON-safe values survive the trip).

At this point, we can start to wire in our worker code and process the message that we had received.

Wiring the Worker

You'll notice that I return at the end of the if statement. This is on purpose: it leaves the remainder of the file for the worker implementation, which IMO makes it easier to maintain.

Since we are sending a message to the worker, we just need to receive it. Because process is already in scope, all we need to do is start listening for the message via process.on().

// ... after the if statement from above

process.on('message', (message) => {  
  if ('object' !== typeof message) {
    return; // we cannot handle this message
  }

  // do something with message
});

As you can see, we can easily and efficiently get our message and start processing it. I'm leaving out quite a bit of detail here, as it is not necessary, but the same object we sent from above arrives here, and since it is handled in a child process we can take advantage of more CPU cores without blocking the master's event loop.

Message Workflow

Remember when I stated earlier that we want to process the message and then look up additional details? Sometimes a message has a property called nid that is associated with a host, and we need to persist that value. Other messages carry a property called relayNid, which maps the host device to a relay node, but it is only provided on a node-by-node basis.

Since we may be sending several messages back and forth, we need to identify them, so we will add a property called cmd that tells the receiver which operation to run. Lastly, we also include a message ID; it can be anything that helps you represent a transaction (I'll show just a simple example of this; it can be much more elegant and complex, but my goal is a simple overview).
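
To keep the following code easier to follow, here are hypothetical examples of the message shapes this protocol ends up using (the values are made up; the field names match the code below):

// worker -> master: persist a nid-to-nodeId mapping (no response expected)
const persistNid = { cmd: 'nid', host: 'host-1', nid: 7, nodeId: 1001 };

// worker -> master: ask for a previously stored mapping
const lookupRequest = { cmd: 'nidLookup', id: 'msg-42', host: 'host-1', nid: 7 };

// master -> worker: the answer, correlated by id
const lookupResponse = { cmd: 'nidLookup', id: 'msg-42', value: 1001 };

console.log(persistNid, lookupRequest, lookupResponse);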

Master Handling

Now, let's get back to the master process in the cluster. We need to add some handling for the messages we will be getting back from the workers. A worker's 'message' event only passes the message (and an optional handle), so when attaching the handler with worker.on('message', ...) we wrap it so it also receives the worker it came from. We'll be ignoring the handle in this example.

Secondly, we are going to take the message the worker sent us and send something back to the worker. In this particular case I am assuming the message carries a property called id that both sides can use to correlate it.

if (cluster.isMaster) {  
  // ... snip
  const hostMap = new Map();
  const messageHandler = function(worker, message) {
    switch (message.cmd) {
      case 'nid': {
        // remember the nid -> nodeId mapping for this host
        if (!hostMap.has(message.host)) {
          hostMap.set(message.host, new Map());
        }
        const nodeMap = hostMap.get(message.host);
        nodeMap.set(message.nid, message.nodeId);
        break;
      }
      case 'nidLookup': {
        // answer the worker with whatever we have stored (or null)
        let ret = null;
        if (hostMap.has(message.host)) {
          const nodeMap = hostMap.get(message.host);
          if (nodeMap.has(message.nid)) {
            ret = nodeMap.get(message.nid);
          }
        }
        worker.send({
          id: message.id,
          cmd: message.cmd,
          value: ret
        });
        break;
      }
      default:
        throw new Error('Unknown command: ' + message.cmd);
    }
  };

  for (let i = 0; i < buckets; i++) {
    const worker = cluster.fork();
    // the worker's 'message' event only passes the message, so close over the worker
    worker.on('message', (message) => messageHandler(worker, message));
    workerIndex.push(worker.id);
  }

  const stomp = new Stomp(stompConfig);
  stomp.connect().then(client => {
    client.subscribe({
      destination: '/queue/MyQueue'
    }, (err, message) => {
      message.readString('utf-8').then(body => {
        body = JSON.parse(body);
        body.cmd = 'message';

        const buf = Buffer.allocUnsafe(8);
        buf.writeDoubleLE(body.nodeId, 0);

        cluster.workers[
          workerIndex[jumphash(buf, buckets)]
        ].send(body);
      });
    });
  });

  return;
}

Child Handling

On to the child! First things first: we added a command in the master and we also send commands back, so we need to handle these specific cases. When we send a command up, we essentially emulate a transaction; in this particular case I track them with a Map. It might seem a bit confusing at first, but remember that each worker is scoped to its own process, so these maps do not bleed over into each other.

// keep pending lookups outside the handler so they survive
// between the original message and the master's response
const transactions = new Map();

process.on('message', (message) => {
  if ('object' !== typeof message) {
    return; // we cannot handle this message
  }

  switch (message.cmd) {
    case 'message':
      // store our representation, we do not care about a response
      if (message.nid) {
        process.send({
          cmd: 'nid',
          host: message.host,
          nid: message.nid,
          nodeId: message.nodeId
        });
      }

      // we would like to have this value...
      if (message.relayNid) {
        transactions.set(message.id, message);
        return process.send({
          id: message.id,
          cmd: 'nidLookup',
          nid: message.relayNid,
          host: message.host
        });
      }
      break;
    case 'nidLookup':
      if (transactions.has(message.id)) {
        const existingMessage = transactions.get(message.id);
        existingMessage.relayNodeId = message.value;
        transactions.delete(message.id);
      }
      break;
  }

  // ... finish processing message
});

So here you can see how the message flows down to the worker. If the message contains a relayNid property, the worker asks the master to look it up and suspends further processing (note that we return after sending the message to the parent). The expectation is that we will get a response on the nidLookup command, which is why we keep the transactions Map to track the pending messages. In this type of flow we can do pre-processing early in the message case, break and handle other work below it, and finally persist the message once it makes it out of the switch statement.
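
As a possible refinement (my sketch, not part of the original code), the round-trip to the master can be wrapped in a Promise keyed by a generated message ID, which makes resuming the suspended work more explicit:

const crypto = require('crypto');

// pending lookups keyed by a generated id (a hypothetical id scheme)
const pending = new Map();

function lookupRelayNid(host, nid) {
  const id = crypto.randomBytes(8).toString('hex');
  return new Promise((resolve) => {
    pending.set(id, resolve);
    process.send({ id, cmd: 'nidLookup', host, nid });
  });
}

process.on('message', (message) => {
  if (message && message.cmd === 'nidLookup' && pending.has(message.id)) {
    pending.get(message.id)(message.value); // resolve the waiting lookup
    pending.delete(message.id);
  }
});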

Final Thoughts

I hope this helps you understand the many solutions and variants you can create with node.js cluster beyond simply load balancing HTTP. This post covers just the basics, so you will need to think about various additional concerns of varying complexity, including (a small sketch for the last item follows the list):

  • Error Handling
  • Signals (SIGTERM / SIGINT)
  • Exiting workers
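
For example, here is a minimal sketch (my addition, reusing messageHandler and workerIndex from the master code above) of handling workers that exit unexpectedly by forking a replacement and keeping the consistent-hash mapping intact:

cluster.on('exit', (worker, code, signal) => {
  console.warn('worker ' + worker.id + ' exited (' + (signal || code) + ')');

  // fork a replacement and wire up the same message handling
  const replacement = cluster.fork();
  replacement.on('message', (message) => messageHandler(replacement, message));

  // swap the dead worker's id in workerIndex so jumphash keeps routing correctly
  workerIndex[workerIndex.indexOf(worker.id)] = replacement.id;
});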

