Applying the CQRS Pattern in Python with Kafka and Docker

In this article we’ll see how to apply the CQRS (Command Query Responsibility Segregation) pattern in a small Python micro-project that uses Kafka as a message broker and Docker to run the services.

The goal is to build a small architecture with:

  • a command service that receives write requests and publishes events to Kafka;
  • a query service that reads events from Kafka and updates an optimized read model;
  • a write model separated from the read model.

1. Quick refresher: what is CQRS

CQRS is an architectural pattern that explicitly separates the write side (Commands) from the read side (Queries). Instead of having a single service or model that handles both reads and writes, we have two:

  • Command side: receives commands (e.g. create order, update status), validates business rules and generates events (e.g. OrderCreated, OrderStatusChanged).
  • Query side: receives events, updates a database optimized for reads (for example denormalized tables, caches, materialized views) and exposes read-only APIs.

Kafka is perfect for connecting these two parts: the command side publishes events to a topic and the query side consumes them to build and maintain the read model.
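
Before touching the infrastructure, the separation can be sketched in a few lines of plain Python. The names here are purely illustrative; the real services follow in the next sections.


from uuid import uuid4

# Command side: validates the command and turns it into an event (it serves no reads).
def handle_create_order(command: dict) -> dict:
    if not command.get("items"):
        raise ValueError("an order needs at least one item")  # example business rule
    return {"event_type": "OrderCreated", "order_id": str(uuid4()), **command}

# Query side: applies events to a read-optimized store (it accepts no writes).
read_model: dict[str, dict] = {}

def apply_event(event: dict) -> None:
    if event["event_type"] == "OrderCreated":
        read_model[event["order_id"]] = event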

2. Example architecture

Let’s build a simple order management system:

  • orders-command-service (Python):
    • exposes a small REST API to create orders;
    • writes the order to a write database (for example PostgreSQL or even in-memory for the example);
    • publishes an OrderCreated event to Kafka.
  • orders-query-service (Python):
    • consumes OrderCreated events from Kafka;
    • updates a read database (for the example: a simple in-memory store or SQLite);
    • exposes a REST API to read orders.
  • Kafka with ZooKeeper (or a KRaft-mode broker, depending on the image used), running in Docker.

3. Docker Compose for Kafka and the services

To orchestrate everything we’ll use a docker-compose.yml file. The following file defines a minimal Kafka setup (a single broker plus ZooKeeper) and the two Python services:


version: "3.9"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Two listeners on different ports: PLAINTEXT for containers on the compose
      # network, PLAINTEXT_HOST for clients running on the host machine.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  orders-command-service:
    build: ./orders-command-service
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:29092  # internal listener defined above
    ports:
      - "8000:8000"

  orders-query-service:
    build: ./orders-query-service
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:29092
    ports:
      - "8001:8001"

The ./orders-command-service and ./orders-query-service folders will contain the Python code and the respective Dockerfiles.
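
A possible layout for the project, with file names matching the snippets shown in the following sections:


.
├── docker-compose.yml
├── orders-command-service/
│   ├── Dockerfile
│   ├── main.py
│   └── requirements.txt
└── orders-query-service/
    ├── Dockerfile
    ├── main.py
    └── requirements.txt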

4. Message model and Kafka topics

Let’s define a simple event for order creation. We don’t need a schema registry for the example, but in production it’s recommended to use Avro, Protobuf, or JSON Schema together with a schema registry.


{
  "event_type": "OrderCreated",
  "order_id": "123e4567-e89b-12d3-a456-426614174000",
  "customer_id": "customer-1",
  "items": [
    {"sku": "prod-1", "quantity": 2},
    {"sku": "prod-2", "quantity": 1}
  ],
  "total": 42.50,
  "created_at": "2025-12-05T10:00:00Z"
}

We’ll use a Kafka topic called orders.events. The command service will produce messages to this topic, while the query service will consume them.
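
Depending on the broker configuration the topic may be auto-created on first use; you can also create it explicitly with confluent-kafka’s admin client. Below is a minimal sketch, where the partition count is an arbitrary choice and the replication factor of 1 only makes sense for the single-broker setup above:


from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Replication factor 1 is only acceptable because we run a single broker.
futures = admin.create_topics([NewTopic("orders.events", num_partitions=3, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # raises if the creation failed
        print(f"Topic {topic} created")
    except Exception as exc:
        print(f"Failed to create topic {topic}: {exc}")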

5. Command service in Python

For simplicity we’ll use FastAPI for the API and confluent-kafka as the Kafka client. A possible main.py file could be:


from fastapi import FastAPI
from pydantic import BaseModel
from uuid import uuid4
from datetime import datetime, timezone
from confluent_kafka import Producer
import json
import os

app = FastAPI()

KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
ORDERS_TOPIC = "orders.events"

producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})


class OrderItem(BaseModel):
    sku: str
    quantity: int


class CreateOrderCommand(BaseModel):
    customer_id: str
    items: list[OrderItem]


def delivery_report(err, msg):
    if err is not None:
        # In production: structured logging
        print(f"Error sending message: {err}")
    else:
        print(f"Event sent to topic {msg.topic()} [partition {msg.partition()}]")


@app.post("/orders")
def create_order(cmd: CreateOrderCommand):
    order_id = str(uuid4())
    total = sum(item.quantity * 10 for item in cmd.items)  # placeholder: flat unit price of 10 per item
    event = {
        "event_type": "OrderCreated",
        "order_id": order_id,
        "customer_id": cmd.customer_id,
        "items": [item.model_dump() for item in cmd.items],
        "total": total,
        "created_at": datetime.utcnow().isoformat() + "Z",
    }

    producer.produce(
        ORDERS_TOPIC,
        # Keying by order_id keeps all events for the same order in one partition,
        # so consumers see them in order.
        key=order_id.encode("utf-8"),
        value=json.dumps(event).encode("utf-8"),
        on_delivery=delivery_report,
    )
    # flush() blocks until delivery is confirmed; acceptable for an example endpoint.
    producer.flush()

    # In a real case we would save the order in the write model (write DB)
    return {"order_id": order_id, "status": "CREATED"}

Command service Dockerfile


FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

A possible requirements.txt:


fastapi
uvicorn[standard]
pydantic
confluent-kafka

6. Query service in Python

The query service consumes events from Kafka, applies them to a read model, and provides a REST API to query it. For the example we use a simple in-memory dictionary.


from fastapi import FastAPI, HTTPException
from confluent_kafka import Consumer
import json
import os
import threading

app = FastAPI()

KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
ORDERS_TOPIC = "orders.events"

# In-memory read model: map order_id -> order
orders_read_model: dict[str, dict] = {}

consumer_config = {
    "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
    "group.id": "orders-query-service",
    "auto.offset.reset": "earliest",
}

consumer = Consumer(consumer_config)


def consume_events():
    consumer.subscribe([ORDERS_TOPIC])
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        event = json.loads(msg.value().decode("utf-8"))
        handle_event(event)


def handle_event(event: dict):
    event_type = event.get("event_type")
    if event_type == "OrderCreated":
        order_id = event["order_id"]
        orders_read_model[order_id] = {
            "order_id": order_id,
            "customer_id": event["customer_id"],
            "items": event["items"],
            "total": event["total"],
            "created_at": event["created_at"],
        }
        print(f"Order {order_id} added to the read model")
    else:
        print(f"Ignored event: {event_type}")


@app.on_event("startup")
def startup_event():
    thread = threading.Thread(target=consume_events, daemon=True)
    thread.start()


@app.get("/orders")
def list_orders():
    return list(orders_read_model.values())


@app.get("/orders/{order_id}")
def get_order(order_id: str):
    order = orders_read_model.get(order_id)
    if not order:
        return {"error": "Order not found"}
    return order

Query service Dockerfile


FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8001

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"]

The requirements.txt will be similar to the command service one:


fastapi
uvicorn[standard]
confluent-kafka

7. Starting the environment with Docker

With all files in place, the typical workflow is:

  1. Build and start the services with Docker Compose:

docker compose up --build

  2. Send an order creation command to the command service:

curl -X POST http://localhost:8000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customer_id": "customer-1",
    "items": [
      {"sku": "prod-1", "quantity": 2},
      {"sku": "prod-2", "quantity": 1}
    ]
  }'

  3. Retrieve orders from the query service:

curl http://localhost:8001/orders

You should see the order you just created, read from the read model fed by Kafka events.
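
The response should look roughly like the following; the order_id and created_at will differ on every run, and the total of 30 comes from the placeholder unit price of 10 used in the command service:


[
  {
    "order_id": "123e4567-e89b-12d3-a456-426614174000",
    "customer_id": "customer-1",
    "items": [
      {"sku": "prod-1", "quantity": 2},
      {"sku": "prod-2", "quantity": 1}
    ],
    "total": 30,
    "created_at": "2025-12-05T10:00:00Z"
  }
]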

8. Eventual consistency and implications

A fundamental aspect of CQRS with messaging is eventual consistency. Between the moment the command service accepts a command and the moment the query service has applied the corresponding event to its read model, some time passes: typically a few milliseconds, but potentially much longer if the consumer is lagging or down. This means that:

  • you cannot expect reads to be immediately up to date after writes;
  • some workflows require client-side retry logic (a small polling sketch follows this list) or the use of asynchronous notifications;
  • it’s important to monitor consumer lag and propagation times.
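
A rough illustration of such client-side polling, assuming the services above run locally; the requests dependency and the timing values are assumptions:


import time

import requests  # assumed to be available on the client side


def wait_for_order(order_id: str, timeout: float = 5.0, interval: float = 0.2) -> dict | None:
    """Poll the query service until the read model has caught up or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        response = requests.get(f"http://localhost:8001/orders/{order_id}")
        if response.status_code == 200:
            return response.json()
        time.sleep(interval)
    return None  # the read model never caught up within the timeout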

9. Possible extensions

Starting from this minimal example, you can evolve the architecture in several ways:

  • use a relational or NoSQL write database with well-defined repositories and aggregate roots;
  • introduce event sourcing, persisting the event log as the source of truth;
  • replicate the read model across multiple nodes and use distributed caches;
  • add more event types (e.g. OrderPaid, OrderCancelled) and dedicated projections (a small sketch follows this list);
  • integrate observability tools (metrics, tracing, structured logging) to follow the flow of commands and events across the entire system.
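
As an example of the new-event-types point, the query side’s handle_event could grow an extra projection branch. The OrderCancelled payload below is an assumption, since the command service shown here never emits it:


def handle_event(event: dict) -> None:
    event_type = event.get("event_type")
    if event_type == "OrderCreated":
        orders_read_model[event["order_id"]] = {**event, "status": "CREATED"}
    elif event_type == "OrderCancelled":
        # Projections should tolerate events that refer to data they have not seen yet.
        order = orders_read_model.get(event["order_id"])
        if order:
            order["status"] = "CANCELLED"
    else:
        print(f"Ignored event: {event_type}")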

10. Conclusions

The CQRS pattern, combined with Kafka and Docker, makes it possible to build scalable and robust Python systems where reads and writes are clearly separated. The command side focuses on business rules and event generation; the query side is dedicated to providing read-optimized views, leveraging Kafka’s power to propagate changes.

The example shown is intentionally simple, but it represents an excellent starting point to experiment with CQRS in a real environment, gradually adding persistence, security, observability, and advanced practices such as event sourcing and distributed sagas.
