In this article we’ll see how to apply the CQRS (Command Query Responsibility Segregation) pattern in a small Python micro-project that uses Kafka as a message broker and Docker to run the services.
The goal is to build a small architecture with:
- a command service that receives write requests and publishes events to Kafka;
- a query service that reads events from Kafka and updates an optimized read model;
- a write model separated from the read model.
1. Quick refresher: what is CQRS
CQRS is an architectural pattern that explicitly separates the write side (Commands) from the read side (Queries). Instead of having a single service or model that handles both reads and writes, we have two:
- Command side: receives commands (e.g. create order, update status), validates business rules and generates events (e.g. OrderCreated, OrderStatusChanged).
- Query side: receives events, updates a database optimized for reads (for example denormalized tables, caches, materialized views) and exposes read-only APIs.
Kafka is a good fit for connecting these two parts: the command side publishes events to a topic, and the query side consumes them at its own pace to build and maintain the read model.
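Before bringing Kafka in, the separation can be illustrated in a single process. The following is a minimal sketch, not part of the project itself: the class names EventLog, CommandSide and QuerySide are hypothetical, and a plain Python list stands in for the Kafka topic.

```python
from dataclasses import dataclass, field
from typing import Optional
from uuid import uuid4


@dataclass
class EventLog:
    """Stand-in for a Kafka topic: an append-only list of events."""
    events: list = field(default_factory=list)

    def publish(self, event: dict) -> None:
        self.events.append(event)


class CommandSide:
    """Validates commands and emits events; it never serves reads."""

    def __init__(self, log: EventLog) -> None:
        self.log = log

    def create_order(self, customer_id: str, items: list) -> str:
        if not items:
            raise ValueError("an order needs at least one item")
        order_id = str(uuid4())
        self.log.publish({
            "event_type": "OrderCreated",
            "order_id": order_id,
            "customer_id": customer_id,
            "items": items,
        })
        return order_id


class QuerySide:
    """Builds a read model from events; it never accepts commands."""

    def __init__(self) -> None:
        self.orders: dict = {}
        self.offset = 0  # index of the next event to apply

    def catch_up(self, log: EventLog) -> None:
        # Apply only the events that have not been seen yet.
        for event in log.events[self.offset:]:
            if event["event_type"] == "OrderCreated":
                self.orders[event["order_id"]] = event
        self.offset = len(log.events)

    def get_order(self, order_id: str) -> Optional[dict]:
        return self.orders.get(order_id)
```

Until catch_up runs, the query side does not know about a newly created order. The same gap exists, at a larger scale, between the two services connected through Kafka.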
2. Example architecture
Let’s build a simple order management system:
- orders-command-service (Python):
  - exposes a small REST API to create orders;
  - writes the order to a write database (for example PostgreSQL or even in-memory for the example);
  - publishes an OrderCreated event to Kafka.
- orders-query-service (Python):
  - consumes OrderCreated events from Kafka;
  - updates a read database (for the example: a simple in-memory store or SQLite);
  - exposes a REST API to read orders.
- Kafka and ZooKeeper (or KRaft, depending on the image used) in Docker.
3. Docker Compose for Kafka and the services
To orchestrate everything we’ll use docker-compose.yml. The following file defines a minimal Kafka cluster
and two Python services:
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # The two listeners need distinct ports: 29092 for containers on the
      # Compose network, 9092 for clients running on the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  orders-command-service:
    build: ./orders-command-service
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:29092
    ports:
      - "8000:8000"
  orders-query-service:
    build: ./orders-query-service
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:29092
    ports:
      - "8001:8001"
The ./orders-command-service and ./orders-query-service folders will contain the Python code
and the respective Dockerfiles.
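Note that depends_on, used this way, only controls the order in which containers are started; it does not wait until Kafka is ready to accept connections. The Kafka client retries on its own, but a service may still want to check reachability explicitly at startup. The following is a minimal sketch of such a check; the helper names parse_bootstrap and wait_for_port are hypothetical, and an open TCP port is only a rough signal that the broker is up.

```python
import socket
import time


def parse_bootstrap(server: str) -> tuple:
    """Split a 'host:port' string such as the KAFKA_BOOTSTRAP_SERVERS value."""
    host, _, port = server.rpartition(":")
    return host, int(port)


def wait_for_port(host: str, port: int, attempts: int = 30, delay: float = 1.0) -> bool:
    """Return True once a TCP connection succeeds, False after all attempts fail."""
    for attempt in range(attempts):
        try:
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError:
            # Connection refused, host not resolvable yet, timeout, ...
            if attempt < attempts - 1:
                time.sleep(delay)
    return False
```

A service could call parse_bootstrap on its KAFKA_BOOTSTRAP_SERVERS value and then wait_for_port before creating its producer or consumer, exiting with a clear error if the broker never becomes reachable.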
4. Message model and Kafka topics
Let’s define a simple event for order creation. We don’t need a schema registry for the example, but in production it’s recommended to use Avro, Protobuf, or JSON Schema.
{
  "event_type": "OrderCreated",
  "order_id": "123e4567-e89b-12d3-a456-426614174000",
  "customer_id": "customer-1",
  "items": [
    {"sku": "prod-1", "quantity": 2},
    {"sku": "prod-2", "quantity": 1}
  ],
  "total": 42.50,
  "created_at": "2025-12-05T10:00:00Z"
}
We’ll use a Kafka topic called orders.events. The command service will produce messages to this topic,
while the query service will consume them.
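Without a schema registry, a small amount of validation at the edges still helps. The sketch below, under the assumption that every event carries the six fields shown above, serializes an event and rejects incomplete ones. It also returns the order_id as a message key, which the article's services do not do; keying is a suggestion here, based on the fact that Kafka preserves ordering within a partition and the default partitioner sends equal keys to the same partition. The function names are hypothetical.

```python
import json

REQUIRED_FIELDS = frozenset(
    {"event_type", "order_id", "customer_id", "items", "total", "created_at"}
)


def _check_fields(event: dict) -> None:
    missing = REQUIRED_FIELDS - event.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")


def encode_event(event: dict) -> tuple:
    """Validate an event and return (key, value) as bytes for Kafka."""
    _check_fields(event)
    key = event["order_id"].encode("utf-8")
    value = json.dumps(event).encode("utf-8")
    return key, value


def decode_event(value: bytes) -> dict:
    """Parse a message value and validate it before it reaches a handler."""
    event = json.loads(value.decode("utf-8"))
    _check_fields(event)
    return event
```

With confluent-kafka, the pair could then be passed as producer.produce(topic, key=key, value=value), so that all events of one order land in the same partition.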
5. Command service in Python
For simplicity we’ll use FastAPI for the API and confluent-kafka as the Kafka client.
A possible main.py file could be:
from fastapi import FastAPI
from pydantic import BaseModel
from uuid import uuid4
from datetime import datetime, timezone
from confluent_kafka import Producer
import json
import os

app = FastAPI()

KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
ORDERS_TOPIC = "orders.events"

producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})


class OrderItem(BaseModel):
    sku: str
    quantity: int


class CreateOrderCommand(BaseModel):
    customer_id: str
    items: list[OrderItem]


def delivery_report(err, msg):
    # Called once per message, when the broker acknowledges it or delivery fails
    if err is not None:
        # In production: structured logging
        print(f"Error sending message: {err}")
    else:
        print(f"Event sent to topic {msg.topic()} [partition {msg.partition()}]")


@app.post("/orders")
def create_order(cmd: CreateOrderCommand):
    order_id = str(uuid4())
    # Example total calculation: a flat price of 10 per unit
    total = sum(item.quantity * 10 for item in cmd.items)

    event = {
        "event_type": "OrderCreated",
        "order_id": order_id,
        "customer_id": cmd.customer_id,
        "items": [item.model_dump() for item in cmd.items],
        "total": total,
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }

    producer.produce(
        ORDERS_TOPIC,
        value=json.dumps(event).encode("utf-8"),
        on_delivery=delivery_report,
    )
    # Blocks until the message is delivered: simple, but slow under load
    producer.flush()

    # In a real case we would save the order in the write model (write DB)
    return {"order_id": order_id, "status": "CREATED"}
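The write model is left as a comment in the service. As an illustration only, here is a minimal sketch of what it could look like, assuming SQLite as the write database (the architecture section suggests PostgreSQL or an in-memory store) and a hypothetical two-table schema. The write side is normalized: one row per order and one row per order line.

```python
import sqlite3


def init_write_db(conn: sqlite3.Connection) -> None:
    """Create a normalized write model: orders plus their line items."""
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS orders (
            order_id    TEXT PRIMARY KEY,
            customer_id TEXT NOT NULL,
            total       REAL NOT NULL,
            status      TEXT NOT NULL,
            created_at  TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS order_items (
            order_id TEXT NOT NULL REFERENCES orders(order_id),
            sku      TEXT NOT NULL,
            quantity INTEGER NOT NULL CHECK (quantity > 0),
            PRIMARY KEY (order_id, sku)
        );
        """
    )


def save_order(conn: sqlite3.Connection, event: dict) -> None:
    """Persist an OrderCreated payload atomically: both tables or neither."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute(
            "INSERT INTO orders (order_id, customer_id, total, status, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (event["order_id"], event["customer_id"], event["total"],
             "CREATED", event["created_at"]),
        )
        conn.executemany(
            "INSERT INTO order_items (order_id, sku, quantity) VALUES (?, ?, ?)",
            [(event["order_id"], item["sku"], item["quantity"])
             for item in event["items"]],
        )
```

Writing to the database and publishing to Kafka are two separate operations, so one can succeed while the other fails. A common way to deal with this is the transactional outbox pattern, which is beyond the scope of this example.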
Command service Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
A possible requirements.txt:
fastapi
uvicorn[standard]
pydantic
confluent-kafka
6. Query service in Python
The query service consumes events from Kafka, applies them to a read model, and provides a REST API to query it. For the example we use a simple in-memory dictionary.
from fastapi import FastAPI, HTTPException
from confluent_kafka import Consumer
import json
import os
import threading

app = FastAPI()

KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
ORDERS_TOPIC = "orders.events"

# In-memory read model: map order_id -> order
orders_read_model: dict[str, dict] = {}

consumer_config = {
    "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
    "group.id": "orders-query-service",
    "auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_config)


def consume_events():
    consumer.subscribe([ORDERS_TOPIC])
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        try:
            event = json.loads(msg.value().decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            # Skip malformed messages instead of letting the consumer thread die
            print(f"Skipping malformed message: {exc}")
            continue
        handle_event(event)


def handle_event(event: dict):
    event_type = event.get("event_type")
    if event_type == "OrderCreated":
        order_id = event["order_id"]
        orders_read_model[order_id] = {
            "order_id": order_id,
            "customer_id": event["customer_id"],
            "items": event["items"],
            "total": event["total"],
            "created_at": event["created_at"],
        }
        print(f"Order {order_id} added to the read model")
    else:
        print(f"Ignored event: {event_type}")


@app.on_event("startup")
def startup_event():
    # Run the Kafka consumer loop in a background thread next to the API
    thread = threading.Thread(target=consume_events, daemon=True)
    thread.start()


@app.get("/orders")
def list_orders():
    return list(orders_read_model.values())


@app.get("/orders/{order_id}")
def get_order(order_id: str):
    order = orders_read_model.get(order_id)
    if order is None:
        # Report a missing order as 404 rather than 200 with an error body
        raise HTTPException(status_code=404, detail="Order not found")
    return order
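The in-memory dictionary has one practical drawback: it is lost when the service restarts, while the consumer group's committed offsets are kept by Kafka, so a restarted service would resume after the events it has already consumed and start with an empty read model. The architecture section mentions SQLite as an alternative; the following is a minimal sketch of such a read model, with a hypothetical single denormalized table. The upsert makes the projection idempotent, which matters because an event can be delivered more than once.

```python
import json
import sqlite3


def init_read_db(conn: sqlite3.Connection) -> None:
    """Create a denormalized view: one row per order, ready to be served."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS orders_view (
            order_id    TEXT PRIMARY KEY,
            customer_id TEXT NOT NULL,
            item_count  INTEGER NOT NULL,
            total       REAL NOT NULL,
            items_json  TEXT NOT NULL,
            created_at  TEXT NOT NULL
        )
        """
    )
    conn.commit()


def apply_order_created(conn: sqlite3.Connection, event: dict) -> None:
    """Upsert the order, so a redelivered event leaves the view unchanged."""
    item_count = sum(item["quantity"] for item in event["items"])
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO orders_view "
            "(order_id, customer_id, item_count, total, items_json, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (event["order_id"], event["customer_id"], item_count,
             event["total"], json.dumps(event["items"]), event["created_at"]),
        )


def orders_for_customer(conn: sqlite3.Connection, customer_id: str) -> list:
    """Read-side query that needs no joins thanks to the denormalized row."""
    rows = conn.execute(
        "SELECT order_id, item_count, total, created_at FROM orders_view "
        "WHERE customer_id = ? ORDER BY created_at",
        (customer_id,),
    ).fetchall()
    return [
        {"order_id": r[0], "item_count": r[1], "total": r[2], "created_at": r[3]}
        for r in rows
    ]
```

In the query service, the consumer thread and the request handlers run on different threads, so each would need its own SQLite connection (or a connection opened with check_same_thread=False and guarded by a lock).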
Query service Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8001
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"]
The requirements.txt will be similar to the command service one:
fastapi
uvicorn[standard]
confluent-kafka
7. Starting the environment with Docker
With all files in place, the typical workflow is:
- Build and start the services with Docker Compose:
docker compose up --build
- Send an order creation command to the command service:
curl -X POST http://localhost:8000/orders -H "Content-Type: application/json" -d '{
"customer_id": "customer-1",
"items": [
{"sku": "prod-1", "quantity": 2},
{"sku": "prod-2", "quantity": 1}
]
}'
- Retrieve orders from the query service:
curl http://localhost:8001/orders
You should see the order you just created, read from the read model fed by Kafka events.
8. Eventual consistency and implications
A fundamental aspect of CQRS with messaging is eventual consistency. Between the moment the command service accepts a command and the moment the query service updates its read model there is a delay: usually a few milliseconds, but potentially much longer if the consumer is slow, restarting, or lagging behind. This means that:
- you cannot expect reads to be immediately up to date after writes;
- some workflows require client-side retry logic or the use of asynchronous notifications;
- it’s important to monitor consumer lag and propagation times.
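The client-side retry mentioned above can be sketched as a small polling helper. This is an illustration under assumptions: the function names are hypothetical, the base URL matches the port used in this article, and a missing order is recognized either by an HTTP 404 or by a response body containing an "error" key.

```python
import json
import time
import urllib.error
import urllib.request
from typing import Callable, Optional


def fetch_order_http(order_id: str, base_url: str = "http://localhost:8001") -> Optional[dict]:
    """Fetch one order from the query service; return None if it is not there yet."""
    try:
        with urllib.request.urlopen(f"{base_url}/orders/{order_id}", timeout=2.0) as resp:
            body = json.load(resp)
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            return None
        raise
    return None if "error" in body else body


def wait_for_order(
    fetch: Callable[[str], Optional[dict]],
    order_id: str,
    timeout: float = 5.0,
    interval: float = 0.1,
) -> Optional[dict]:
    """Poll until the read model contains the order or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        order = fetch(order_id)
        if order is not None:
            return order
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)
```

A client would call wait_for_order(fetch_order_http, order_id) right after the POST to the command service. Passing the fetch function as a parameter keeps the polling logic testable without a running service.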
9. Possible extensions
Starting from this minimal example, you can evolve the architecture in several ways:
- use a relational or NoSQL write database with well-defined repositories and aggregate roots;
- introduce event sourcing, persisting the event log as the source of truth;
- replicate the read model across multiple nodes and use distributed caches;
- add more event types (e.g. OrderPaid, OrderCancelled) and dedicated projections;
- integrate observability tools (metrics, tracing, structured logging) to follow the flow of commands and events across the entire system.
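As more event types appear, the if/else chain in the query service's handler can be replaced by a registry of handlers. The sketch below shows one possible shape; the decorator name handles, the status values, and the assumption that OrderPaid and OrderCancelled carry only an order_id are all hypothetical.

```python
from typing import Callable

# Registry: event_type -> function that applies the event to the view
HANDLERS: dict = {}


def handles(event_type: str) -> Callable:
    """Decorator that registers a projection handler for one event type."""
    def register(fn: Callable) -> Callable:
        HANDLERS[event_type] = fn
        return fn
    return register


@handles("OrderCreated")
def on_order_created(view: dict, event: dict) -> None:
    view[event["order_id"]] = {
        "order_id": event["order_id"],
        "customer_id": event["customer_id"],
        "status": "CREATED",
    }


@handles("OrderPaid")
def on_order_paid(view: dict, event: dict) -> None:
    order = view.get(event["order_id"])
    if order is not None:  # ignore events for orders the view has not seen
        order["status"] = "PAID"


@handles("OrderCancelled")
def on_order_cancelled(view: dict, event: dict) -> None:
    order = view.get(event["order_id"])
    if order is not None:
        order["status"] = "CANCELLED"


def apply_event(view: dict, event: dict) -> bool:
    """Dispatch an event to its handler; return False for unknown types."""
    handler = HANDLERS.get(event.get("event_type"))
    if handler is None:
        return False
    handler(view, event)
    return True
```

Each new event type then needs only one new decorated function, and unknown types are reported to the caller instead of being silently dropped.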
10. Conclusions
The CQRS pattern, combined with Kafka and Docker, lets you build Python systems in which reads and writes are clearly separated and can be scaled independently. The command side focuses on business rules and event generation; the query side provides read-optimized views and relies on Kafka to propagate changes.
The example shown is intentionally simple, but it represents an excellent starting point to experiment with CQRS in a real environment, gradually adding persistence, security, observability, and advanced practices such as event sourcing and distributed sagas.