RabbitMQ Work Queues Using Python

SENG 41283 — Distributed and Cloud Computing

Nipun Thennakoon
Jul 22, 2020

What is RabbitMQ?

RabbitMQ is an open-source message broker that supports protocols such as the Advanced Message Queuing Protocol (AMQP), the Streaming Text Oriented Messaging Protocol (STOMP), and MQ Telemetry Transport (MQTT). It is written in Erlang and built on top of the Open Telecom Platform framework for clustering and failover, and client libraries for all major programming languages are available to interface with the broker.

What is a work queue?

A work queue is a way to avoid executing a resource-intensive task immediately and waiting for it to complete. Instead, the task is encapsulated as a message and placed on a queue for later execution. One or more worker processes running in the background eventually pop the tasks from the queue and execute them.

This concept is especially useful in contexts like web applications where it may be difficult to handle complex jobs within a short HTTP request window.

Some terminology

  • Producer: A program that sends messages. In this case, the producer places the tasks that need to be done on the work queue.
  • Queue: Essentially a large message buffer, bounded only by the host's memory and disk limits.
  • Consumer: A program that receives messages. In this case, the consumer pops tasks from the work queue and executes them.

So, now that we know about RabbitMQ and work queues, let's get to an example.

We'll use Python for this, since it has an easy-to-understand syntax and lets you do more with fewer lines of code.

Setting up the environment

First, you need to install the RabbitMQ server on your computer. Please refer to the installation guide and follow the necessary steps for your platform.

Even though it's not strictly necessary, I urge you to set up a virtual environment for this project because it is good practice. There are many tools for doing this, and Virtualenv is the one I prefer. If you use an IDE like PyCharm, a virtual environment is created for you by default.
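For example, with the venv module that ships with Python (Virtualenv has an equivalent workflow), creating and activating an environment looks roughly like this:

>> python -m venv venv
>> source venv/bin/activate    # on Windows: venv\Scripts\activate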

Then install Pika, the RabbitMQ client library for Python, using

pip install pika

in your virtual environment.

producer.py

This is the producer that publishes tasks to our work queue. Since this is an example, we simulate complex tasks by sending strings, denoting the complexity of each message by the number of dots in the string. Here is the sample code.

import pika
import sys

# Open a TCP connection to the broker and a channel on top of it.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare a durable queue so it survives a broker restart.
channel.queue_declare(queue='task_queue', durable=True)

# Build the task from the command-line arguments, or fall back to a default.
message = ' '.join(sys.argv[1:]) or "Hello World!"

# Publish through the default exchange; delivery_mode=2 marks the message persistent.
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(delivery_mode=2),
)

print(" [x] Sent %r" % message)
connection.close()

Let's understand the code a little bit.

First, we need to establish a connection between our producer and the RabbitMQ server and obtain a channel. Since our message broker is on the local machine, we use "localhost" as the host parameter; otherwise, you have to use the IP address of the machine hosting the broker. (A connection is a real TCP connection to the message broker, while a channel is a virtual connection inside it.)
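For instance, if the broker lived on another machine, the connection setup might look like this (just a sketch; the IP address, username, and password below are placeholders, not part of this tutorial):

import pika

# Placeholder credentials and host, purely for illustration.
credentials = pika.PlainCredentials('myuser', 'mypassword')
parameters = pika.ConnectionParameters(
    host='192.168.1.10',   # IP address of the machine running RabbitMQ
    port=5672,             # default AMQP port
    credentials=credentials,
)

connection = pika.BlockingConnection(parameters)  # real TCP connection
channel = connection.channel()                    # virtual connection inside it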

Next, we need to check whether the recipient queue exists and create it if it doesn't. The queue_declare() method does this. It is an idempotent operation, so we can call it as many times as we want and only one queue will be created. By setting durable=True we ensure that the queue won't be lost even if the server crashes or reboots.
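As a quick illustration of that idempotence (a standalone sketch, separate from producer.py): declaring the same queue twice simply returns the existing queue, and the broker's queue.declare-ok reply even tells you how many messages are currently sitting in it.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Both calls refer to the same queue; the second one creates nothing new.
first = channel.queue_declare(queue='task_queue', durable=True)
second = channel.queue_declare(queue='task_queue', durable=True)

print(first.method.queue, first.method.message_count)
print(second.method.queue, second.method.message_count)

connection.close()

Note that redeclaring an existing queue with different arguments (for example, durable=False) is rejected by the broker, so keep the declaration consistent between producer and consumer.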

Back in producer.py, the next line just reads our input strings from the command line and substitutes a default message if no command-line arguments were given.

Then we publish the message to our work queue through the default exchange, which is denoted by an empty string. routing_key is the name of our queue and body is the message we are publishing to it. Setting delivery_mode=2 marks the message as persistent.

Finally, we need to make sure the network buffers are flushed and the messages are actually delivered to RabbitMQ. For this, we gently close the connection.
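If you'd rather enqueue a batch of test tasks in one go instead of running producer.py repeatedly, a small loop over basic_publish() works too. This is just a convenience sketch that reuses the same connection, queue, and persistence settings as producer.py above:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

# Enqueue five tasks of increasing "complexity" (one extra dot per task).
for i in range(1, 6):
    message = "task %d %s" % (i, "." * i)
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2),
    )
    print(" [x] Sent %r" % message)

connection.close()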

consumer.py

This is the consumer that pops tasks from our work queue and executes them. The script takes our message strings from the work queue and simulates (or fakes 😁) their complexity by putting the current thread to sleep for one second per dot in the message. For example, if the message is “Hello….”, the program will sleep for 4 seconds.

import pika
import time

# Open a connection and a channel, and make sure the queue exists.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Simulate the task: sleep one second per dot, then acknowledge."""
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Don't give a worker a new message until it has acknowledged the previous one.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

As in the producer, we have to create a connection to the RabbitMQ server, get a channel, and check if the queue exists.

Then we define a callback function, which we'll subscribe to our work queue. Whenever a message arrives, this callback is called by the Pika library; this is where we implement how the task should be executed.

ch.basic_ack() sends an acknowledgment to the RabbitMQ server after the task has been completed. If a consumer dies without sending an ack, the server understands that the task was not fully executed and re-queues it for execution by another consumer. Note that there are no message timeouts: RabbitMQ only redelivers the message if the consumer dies, and it won't re-queue the message even if the consumer takes a very, very long time to execute it.
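If you want to handle failures explicitly instead of relying on the worker dying, one option (not part of the original consumer.py) is to catch the error in the callback and negatively acknowledge the message so RabbitMQ puts it back on the queue:

import time

def callback(ch, method, properties, body):
    try:
        time.sleep(body.count(b'.'))   # the simulated work
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Tell RabbitMQ the task failed; requeue=True puts the message back
        # on the queue so another (or the same) worker can retry it.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)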

We use the basic_qos() channel method with prefetch_count=1 to achieve fair dispatch. For example, with two workers (consumers), if the odd messages are heavy and the even messages are light, one worker will be hard at work while the other is hardly doing any work. To avoid this behavior, basic_qos() uses the basic.qos protocol method to tell RabbitMQ not to dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, RabbitMQ will dispatch the message to the next worker that is not still busy.

The basic_consume() channel method tells RabbitMQ to call our callback function whenever a message arrives from our work queue. Then, with the start_consuming() method, the worker enters an infinite loop where it waits for messages and runs the callback whenever necessary.
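Since start_consuming() blocks forever, it's also common to wrap it so that CTRL+C shuts the worker down cleanly. A small sketch, replacing the bare channel.start_consuming() at the end of consumer.py above:

try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Stop delivering messages to this worker and close the connection cleanly.
    channel.stop_consuming()
    connection.close()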

Running things….

We can have multiple producers and consumers at the same time, which lets you easily parallelize the work. Let's try out our example with one producer and two consumers. We'll need three shells: one for the producer and the other two for the consumers.

Let's start our consumers in the first and second shells.

# shell 1
>> python consumer.py
# => [*] Waiting for messages. To exit press CTRL+C
--------------------------------------------------------------------
# shell 2
>> python consumer.py
# => [*] Waiting for messages. To exit press CTRL+C

And publish tasks using producer.py in the third shell.

# shell 3
>> python producer.py first task..
# => [x] Sent 'first task..'
>> python producer.py second task...
# => [x] Sent 'second task...'
>> python producer.py third task....
# => [x] Sent 'third task....'
>> python producer.py fourth task...
# => [x] Sent 'fourth task...'

We can see that our tasks have been delivered to our two consumers.

# Shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received b'first task..'
# => [x] Done
# => [x] Received b'third task....'
# => [x] Done
--------------------------------------------------------------------
# Shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received b'second task...'
# => [x] Done
# => [x] Received b'fourth task...'
# => [x] Done

Aaaand that’s a wrap folks! Thank you for sticking with me so far. I’ll see you in the next one!
