RabbitMQ Work Queues Using Python

SENG 41283 — Distributed and Cloud Computing

What is RabbitMQ?

What is a work queue?

Some terminology

Setting up the environment

pip install pika

producer.py

import pika
import sys

# Publish one task message to the durable 'task_queue' work queue.
# The message text is taken from the command-line arguments; when none are
# given, "Hello World!" is sent instead.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
try:
    channel = connection.channel()

    # durable=True asks the broker to keep the queue itself across restarts.
    channel.queue_declare(queue='task_queue', durable=True)

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    channel.basic_publish(
        exchange='',  # default exchange: routes by queue name
        routing_key='task_queue',
        body=message,
        # delivery_mode=2 marks the message as persistent.
        properties=pika.BasicProperties(delivery_mode=2),
    )

    print(" [x] Sent %r" % message)
finally:
    # Always release the connection, even if declaring or publishing failed.
    connection.close()

consumer.py

import pika
import time

# Connect to the RabbitMQ broker on this machine and open a channel on it.
broker_params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(broker_params)
channel = connection.channel()

# Declare the queue this worker reads from, then announce readiness.
channel.queue_declare(durable=True, queue='task_queue')
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Handle one delivered task message.

    Prints the received body, simulates work by sleeping one second for
    every ``.`` byte in ``body``, then acknowledges the delivery.

    Args:
        ch: Channel the message arrived on; used to send the acknowledgement.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        properties: Message properties (unused).
        body: Message payload as bytes.
    """
    print(" [x] Received %r" % body)
    # Each dot in the payload stands for one second of simulated work.
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # Acknowledge only after the work is finished.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Limit this worker to one unacknowledged message at a time, so the broker
# does not hand it a new task while it is still busy with the previous one.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

try:
    # Blocks, dispatching each delivery to callback, until interrupted.
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to stop the worker: exit cleanly
    # instead of ending with a traceback.
    channel.stop_consuming()
finally:
    # Skip the close if the connection is already gone (e.g. broker dropped it).
    if connection.is_open:
        connection.close()

Running things...

# Shell 1
>> python consumer.py
# => [*] Waiting for messages. To exit press CTRL+C
--------------------------------------------------------------------
# Shell 2
>> python consumer.py
# => [*] Waiting for messages. To exit press CTRL+C
--------------------------------------------------------------------
# Shell 3
>> python producer.py first task..
# => [x] Sent 'first task..'
>> python producer.py second task...
# => [x] Sent 'second task...'
>> python producer.py third task....
# => [x] Sent 'third task....'
>> python producer.py fourth task...
# => [x] Sent 'fourth task...'
--------------------------------------------------------------------
# Shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received b'first task..'
# => [x] Done
# => [x] Received b'third task....'
# => [x] Done
--------------------------------------------------------------------
# Shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received b'second task...'
# => [x] Done
# => [x] Received b'fourth task...'
# => [x] Done