Implementing CQRS in Go with Kafka and Docker

In this article you will see how to implement the CQRS (Command Query Responsibility Segregation) pattern in Go using Apache Kafka as the messaging backbone and Docker to easily run all services locally.

The goal is to build a small architecture around a simple example domain, a product catalog, with the write side (commands) clearly separated from the read side (queries).

1. Quick recap: what is CQRS

The CQRS pattern states that write and read operations are handled by different models:

  • Command side (Write model): receives commands, validates domain rules, and generates events.
  • Query side (Read model): receives events, updates a model optimized for reading, and exposes read-only APIs.

In a typical CQRS system, communication between the command and query sides happens through events published on a bus or broker. Here we will use Kafka.
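
In Go terms, you can picture the split as two narrow interfaces. This is only a sketch of the idea; the types are illustrative and are not used later in the article:

// Sketch: the two sides of CQRS as separate abstractions.
package cqrs

import "context"

// Event is whatever the command side publishes to the broker.
type Event struct {
    Type    string
    Payload any
}

// CommandHandler mutates state and emits events; it never serves reads.
type CommandHandler interface {
    Handle(ctx context.Context, cmd any) ([]Event, error)
}

// QueryHandler answers reads from a projection; it never mutates state.
type QueryHandler interface {
    Query(ctx context.Context, q any) (any, error)
}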

2. Example architecture

The architecture we will implement contains the following components:

  1. Command API in Go: exposes HTTP endpoints to create and update products.
  2. Kafka: topic for domain events, for example product-events.
  3. Query worker in Go: consumes events from Kafka and updates a read database.
  4. Query API in Go: exposes read-only HTTP endpoints.
  5. Docker Compose: orchestrates everything (Go services, Kafka, database).
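
Put together, data flows through the system in one direction:

client --POST/PUT--> command-api --event--> Kafka (product-events)
                                                 |
                                                 v
client <----GET----- query-api <--- PostgreSQL <--- query-worker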

For simplicity we will use:

  • PostgreSQL as the read database.
  • In-process memory as the write model (command side) to keep the example lightweight.

3. Project structure

A possible folder structure could be:

cqrs-go-kafka/
  cmd/
    command-api/
      main.go
    query-api/
      main.go
    query-worker/
      main.go
  internal/
    command/
      handler.go
      model.go
      kafka_producer.go
    query/
      store.go
      kafka_consumer.go
    shared/
      events.go
  docker/
    docker-compose.yml
    Dockerfile.base

4. Defining domain events

The heart of CQRS is the domain event. Let’s define some simple events in the internal/shared package. Payload is deliberately typed as any: the producer assigns a concrete struct, and the consumer decodes it back into a typed payload (see section 6.2).

// internal/shared/events.go
package shared

import (
    "encoding/json"
    "time"
)

type EventType string

const (
    EventProductCreated EventType = "ProductCreated"
    EventProductUpdated EventType = "ProductUpdated"
)

type Event struct {
    ID        string    `json:"id"`
    Type      EventType `json:"type"`
    Payload   any       `json:"payload"`
    Timestamp time.Time `json:"timestamp"`
}

type ProductCreatedPayload struct {
    ID    string  `json:"id"`
    Name  string  `json:"name"`
    Price float64 `json:"price"`
}

type ProductUpdatedPayload struct {
    ID    string   `json:"id"`
    Name  *string  `json:"name,omitempty"`
    Price *float64 `json:"price,omitempty"`
}

func Marshal(e Event) ([]byte, error) {
    return json.Marshal(e)
}
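
On the wire, a serialized ProductCreated event looks roughly like this (the IDs and timestamp are illustrative):

{
  "id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
  "type": "ProductCreated",
  "payload": {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "name": "Ergonomic mouse",
    "price": 39.9
  },
  "timestamp": "2024-05-01T12:00:00Z"
}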

5. Command side: write API

The command API exposes endpoints to create and update products. Every time a command succeeds, we publish an event to Kafka.

5.1 In-memory domain model

// internal/command/model.go
package command

import (
    "errors"
    "sync"
)

var (
    ErrProductNotFound = errors.New("product not found")
)

type Product struct {
    ID    string
    Name  string
    Price float64
}

type InMemoryProductRepo struct {
    mu       sync.RWMutex
    products map[string]Product
}

func NewInMemoryProductRepo() *InMemoryProductRepo {
    return &InMemoryProductRepo{
        products: make(map[string]Product),
    }
}

func (r *InMemoryProductRepo) Create(p Product) error {
    r.mu.Lock()
    defer r.mu.Unlock()

    if _, ok := r.products[p.ID]; ok {
        return errors.New("product already exists")
    }
    r.products[p.ID] = p
    return nil
}

func (r *InMemoryProductRepo) Update(p Product) error {
    r.mu.Lock()
    defer r.mu.Unlock()

    if _, ok := r.products[p.ID]; !ok {
        return ErrProductNotFound
    }
    r.products[p.ID] = p
    return nil
}
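
The update handler in section 5.3 needs to read the current state before applying partial changes. Rather than letting HTTP code reach into the map directly (which would bypass the mutex), we add a lock-protected accessor:

// Get returns a copy of the product, or ErrProductNotFound.
func (r *InMemoryProductRepo) Get(id string) (Product, error) {
    r.mu.RLock()
    defer r.mu.RUnlock()

    p, ok := r.products[id]
    if !ok {
        return Product{}, ErrProductNotFound
    }
    return p, nil
}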

5.2 Kafka producer

We use the github.com/segmentio/kafka-go library to interact with Kafka. The producer encapsulates the logic for publishing events.

// internal/command/kafka_producer.go
package command

import (
    "context"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/google/uuid"
    "cqrs-go-kafka/internal/shared"
)

type EventProducer struct {
    writer *kafka.Writer
}

func NewEventProducer(brokers []string, topic string) *EventProducer {
    return &EventProducer{
        writer: &kafka.Writer{
            Addr:         kafka.TCP(brokers...),
            Topic:        topic,
            Balancer:     &kafka.LeastBytes{},
            RequiredAcks: kafka.RequireAll,
        },
    }
}

func (p *EventProducer) Publish(ctx context.Context, eventType shared.EventType, payload any) error {
    e := shared.Event{
        ID:        uuid.NewString(),
        Type:      eventType,
        Payload:   payload,
        Timestamp: time.Now().UTC(),
    }

    value, err := shared.Marshal(e)
    if err != nil {
        return err
    }

    msg := kafka.Message{
        Key:   []byte(e.ID),
        Value: value,
    }

    if err := p.writer.WriteMessages(ctx, msg); err != nil {
        log.Printf("failed to write message: %v", err)
        return err
    }

    return nil
}

func (p *EventProducer) Close() error {
    return p.writer.Close()
}

5.3 HTTP command handlers

The HTTP API receives JSON, invokes the domain repository, and publishes the corresponding event to Kafka. The routes use the method-and-wildcard patterns (POST /products, r.PathValue) introduced in Go 1.22.

// internal/command/handler.go
package command

import (
    "encoding/json"
    "net/http"

    "github.com/google/uuid"
    "cqrs-go-kafka/internal/shared"
)

type CommandHandler struct {
    repo     *InMemoryProductRepo
    producer *EventProducer
}

func NewCommandHandler(repo *InMemoryProductRepo, producer *EventProducer) *CommandHandler {
    return &CommandHandler{repo: repo, producer: producer}
}

type createProductRequest struct {
    Name  string  `json:"name"`
    Price float64 `json:"price"`
}

func (h *CommandHandler) CreateProduct(w http.ResponseWriter, r *http.Request) {
    var req createProductRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    id := uuid.NewString()
    product := Product{
        ID:    id,
        Name:  req.Name,
        Price: req.Price,
    }

    if err := h.repo.Create(product); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    payload := shared.ProductCreatedPayload{
        ID:    product.ID,
        Name:  product.Name,
        Price: product.Price,
    }

    if err := h.producer.Publish(r.Context(), shared.EventProductCreated, payload); err != nil {
        http.Error(w, "failed to publish event", http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusCreated)
    _ = json.NewEncoder(w).Encode(map[string]string{"id": id})
}

type updateProductRequest struct {
    Name  *string  `json:"name"`
    Price *float64 `json:"price"`
}

func (h *CommandHandler) UpdateProduct(w http.ResponseWriter, r *http.Request) {
    id := r.PathValue("id")

    var req updateProductRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    existing, err := h.repo.Get(id)
    if err != nil {
        http.Error(w, "product not found", http.StatusNotFound)
        return
    }

    if req.Name != nil {
        existing.Name = *req.Name
    }
    if req.Price != nil {
        existing.Price = *req.Price
    }

    if err := h.repo.Update(existing); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    payload := shared.ProductUpdatedPayload{
        ID:    existing.ID,
        Name:  req.Name,
        Price: req.Price,
    }

    if err := h.producer.Publish(r.Context(), shared.EventProductUpdated, payload); err != nil {
        http.Error(w, "failed to publish event", http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusOK)
}

Note that writing to the repository and publishing to Kafka are two separate steps: if the publish fails after the write has succeeded, the two sides diverge. For a demo this is acceptable; in production you would typically reach for a transactional outbox, or treat the event log itself as the source of truth.

5.4 Command API entrypoint

// cmd/command-api/main.go
package main

import (
    "log"
    "net/http"
    "os"

    "cqrs-go-kafka/internal/command"
)

func main() {
    brokers := []string{os.Getenv("KAFKA_BROKER")}
    topic := os.Getenv("KAFKA_TOPIC")

    repo := command.NewInMemoryProductRepo()
    producer := command.NewEventProducer(brokers, topic)
    defer producer.Close()

    handler := command.NewCommandHandler(repo, producer)

    mux := http.NewServeMux()
    mux.HandleFunc("POST /products", handler.CreateProduct)
    mux.HandleFunc("PUT /products/{id}", handler.UpdateProduct)

    addr := ":8080"
    log.Printf("command-api listening on %s", addr)
    if err := http.ListenAndServe(addr, mux); err != nil {
        log.Fatal(err)
    }
}

6. Query side: worker and read API

The query side maintains a read-optimized projection in PostgreSQL. A worker consumes events from Kafka and updates the tables, while an HTTP API exposes read endpoints.

6.1 PostgreSQL store

// internal/query/store.go
package query

import (
    "context"
    "database/sql"
)

type ProductReadModel struct {
    ID    string  `json:"id"`
    Name  string  `json:"name"`
    Price float64 `json:"price"`
}

type Store struct {
    db *sql.DB
}

func NewStore(db *sql.DB) *Store {
    return &Store{db: db}
}

func (s *Store) InitSchema(ctx context.Context) error {
    _, err := s.db.ExecContext(ctx, `
        CREATE TABLE IF NOT EXISTS products_read (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            price DOUBLE PRECISION NOT NULL
        );
    `)
    return err
}

func (s *Store) UpsertProduct(ctx context.Context, p ProductReadModel) error {
    _, err := s.db.ExecContext(ctx, `
        INSERT INTO products_read (id, name, price)
        VALUES ($1, $2, $3)
        ON CONFLICT (id) DO UPDATE SET
            name = EXCLUDED.name,
            price = EXCLUDED.price;
    `, p.ID, p.Name, p.Price)
    return err
}

func (s *Store) GetProduct(ctx context.Context, id string) (*ProductReadModel, error) {
    row := s.db.QueryRowContext(ctx, `
        SELECT id, name, price FROM products_read WHERE id = $1;
    `, id)

    var p ProductReadModel
    if err := row.Scan(&p.ID, &p.Name, &p.Price); err != nil {
        return nil, err
    }
    return &p, nil
}

func (s *Store) ListProducts(ctx context.Context) ([]ProductReadModel, error) {
    rows, err := s.db.QueryContext(ctx, `
        SELECT id, name, price FROM products_read ORDER BY name;
    `)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var result []ProductReadModel
    for rows.Next() {
        var p ProductReadModel
        if err := rows.Scan(&p.ID, &p.Name, &p.Price); err != nil {
            return nil, err
        }
        result = append(result, p)
    }
    return result, rows.Err()
}

6.2 Kafka consumer and projection

The worker reads events from product-events and projects them into the read table. One caveat: when a GroupID is set, kafka-go's ReadMessage commits the offset as soon as the message is fetched, so an event whose projection fails is not redelivered. The idempotent upsert keeps reprocessing harmless, and a sketch of an explicit fetch-then-commit variant follows the code below.

// internal/query/kafka_consumer.go
package query

import (
    "context"
    "encoding/json"
    "log"

    "github.com/segmentio/kafka-go"
    "cqrs-go-kafka/internal/shared"
)

type EventConsumer struct {
    reader *kafka.Reader
    store  *Store
}

func NewEventConsumer(brokers []string, topic, groupID string, store *Store) *EventConsumer {
    return &EventConsumer{
        reader: kafka.NewReader(kafka.ReaderConfig{
            Brokers: brokers,
            Topic:   topic,
            GroupID: groupID,
        }),
        store: store,
    }
}

func (c *EventConsumer) Run(ctx context.Context) error {
    for {
        m, err := c.reader.ReadMessage(ctx)
        if err != nil {
            return err
        }

        var e shared.Event
        if err := json.Unmarshal(m.Value, &e); err != nil {
            log.Printf("failed to unmarshal event: %v", err)
            continue
        }

        switch e.Type {
        case shared.EventProductCreated:
            // e.Payload was decoded into a generic map by the event
            // unmarshal above; re-marshal it into the typed payload.
            var p shared.ProductCreatedPayload
            payloadBytes, _ := json.Marshal(e.Payload)
            if err := json.Unmarshal(payloadBytes, &p); err != nil {
                log.Printf("failed to decode ProductCreated payload: %v", err)
                continue
            }

            if err := c.store.UpsertProduct(ctx, ProductReadModel{
                ID:    p.ID,
                Name:  p.Name,
                Price: p.Price,
            }); err != nil {
                log.Printf("failed to upsert product: %v", err)
            }

        case shared.EventProductUpdated:
            var p shared.ProductUpdatedPayload
            payloadBytes, _ := json.Marshal(e.Payload)
            if err := json.Unmarshal(payloadBytes, &p); err != nil {
                log.Printf("failed to decode ProductUpdated payload: %v", err)
                continue
            }

            existing, err := c.store.GetProduct(ctx, p.ID)
            if err != nil {
                log.Printf("product not found in read model: %v", err)
                continue
            }

            if p.Name != nil {
                existing.Name = *p.Name
            }
            if p.Price != nil {
                existing.Price = *p.Price
            }

            if err := c.store.UpsertProduct(ctx, *existing); err != nil {
                log.Printf("failed to update product: %v", err)
            }

        default:
            log.Printf("unknown event type: %s", e.Type)
        }
    }
}

func (c *EventConsumer) Close() error {
    return c.reader.Close()
}
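
As mentioned above, ReadMessage commits offsets on fetch. A minimal sketch of an at-least-once variant of the Run loop, using FetchMessage and CommitMessages; the apply helper is hypothetical and would wrap the switch statement above:

// Sketch: commit the offset only after the event has been projected.
for {
    m, err := c.reader.FetchMessage(ctx)
    if err != nil {
        return err
    }
    if err := c.apply(ctx, m); err != nil {
        // Not committed: after a restart or rebalance the consumer
        // resumes from the last committed offset and retries.
        log.Printf("failed to apply event: %v", err)
        continue
    }
    if err := c.reader.CommitMessages(ctx, m); err != nil {
        return err
    }
}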

6.3 Query worker entrypoint

// cmd/query-worker/main.go
package main

import (
    "context"
    "database/sql"
    "log"
    "os"
    "time"

    _ "github.com/lib/pq"
    "cqrs-go-kafka/internal/query"
)

func main() {
    brokers := []string{os.Getenv("KAFKA_BROKER")}
    topic := os.Getenv("KAFKA_TOPIC")
    groupID := "query-worker"

    dsn := os.Getenv("READ_DB_DSN")
    db, err := sql.Open("postgres", dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // Compose's depends_on only waits for the container to start, not
    // for Postgres to accept connections, so retry until the deadline.
    for {
        if err := db.PingContext(ctx); err == nil {
            break
        }
        select {
        case <-ctx.Done():
            log.Fatalf("database not reachable: %v", ctx.Err())
        case <-time.After(time.Second):
        }
    }

    store := query.NewStore(db)
    if err := store.InitSchema(ctx); err != nil {
        log.Fatalf("failed to init schema: %v", err)
    }

    consumer := query.NewEventConsumer(brokers, topic, groupID, store)
    log.Println("query-worker running...")
    if err := consumer.Run(context.Background()); err != nil {
        log.Fatal(err)
    }
}
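
In a real deployment you would also stop the worker cleanly on SIGINT/SIGTERM. A minimal sketch of the last lines of main, assuming the extra imports errors, os/signal, and syscall:

// Cancel the consumer loop when the process receives a signal.
runCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

consumer := query.NewEventConsumer(brokers, topic, groupID, store)
defer consumer.Close()

log.Println("query-worker running...")
if err := consumer.Run(runCtx); err != nil && !errors.Is(err, context.Canceled) {
    log.Fatal(err)
}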

6.4 Read API

// cmd/query-api/main.go
package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "log"
    "net/http"
    "os"
    "time"

    _ "github.com/lib/pq"
    "cqrs-go-kafka/internal/query"
)

func main() {
    dsn := os.Getenv("READ_DB_DSN")
    db, err := sql.Open("postgres", dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    store := query.NewStore(db)

    mux := http.NewServeMux()

    mux.HandleFunc("GET /products", func(w http.ResponseWriter, r *http.Request) {
        ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
        defer cancel()

        products, err := store.ListProducts(ctx)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        _ = json.NewEncoder(w).Encode(products)
    })

    mux.HandleFunc("GET /products/{id}", func(w http.ResponseWriter, r *http.Request) {
        id := r.PathValue("id")

        ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
        defer cancel()

        product, err := store.GetProduct(ctx, id)
        if errors.Is(err, sql.ErrNoRows) {
            http.Error(w, "not found", http.StatusNotFound)
            return
        }
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        _ = json.NewEncoder(w).Encode(product)
    })

    addr := ":8081"
    log.Printf("query-api listening on %s", addr)
    if err := http.ListenAndServe(addr, mux); err != nil {
        log.Fatal(err)
    }
}

7. Orchestration with Docker and Docker Compose

To simplify local execution, we use Docker to containerize the Go services and Docker Compose to start all components with a single command.

7.1 Dockerfile for Go services

Example of a reusable Dockerfile for Go binaries:

# docker/Dockerfile.base
FROM golang:1.23-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/command-api ./cmd/command-api
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/query-api ./cmd/query-api
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/query-worker ./cmd/query-worker

FROM alpine:3.20
WORKDIR /app
COPY --from=builder /bin/command-api /bin/command-api
COPY --from=builder /bin/query-api /bin/query-api
COPY --from=builder /bin/query-worker /bin/query-worker

EXPOSE 8080 8081
CMD ["/bin/sh"]

Alternatively, you can create a Dockerfile per service, but for a demo a single image containing all three binaries is often more convenient. Note that the image ends with CMD rather than ENTRYPOINT, so the command defined in docker-compose.yml below replaces it entirely.

7.2 docker-compose.yml

The Compose file defines Kafka, PostgreSQL, and the three Go services.

# docker/docker-compose.yml

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: one for containers on the Compose network,
      # one for clients on the host (localhost:29092).
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  read-db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: cqrs
      POSTGRES_PASSWORD: cqrs
      POSTGRES_DB: cqrs_read
    ports:
      - "5432:5432"

  app:
    build:
      context: ..
      dockerfile: docker/Dockerfile.base
    depends_on:
      - kafka
      - read-db
    ports:
      - "8080:8080"   # command-api
      - "8081:8081"   # query-api
    environment:
      KAFKA_BROKER: kafka:9092
      KAFKA_TOPIC: product-events
      READ_DB_DSN: postgres://cqrs:cqrs@read-db:5432/cqrs_read?sslmode=disable
    # /bin is on PATH, so the bare binary names resolve; query-worker
    # runs in the foreground and keeps the container alive.
    command: ["/bin/sh", "-c", "command-api & query-api & exec query-worker"]

The ports mapping exposes the command API on localhost:8080 and the query API on localhost:8081, which is what the curl commands in section 8 rely on. Note that depends_on only controls start order: it does not wait for Kafka or Postgres to actually be ready, which is why the query-worker retries its initial database connection (section 6.3).

8. Testing the architecture

Once the image is built and Compose is running, you can test the CQRS flow using curl or a tool like HTTPie or Postman.

8.1 Starting the containers

cd docker
docker compose up --build

8.2 Creating a product

curl -X POST http://localhost:8080/products \
  -H "Content-Type: application/json" \
  -d '{ "name": "Ergonomic mouse", "price": 39.90 }'

The command side creates the product in memory, publishes the event to Kafka, the worker updates the read DB, and the query API can return the new product.

8.3 Reading products

curl http://localhost:8081/products

You should see the newly created product in the JSON response.
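
The response is a JSON array along these lines (your ID will differ):

[
  {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "name": "Ergonomic mouse",
    "price": 39.9
  }
]

You can exercise the update path the same way, plugging in the ID returned by the create call:

curl -X PUT http://localhost:8080/products/<id-from-create> \
  -H "Content-Type: application/json" \
  -d '{ "price": 34.90 }'

curl http://localhost:8081/products/<id-from-create>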

9. Eventual consistency and considerations

In a CQRS system, the query side is usually eventually consistent with respect to the command side. This means there may be a small delay between when a command is executed and when the read model reflects the updated state.

Some best practices to keep in mind:

  • Define stable and versioned domain events.
  • Carefully handle error cases in the Kafka consumer (retries, dead letter queues, structured logs); a sketch follows this list.
  • Keep the read model simple and denormalized to optimize reads.
  • Monitor consumer lag to understand how up to date the query side is.
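
To illustrate the second point, here is a hedged sketch of retry-then-dead-letter handling for the consumer. The maxRetries value and the apply callback are assumptions for illustration; the dead-letter writer would point at a separate topic such as product-events-dlq:

// Sketch: retry a projection with linear backoff, then park the
// message on a dead-letter topic so the partition is not blocked.
package query

import (
    "context"
    "time"

    "github.com/segmentio/kafka-go"
)

func handleWithRetry(ctx context.Context, m kafka.Message,
    apply func(context.Context, kafka.Message) error, dlq *kafka.Writer) error {

    const maxRetries = 3 // illustrative value
    var err error
    for attempt := 1; attempt <= maxRetries; attempt++ {
        if err = apply(ctx, m); err == nil {
            return nil
        }
        time.Sleep(time.Duration(attempt) * time.Second)
    }
    // Keep the raw key/value so the message can be inspected and replayed.
    return dlq.WriteMessages(ctx, kafka.Message{Key: m.Key, Value: m.Value})
}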

10. Possible extensions

Starting from this minimal example, you can extend the architecture in several ways:

  • Introduce a real persistent write model (e.g. PostgreSQL or an event store).
  • Add other services that consume the same events for different features.
  • Implement sagas or process managers to orchestrate distributed workflows.
  • Use Avro or Protobuf schemas with a schema registry for evolvable events.

CQRS is not the right solution for every scenario, but in complex systems with demanding scalability requirements and heavy read traffic it helps to cleanly separate responsibilities, as you have seen in this example with Go, Kafka, and Docker.
