Skip to main content

Message Queue Implementation Examples

Kafka

Producer Example

// Producer configuration: broker address plus String serializers for keys and values
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Create the producer in try-with-resources so it is closed even when send() throws.
// The implicit close() blocks until previously sent records have completed,
// so the callback below still gets a chance to run before the program moves on.
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Build the message: topic, key, value
    ProducerRecord<String, String> record =
            new ProducerRecord<>("my-topic", "key", "message");

    // send() is asynchronous; the callback receives either the record metadata
    // or the exception that caused the send to fail
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.printf("Sent to partition %d, offset %d%n",
                    metadata.partition(), metadata.offset());
        }
    });
}

Consumer Example

// Consumer configuration: broker address, consumer group, and String deserializers
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Create the consumer in try-with-resources: the poll loop below never returns
// normally, so this guarantees the consumer is closed when the loop is left
// by an exception
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // Subscribe to topic
    consumer.subscribe(Arrays.asList("my-topic"));

    // Poll for messages; each poll waits at most 100 ms for new records
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Received: key=%s, value=%s%n",
                    record.key(), record.value());
        }
    }
}

RabbitMQ

Producer Example

import pika

# Open a blocking connection to the broker on localhost and obtain a channel
connection_params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Names used for the exchange, the queue, and the binding between them
exchange_name = 'my_exchange'
queue_name = 'my_queue'
binding_key = 'my_key'

# Declare a direct exchange and a queue, then bind the queue to the exchange
channel.exchange_declare(exchange=exchange_name, exchange_type='direct')
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=binding_key)

# Publish one message to the exchange using the same routing key as the binding
channel.basic_publish(exchange=exchange_name, routing_key=binding_key, body='Hello World!')

# Release the connection
connection.close()

Consumer Example

import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare queue so the consumer can start even if it has not been declared yet
channel.queue_declare(queue='my_queue')


# Define callback
def callback(ch, method, properties, body):
    """Print the body of each message delivered from the queue."""
    print(f"Received: {body}")


# Register the callback; auto_ack=True means no explicit acknowledgement is sent
channel.basic_consume(
    queue='my_queue',
    on_message_callback=callback,
    auto_ack=True
)

# start_consuming() blocks; on Ctrl-C stop consuming, and always close the
# connection if it is still open
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
finally:
    if connection.is_open:
        connection.close()

AWS SQS

Producer Example

import boto3

# Build the SQS client
sqs = boto3.client('sqs')

# Look up the queue URL by queue name
queue_lookup = sqs.get_queue_url(QueueName='my-queue')
queue_url = queue_lookup['QueueUrl']

# Attributes sent alongside the message body
author_attribute = {'StringValue': 'John Doe', 'DataType': 'String'}
message_attributes = {'Author': author_attribute}

# Send the message and report the ID returned in the response
response = sqs.send_message(
    QueueUrl=queue_url,
    MessageBody='Hello World!',
    MessageAttributes=message_attributes,
)

print(f"Message ID: {response['MessageId']}")

Consumer Example

import boto3

# Create SQS client
sqs = boto3.client('sqs')

# Get queue URL
queue_url = sqs.get_queue_url(QueueName='my-queue')['QueueUrl']

# Receive messages forever: each call asks for up to 10 messages and waits
# up to 20 seconds for them to arrive
while True:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20
    )

    # The 'Messages' key may be missing from the response, so default to an empty list
    for message in response.get('Messages', []):
        print(f"Received: {message['Body']}")

        # Delete the message once it has been handled, using its receipt handle
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

Redis Streams

Producer Example

import redis

# Open a client for the local Redis server (database 0)
r = redis.Redis(host='localhost', port=6379, db=0)

# Field/value pairs that make up the stream entry
entry_fields = {'field1': 'value1', 'field2': 'value2'}

# Append the entry to the stream and print the ID returned by xadd
message_id = r.xadd('mystream', entry_fields)

print(f"Message ID: {message_id}")

Consumer Example

import redis

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Consumer group
group_name = 'mygroup'
consumer_name = 'consumer1'

# Create consumer group starting from ID '0'. mkstream=True creates the stream
# if it does not exist yet, so the consumer can be started before any producer.
# A BUSYGROUP error is ignored; any other error is re-raised.
try:
    r.xgroup_create('mystream', group_name, '0', mkstream=True)
except redis.ResponseError as e:
    if 'BUSYGROUP' not in str(e):
        raise

# Read messages one at a time; block=0 waits indefinitely for the next entry
while True:
    messages = r.xreadgroup(
        group_name,
        consumer_name,
        {'mystream': '>'},
        count=1,
        block=0
    )

    for stream, message_list in messages:
        for message_id, fields in message_list:
            print(f"Received: {fields}")

            # Acknowledge message
            r.xack('mystream', group_name, message_id)