Communicate Through RabbitMQ Using NodeJS and Fastify

If two or more services need to talk to each other and the communication can be asynchronous, we can put a message queue between them using RabbitMQ. The RabbitMQ server maintains all the queues and the connections to every service attached to it.

This post uses Fastify as the NodeJS framework for our program. Fastify is similar to Express but adds some features of its own, such as a plugin system and an improved request-response handler.

First, we need to create two plugins: one for sending a message and another for consuming the sent message. We will start with a normal queue, which works much like a queue in the real world. When five people are waiting in line and three staff members are serving them, each person is served by exactly one staff member. No other staff member needs to handle a person who has already been served, and nobody is served twice. In the same way, if two services listen to a queue and a message is passed to that queue, RabbitMQ decides which connected service handles the message, and only that service receives it.
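To make the analogy concrete, here is a small simulation of how messages on one queue are spread across several consumers. It is only a sketch: dispatchRoundRobin and the service names are made up for illustration, and it assumes the consumers simply take turns in order, which is how RabbitMQ distributes messages by default.

```typescript
// Hypothetical helper: assigns each message to exactly one consumer, in turn.
function dispatchRoundRobin(messages: string[], consumers: string[]): Map<string, string[]> {
  const handled = new Map<string, string[]>();
  for (const name of consumers) handled.set(name, []);
  if (consumers.length === 0) return handled;

  messages.forEach((message, index) => {
    const consumer = consumers[index % consumers.length];
    handled.get(consumer)!.push(message);
  });
  return handled;
}

const result = dispatchRoundRobin(
  ['m1', 'm2', 'm3', 'm4', 'm5'],
  ['service-a', 'service-b', 'service-c'],
);
// service-a handles m1 and m4, service-b handles m2 and m5, service-c handles m3
console.log(result);
```

Every message ends up with a single consumer, which is the property the rest of this post relies on.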


Create a plugin to send a message

The plugin performs the following steps.

  1. Create a channel to the RabbitMQ server
  2. Assert a queue
  3. Publish a message to the queue

In this example, the queue is named all_queue.

import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import amqp, { Channel } from 'amqplib';
import fastifyPlugin from 'fastify-plugin';

const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'

async function getRabbitChannel(): Promise<Channel> {
  try {
    const conn = await amqp.connect(rabbitUrl);
    // await here so a failure in createChannel is also caught and logged below
    return await conn.createChannel();
  } catch (error: any) {
    console.error(error.message || error);
    throw error;
  }
}

async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions) {
  const channel = await getRabbitChannel();

  // create the queue only if it doesn't exist yet
  await channel.assertQueue(queueName, {});

  // create a function to publish a message
  function publishMessage(messageObject: unknown) {
    const msg = JSON.stringify(messageObject);
    // publish through the default exchange (''), which routes by queue name
    channel.publish('', queueName, Buffer.from(msg));
  }

  // make the function available in fastify scope
  fastify.decorate('publishMessage', publishMessage);

  // an async plugin signals completion through its returned promise,
  // so no done callback is needed
}

export default fastifyPlugin(rabbitPlugin);

In the code above, we declare a function named publishMessage and decorate the fastify instance with it so that it can be called from any part of our application. We also wrap the plugin with fastify-plugin so that the decorator is not confined to the plugin's own encapsulated scope and stays visible to other plugins and routes registered on the same instance.
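The publishing step can also be pulled out of the plugin so it can be tried without a running broker. The sketch below is an illustration, not part of the plugin above: PublishChannel and createPublisher are hypothetical names, and the interface only mirrors the publish(exchange, routingKey, content) call used in the plugin. amqplib's publish returns a boolean, which the sketch passes back to the caller.

```typescript
// Minimal shape of the channel method the publisher needs (hypothetical interface).
interface PublishChannel {
  publish(exchange: string, routingKey: string, content: Buffer): boolean;
}

// Hypothetical factory: returns a function that serializes and publishes a message.
function createPublisher(channel: PublishChannel, queue: string) {
  return (messageObject: unknown): boolean => {
    const body = Buffer.from(JSON.stringify(messageObject));
    // '' is the default exchange; the routing key is the queue name.
    return channel.publish('', queue, body);
  };
}

// In-memory stand-in for a real channel, used only to show the call.
const sent: { exchange: string; routingKey: string; body: string }[] = [];
const fakeChannel: PublishChannel = {
  publish(exchange, routingKey, content) {
    sent.push({ exchange, routingKey, body: content.toString() });
    return true;
  },
};

const publishToAllQueue = createPublisher(fakeChannel, 'all_queue');
publishToAllQueue({ orderId: 1 });
console.log(sent);
```

Inside the plugin, the function returned by createPublisher could be handed to fastify.decorate in place of the inline publishMessage.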


Create a plugin to consume the sent messages

The plugin performs the following steps.

  1. Create a channel to the RabbitMQ server
  2. Assert a queue
  3. Listen for and consume any messages on the queue

import amqp, { Channel, ConsumeMessage } from 'amqplib';
import { FastifyInstance, FastifyPluginOptions } from 'fastify';
import fastifyPlugin from 'fastify-plugin';

const rabbitUrl = 'amqp://guest:guest@localhost:5672'; // change to your own server URL
const queueName = 'all_queue'

async function getRabbitChannel(): Promise<Channel> {
  try {
    const conn = await amqp.connect(rabbitUrl);
    // await here so a failure in createChannel is also caught and logged below
    return await conn.createChannel();
  } catch (error: any) {
    console.error(error.message || error);
    throw error;
  }
}

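async function consumeRabbitMessage(msg: ConsumeMessage | null, fastify: FastifyInstance, channel: Channel) {
  // msg is null when RabbitMQ cancels the consumer
  if (!msg) {
    return;
  }

  try {
    const data = JSON.parse(msg.content.toString());

    // do something with data

    // acknowledge the message so RabbitMQ can remove it from the queue
    channel.ack(msg);
  } catch (error) {
    fastify.log.error(error);
    // one option: reject without requeueing so a bad message is not redelivered endlessly
    channel.nack(msg, false, false);
  }
}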

async function rabbitPlugin(fastify: FastifyInstance, opts: FastifyPluginOptions) {
  const channel = await getRabbitChannel();

  // create the queue only if it doesn't exist yet
  await channel.assertQueue(queueName, {});

  // listen to the queue
  await channel.consume(queueName, async (msg) => {
    await consumeRabbitMessage(msg, fastify, channel);
  });

  // an async plugin signals completion through its returned promise,
  // so no done callback is needed
}

export default fastifyPlugin(rabbitPlugin);

Notice that we pass the fastify and channel instances to the message handler function (consumeRabbitMessage). The handler is declared outside the plugin's scope, so it can only reach those instances if they are passed in as arguments.
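An alternative to passing fastify and channel on every call is to bind them once and return the handler as a closure. The following is a minimal sketch of that idea under a few assumptions: createMessageHandler, HandlerDeps, IncomingMessage, and the process callback are hypothetical names, and the interfaces only describe the ack, nack, and log.error calls the handler needs, so the example can run without RabbitMQ or Fastify.

```typescript
// Minimal shapes of what the handler depends on (hypothetical interfaces).
interface IncomingMessage {
  content: { toString(): string };
}
interface HandlerDeps {
  channel: {
    ack(msg: IncomingMessage): void;
    nack(msg: IncomingMessage, allUpTo?: boolean, requeue?: boolean): void;
  };
  log: { error(err: unknown): void };
  process(data: unknown): Promise<void> | void;
}

// Hypothetical factory: binds the dependencies once and returns the handler.
function createMessageHandler(deps: HandlerDeps) {
  return async (msg: IncomingMessage | null): Promise<void> => {
    if (!msg) return; // null means the consumer was cancelled
    try {
      await deps.process(JSON.parse(msg.content.toString()));
      deps.channel.ack(msg);
    } catch (error) {
      deps.log.error(error);
      // Assumption: failed messages are dropped rather than requeued.
      deps.channel.nack(msg, false, false);
    }
  };
}

// Usage with in-memory stand-ins instead of a real channel and logger.
const outcomes: string[] = [];
const handler = createMessageHandler({
  channel: {
    ack: () => { outcomes.push('ack'); },
    nack: () => { outcomes.push('nack'); },
  },
  log: { error: () => {} },
  process: () => {},
});

handler({ content: Buffer.from('{"ok":true}') })
  .then(() => handler({ content: Buffer.from('not json') }))
  .then(() => console.log(outcomes));
```

With a real channel, the returned handler could be given to channel.consume directly, which keeps the plugin body short. Whether this reads better than passing the instances explicitly is a matter of taste.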


Registering the plugin

Finally, we register each plugin in its own service: the publisher plugin in the publisher service and the consumer plugin in the consumer service.

import fastify from 'fastify';
import rabbitPlugin from 'your/plugin/location'; // publisher or consumer

const server = fastify({ logger: true });
server.register(rabbitPlugin);

// other setup code, such as routes and starting the server

