In this tutorial, we create a fully functional event-driven workflow using Kombu, treating messaging as a core architectural capability. We walk step by step through the setup of exchanges, routing keys, background workers, and concurrent producers, allowing us to observe a real distributed system in action. As we implement each component, we see how clean message flow, asynchronous processing, and routing patterns give us the same power that production microservices rely on every day.
!pip install kombu
import threading
import time
import logging
import uuid
import datetime
import sys
from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],  # handlers must be a list
    force=True
)
logger = logging.getLogger(__name__)
BROKER_URL = "memory://localhost/"
We start by installing Kombu, importing the dependencies, and configuring logging so we can clearly see every message flowing through the system. We also set up an in-memory broker URL, allowing us to run everything locally in Colab without needing RabbitMQ. This setup forms the foundation of our distributed messaging workflow.
media_exchange = Exchange('media_exchange', type="topic", durable=True)

task_queues = (
    Queue('video_queue', media_exchange, routing_key='video.#'),
    Queue('audit_queue', media_exchange, routing_key='#'),
)
We define a topic exchange so we can route messages flexibly using wildcard patterns. We also create two queues: one dedicated to video-related tasks and an audit queue that listens to everything. With topic routing, we can precisely control how messages flow through the system.
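To build intuition for how a topic exchange matches routing keys against these bindings, here is a small stand-alone sketch in plain Python. This is not Kombu's internal implementation; `topic_to_regex` and `route` are illustrative names we introduce. It converts AMQP-style patterns, where `*` matches exactly one dot-delimited word and `#` matches zero or more, into regular expressions:

```python
import re

def topic_to_regex(pattern):
    """Convert an AMQP-style topic binding into a compiled regex.
    '*' matches exactly one dot-delimited word; '#' matches zero or more."""
    regex = re.escape(pattern)                    # '.' -> '\.', '*' -> '\*', '#' -> '\#'
    regex = regex.replace(r'\.\#', r'(?:\..+)?')  # trailing/infix '#': zero or more words
    regex = regex.replace(r'\#\.', r'(?:.+\.)?')  # leading '#'
    regex = regex.replace(r'\#', '.*')            # bare '#' matches anything
    regex = regex.replace(r'\*', '[^.]+')         # '*' matches exactly one word
    return re.compile('^' + regex + '$')

# The same bindings as our two queues above
bindings = {'video_queue': 'video.#', 'audit_queue': '#'}

def route(routing_key):
    """Return the queues whose binding pattern matches the routing key."""
    return [name for name, pattern in bindings.items()
            if topic_to_regex(pattern).match(routing_key)]
```

Running `route('video.upload')` shows the message landing in both queues, while `route('user.login')` reaches only the catch-all audit queue, which is exactly the behavior we rely on below.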
class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.should_stop = False

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues=self.queues,
                     callbacks=[self.on_message],
                     accept=['json'],
                     prefetch_count=1)
        ]

    def on_message(self, body, message):
        routing_key = message.delivery_info['routing_key']
        payload_id = body.get('id', 'unknown')
        logger.info(f"\n⚡ RECEIVED MSG via key: ({routing_key})")
        logger.info(f"   Payload ID: {payload_id}")
        try:
            if 'video' in routing_key:
                self.process_video(body)
            else:
                # non-video events reach us only via the catch-all audit binding
                logger.info("   🔍 (Audit) Logging event...")
            message.ack()
            logger.info("   ✅ ACKNOWLEDGED")
        except Exception as e:
            logger.error(f"   ❌ ERROR: {e}")

    def process_video(self, body):
        logger.info("   ⚙️ (Processor) Transcoding video (Simulating work...)")
        time.sleep(0.5)
We implement a custom worker using Kombu’s ConsumerMixin so we can run it in a background thread. In the message callback, we inspect the routing key, invoke the appropriate processing function, and acknowledge the message. This worker architecture gives us clean, concurrent message consumption with full control.
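The acknowledgement step in `on_message` is what gives us at-least-once delivery: a message that is never acked goes back to the broker for redelivery. The following toy simulation of that contract uses only the standard library; `FakeMessage` and `drain` are hypothetical names for illustration, not part of Kombu's API:

```python
import queue

class FakeMessage:
    """Hypothetical stand-in for a Kombu message, for illustration only."""
    def __init__(self, body, routing_key):
        self.body = body
        self.delivery_info = {'routing_key': routing_key}
        self.acked = False

    def ack(self):
        self.acked = True

def drain(q, callback):
    """Deliver every queued message to the callback; any message left
    unacknowledged (e.g. the callback raised) is put back for redelivery."""
    redelivered = []
    while not q.empty():
        msg = q.get()
        try:
            callback(msg.body, msg)
        except Exception:
            pass  # a real broker would log the failure and redeliver
        if not msg.acked:
            redelivered.append(msg)
    for msg in redelivered:
        q.put(msg)
    return [m.delivery_info['routing_key'] for m in redelivered]
```

This is also why our worker acks only after processing succeeds: acking first would silently drop a message whose handler then crashed.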
def publish_messages(connection):
    producer = Producer(connection)
    tasks = (
        ('video.upload', {'file': 'movie.mp4'}),
        ('user.login', {'user': 'admin'}),
    )
    logger.info("\n🚀 PRODUCER: Starting to publish messages...")
    for r_key, data in tasks:
        data['id'] = str(uuid.uuid4())[:8]
        logger.info(f"📤 SENDING: {r_key} -> {data}")
        producer.publish(
            data,
            exchange=media_exchange,
            routing_key=r_key,
            serializer="json"
        )
        time.sleep(1.5)
    logger.info("🏁 PRODUCER: Done.")
Now we create a producer that sends structured JSON payloads to the exchange under different routing keys. We generate a unique ID for each event and watch the messages get routed to different queues. This mirrors real-world microservice event publishing, where producers and consumers remain decoupled.
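The payload construction can be isolated and tested on its own. Here is a small helper (`build_payload` is our own hypothetical name) that mirrors what the producer does before handing the dict to Kombu's JSON serializer. Note that truncating the UUID to 8 characters trades global uniqueness for log readability:

```python
import json
import uuid

def build_payload(data):
    """Attach a short correlation id and render the JSON that goes on the wire."""
    data = dict(data)                   # avoid mutating the caller's dict
    data['id'] = str(uuid.uuid4())[:8]  # short id: readable in logs, not globally unique
    return json.dumps(data)
```

For example, `build_payload({'file': 'movie.mp4'})` yields a JSON string with the original fields plus an 8-character `id`.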
def run_example():
    with Connection(BROKER_URL) as conn:
        worker = Worker(conn, task_queues)
        worker_thread = threading.Thread(target=worker.run)
        worker_thread.daemon = True
        worker_thread.start()
        logger.info("✅ SYSTEM: Worker thread started.")
        time.sleep(1)
        try:
            publish_messages(conn)
            time.sleep(2)
        except KeyboardInterrupt:
            pass
        finally:
            worker.should_stop = True
            logger.info("\n👋 SYSTEM: Execution complete.")

if __name__ == "__main__":
    run_example()
We start the worker in a background thread and fire the producer in the main thread. This architecture gives us a mini distributed system running in Colab. Observing the logs, we see messages published → routed → consumed → acknowledged, completing the entire event-processing lifecycle.
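The same orchestration pattern, a daemon consumer thread plus a main-thread producer, can be reproduced with nothing but the standard library, which is handy for unit-testing the threading logic without any broker. `run_demo` and its internals are illustrative names of our own:

```python
import threading
import queue

def run_demo(n_messages=3):
    """Main thread produces; a daemon thread consumes until told to stop."""
    inbox = queue.Queue()
    stop = threading.Event()
    received = []

    def worker():
        while not stop.is_set():
            try:
                item = inbox.get(timeout=0.1)
            except queue.Empty:
                continue
            received.append(item)  # 'process' the message
            inbox.task_done()      # analogous to message.ack()

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    for i in range(n_messages):    # the producer side
        inbox.put({'id': i})
    inbox.join()                   # block until every message is acknowledged
    stop.set()
    t.join()
    return received
```

Using a `threading.Event` for shutdown mirrors the role of `worker.should_stop` in our Kombu worker: the consumer loop checks it between messages rather than being killed mid-task.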
Finally, we have orchestrated a dynamic, distributed task-routing pipeline that processes real-time events with clarity and accuracy. We saw how Kombu removes the complexity of messaging systems while giving us fine-grained control over routing, consumption, and worker concurrency. As we watched messages move from producer to exchange to queue to worker, we gained a deeper appreciation for the beauty of event-driven system design, and we are now well-equipped to scale this foundation into robust microservices, background processors, and enterprise-grade workflows.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of Marktechpost, an Artificial Intelligence media platform known for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among readers.