In this article you will see how to implement the CQRS (Command Query Responsibility Segregation) pattern in Go using Apache Kafka as the messaging backbone and Docker to easily run all services locally.
The goal is to build a small architecture that manages a simple example domain, a product catalog, clearly separating the write side (command) from the read side (query).
1. Quick recap: what is CQRS
The CQRS pattern states that write and read operations are handled by different models:
- Command side (Write model): receives commands, validates domain rules, and generates events.
- Query side (Read model): receives events, updates a model optimized for reading, and exposes read-only APIs.
In a typical CQRS system, communication between the command and query sides happens through events published on a bus or broker. Here we will use Kafka.
2. Example architecture
The architecture we will implement contains the following components:
- Command API in Go: exposes HTTP endpoints to create and update products.
- Kafka: topic for domain events, for example
product-events. - Query worker in Go: consumes events from Kafka and updates a read database.
- Query API in Go: exposes read-only HTTP endpoints.
- Docker Compose: orchestrates everything (Go services, Kafka, database).
For simplicity we will use:
- PostgreSQL as the read database.
- In-process memory as the write model (command side) to keep the example lightweight.
3. Project structure
A possible folder structure could be:
cqrs-go-kafka/
cmd/
command-api/
main.go
query-api/
main.go
query-worker/
main.go
internal/
command/
handler.go
model.go
kafka_producer.go
query/
store.go
kafka_consumer.go
shared/
events.go
docker/
docker-compose.yml
Dockerfile.command-api
Dockerfile.query-api
Dockerfile.query-worker
4. Defining domain events
The heart of CQRS is the domain event. Let’s define some simple events in the
internal/shared package.
// internal/shared/events.go
package shared
import (
"encoding/json"
"time"
)
type EventType string
const (
EventProductCreated EventType = "ProductCreated"
EventProductUpdated EventType = "ProductUpdated"
)
type Event struct {
ID string `json:"id"`
Type EventType `json:"type"`
Payload any `json:"payload"`
Timestamp time.Time `json:"timestamp"`
}
type ProductCreatedPayload struct {
ID string `json:"id"`
Name string `json:"name"`
Price float64 `json:"price"`
}
type ProductUpdatedPayload struct {
ID string `json:"id"`
Name *string `json:"name,omitempty"`
Price *float64 `json:"price,omitempty"`
}
func Marshal(e Event) ([]byte, error) {
return json.Marshal(e)
}
5. Command side: write API
The command API exposes endpoints to create and update products. Every time a command succeeds, we publish an event to Kafka.
5.1 In-memory domain model
// internal/command/model.go
package command
import (
"errors"
"sync"
)
var (
ErrProductNotFound = errors.New("product not found")
)
type Product struct {
ID string
Name string
Price float64
}
type InMemoryProductRepo struct {
mu sync.RWMutex
products map[string]Product
}
func NewInMemoryProductRepo() *InMemoryProductRepo {
return &InMemoryProductRepo{
products: make(map[string]Product),
}
}
func (r *InMemoryProductRepo) Create(p Product) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, ok := r.products[p.ID]; ok {
return errors.New("product already exists")
}
r.products[p.ID] = p
return nil
}
func (r *InMemoryProductRepo) Update(p Product) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, ok := r.products[p.ID]; !ok {
return ErrProductNotFound
}
r.products[p.ID] = p
return nil
}
5.2 Kafka producer
We use the github.com/segmentio/kafka-go library to interact with Kafka. The producer encapsulates
the logic for publishing events.
// internal/command/kafka_producer.go
package command
import (
"context"
"log"
"time"
"github.com/segmentio/kafka-go"
"github.com/google/uuid"
"cqrs-go-kafka/internal/shared"
)
type EventProducer struct {
writer *kafka.Writer
}
func NewEventProducer(brokers []string, topic string) *EventProducer {
return &EventProducer{
writer: &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
RequiredAcks: kafka.RequireAll,
},
}
}
func (p *EventProducer) Publish(ctx context.Context, eventType shared.EventType, payload any) error {
e := shared.Event{
ID: uuid.NewString(),
Type: eventType,
Payload: payload,
Timestamp: time.Now().UTC(),
}
value, err := shared.Marshal(e)
if err != nil {
return err
}
msg := kafka.Message{
Key: []byte(e.ID),
Value: value,
}
if err := p.writer.WriteMessages(ctx, msg); err != nil {
log.Printf("failed to write message: %v", err)
return err
}
return nil
}
func (p *EventProducer) Close() error {
return p.writer.Close()
}
5.3 HTTP command handlers
The HTTP API receives JSON, invokes the domain repository, and publishes the corresponding event to Kafka.
// internal/command/handler.go
package command
import (
"context"
"encoding/json"
"net/http"
"github.com/google/uuid"
"cqrs-go-kafka/internal/shared"
)
type CommandHandler struct {
repo *InMemoryProductRepo
producer *EventProducer
}
func NewCommandHandler(repo *InMemoryProductRepo, producer *EventProducer) *CommandHandler {
return &CommandHandler{repo: repo, producer: producer}
}
type createProductRequest struct {
Name string `json:"name"`
Price float64 `json:"price"`
}
func (h *CommandHandler) CreateProduct(w http.ResponseWriter, r *http.Request) {
var req createProductRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
id := uuid.NewString()
product := Product{
ID: id,
Name: req.Name,
Price: req.Price,
}
if err := h.repo.Create(product); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
payload := shared.ProductCreatedPayload{
ID: product.ID,
Name: product.Name,
Price: product.Price,
}
if err := h.producer.Publish(r.Context(), shared.EventProductCreated, payload); err != nil {
http.Error(w, "failed to publish event", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
_ = json.NewEncoder(w).Encode(map[string]string{"id": id})
}
type updateProductRequest struct {
Name *string `json:"name"`
Price *float64 `json:"price"`
}
func (h *CommandHandler) UpdateProduct(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
var req updateProductRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
existing := h.repo.products[id]
if existing.ID == "" {
http.Error(w, "product not found", http.StatusNotFound)
return
}
if req.Name != nil {
existing.Name = *req.Name
}
if req.Price != nil {
existing.Price = *req.Price
}
if err := h.repo.Update(existing); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
payload := shared.ProductUpdatedPayload{
ID: existing.ID,
Name: req.Name,
Price: req.Price,
}
if err := h.producer.Publish(r.Context(), shared.EventProductUpdated, payload); err != nil {
http.Error(w, "failed to publish event", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
5.4 Command API entrypoint
// cmd/command-api/main.go
package main
import (
"log"
"net/http"
"os"
"cqrs-go-kafka/internal/command"
)
func main() {
brokers := []string{os.Getenv("KAFKA_BROKER")}
topic := os.Getenv("KAFKA_TOPIC")
repo := command.NewInMemoryProductRepo()
producer := command.NewEventProducer(brokers, topic)
defer producer.Close()
handler := command.NewCommandHandler(repo, producer)
mux := http.NewServeMux()
mux.HandleFunc("POST /products", handler.CreateProduct)
mux.HandleFunc("PUT /products/{id}", handler.UpdateProduct)
addr := ":8080"
log.Printf("command-api listening on %s", addr)
if err := http.ListenAndServe(addr, mux); err != nil {
log.Fatal(err)
}
}
6. Query side: worker and read API
The query side maintains a read-optimized projection in PostgreSQL. A worker consumes events from Kafka and updates the tables, while an HTTP API exposes read endpoints.
6.1 PostgreSQL store
// internal/query/store.go
package query
import (
"context"
"database/sql"
)
type ProductReadModel struct {
ID string `json:"id"`
Name string `json:"name"`
Price float64 `json:"price"`
}
type Store struct {
db *sql.DB
}
func NewStore(db *sql.DB) *Store {
return &Store{db: db}
}
func (s *Store) InitSchema(ctx context.Context) error {
_, err := s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS products_read (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
price DOUBLE PRECISION NOT NULL
);
`)
return err
}
func (s *Store) UpsertProduct(ctx context.Context, p ProductReadModel) error {
_, err := s.db.ExecContext(ctx, `
INSERT INTO products_read (id, name, price)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
price = EXCLUDED.price;
`, p.ID, p.Name, p.Price)
return err
}
func (s *Store) GetProduct(ctx context.Context, id string) (*ProductReadModel, error) {
row := s.db.QueryRowContext(ctx, `
SELECT id, name, price FROM products_read WHERE id = $1;
`, id)
var p ProductReadModel
if err := row.Scan(&p.ID, &p.Name, &p.Price); err != nil {
return nil, err
}
return &p, nil
}
func (s *Store) ListProducts(ctx context.Context) ([]ProductReadModel, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, name, price FROM products_read ORDER BY name;
`)
if err != nil {
return nil, err
}
defer rows.Close()
var result []ProductReadModel
for rows.Next() {
var p ProductReadModel
if err := rows.Scan(&p.ID, &p.Name, &p.Price); err != nil {
return nil, err
}
result = append(result, p)
}
return result, nil
}
6.2 Kafka consumer and projection
// internal/query/kafka_consumer.go
package query
import (
"context"
"encoding/json"
"log"
"github.com/segmentio/kafka-go"
"cqrs-go-kafka/internal/shared"
)
type EventConsumer struct {
reader *kafka.Reader
store *Store
}
func NewEventConsumer(brokers []string, topic, groupID string, store *Store) *EventConsumer {
return &EventConsumer{
reader: kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupID,
}),
store: store,
}
}
func (c *EventConsumer) Run(ctx context.Context) error {
for {
m, err := c.reader.ReadMessage(ctx)
if err != nil {
return err
}
var e shared.Event
if err := json.Unmarshal(m.Value, &e); err != nil {
log.Printf("failed to unmarshal event: %v", err)
continue
}
switch e.Type {
case shared.EventProductCreated:
var p shared.ProductCreatedPayload
if err := json.Unmarshal(m.Value, &e); err == nil {
payloadBytes, _ := json.Marshal(e.Payload)
_ = json.Unmarshal(payloadBytes, &p)
}
if err := c.store.UpsertProduct(ctx, ProductReadModel{
ID: p.ID,
Name: p.Name,
Price: p.Price,
}); err != nil {
log.Printf("failed to upsert product: %v", err)
}
case shared.EventProductUpdated:
var p shared.ProductUpdatedPayload
payloadBytes, _ := json.Marshal(e.Payload)
_ = json.Unmarshal(payloadBytes, &p)
existing, err := c.store.GetProduct(ctx, p.ID)
if err != nil {
log.Printf("product not found in read model: %v", err)
continue
}
if p.Name != nil {
existing.Name = *p.Name
}
if p.Price != nil {
existing.Price = *p.Price
}
if err := c.store.UpsertProduct(ctx, *existing); err != nil {
log.Printf("failed to update product: %v", err)
}
default:
log.Printf("unknown event type: %s", e.Type)
}
}
}
func (c *EventConsumer) Close() error {
return c.reader.Close()
}
6.3 Query worker entrypoint
// cmd/query-worker/main.go
package main
import (
"context"
"database/sql"
"log"
"os"
"time"
_ "github.com/lib/pq"
"cqrs-go-kafka/internal/query"
)
func main() {
brokers := []string{os.Getenv("KAFKA_BROKER")}
topic := os.Getenv("KAFKA_TOPIC")
groupID := "query-worker"
dsn := os.Getenv("READ_DB_DSN")
db, err := sql.Open("postgres", dsn)
if err != nil {
log.Fatal(err)
}
defer db.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
store := query.NewStore(db)
if err := store.InitSchema(ctx); err != nil {
log.Fatalf("failed to init schema: %v", err)
}
consumer := query.NewEventConsumer(brokers, topic, groupID, store)
log.Println("query-worker running...")
if err := consumer.Run(context.Background()); err != nil {
log.Fatal(err)
}
}
6.4 Read API
// cmd/query-api/main.go
package main
import (
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
"os"
"time"
_ "github.com/lib/pq"
"cqrs-go-kafka/internal/query"
)
func main() {
dsn := os.Getenv("READ_DB_DSN")
db, err := sql.Open("postgres", dsn)
if err != nil {
log.Fatal(err)
}
defer db.Close()
store := query.NewStore(db)
mux := http.NewServeMux()
mux.HandleFunc("GET /products", func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
products, err := store.ListProducts(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_ = json.NewEncoder(w).Encode(products)
})
mux.HandleFunc("GET /products/{id}", func(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
product, err := store.GetProduct(ctx, id)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
_ = json.NewEncoder(w).Encode(product)
})
addr := ":8081"
log.Printf("query-api listening on %s", addr)
if err := http.ListenAndServe(addr, mux); err != nil {
log.Fatal(err)
}
}
7. Orchestration with Docker and Docker Compose
To simplify local execution, we use Docker to containerize the Go services and Docker Compose to start all components with a single command.
7.1 Dockerfile for Go services
Example of a reusable Dockerfile for Go binaries:
# docker/Dockerfile.base
FROM golang:1.23-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/command-api ./cmd/command-api
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/query-api ./cmd/query-api
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/query-worker ./cmd/query-worker
FROM alpine:3.20
WORKDIR /app
COPY --from=builder /bin/command-api /bin/command-api
COPY --from=builder /bin/query-api /bin/query-api
COPY --from=builder /bin/query-worker /bin/query-worker
EXPOSE 8080 8081
ENTRYPOINT ["/bin/sh"]
Alternatively, you can create a Dockerfile for each service, but for a demo it is often more convenient to have a single image with all binaries.
7.2 docker-compose.yml
The Compose file defines Kafka, PostgreSQL, and the three Go services.
# docker/docker-compose.yml
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
read-db:
image: postgres:16-alpine
environment:
POSTGRES_USER: cqrs
POSTGRES_PASSWORD: cqrs
POSTGRES_DB: cqrs_read
ports:
- "5432:5432"
app:
build:
context: ..
dockerfile: docker/Dockerfile.base
depends_on:
- kafka
- read-db
environment:
KAFKA_BROKER: kafka:9092
KAFKA_TOPIC: product-events
READ_DB_DSN: postgres://cqrs:cqrs@read-db:5432/cqrs_read?sslmode=disable
command: ["/bin/sh", "-c", "command-api & query-api & query-worker && tail -f /dev/null"]
8. Testing the architecture
Once the image is built and Compose is running, you can test the CQRS flow using curl or a tool like
HTTPie or Postman.
8.1 Starting the containers
cd docker
docker compose up --build
8.2 Creating a product
curl -X POST http://localhost:8080/products \
-H "Content-Type: application/json" \
-d '{ "name": "Ergonomic mouse", "price": 39.90 }'
The command side creates the product in memory, publishes the event to Kafka, the worker updates the read DB, and the query API can return the new product.
8.3 Reading products
curl http://localhost:8081/products
You should see the newly created product in the JSON response.
9. Eventual consistency and considerations
In a CQRS system, the query side is usually eventually consistent with respect to the command side. This means there may be a small delay between when a command is executed and when the read model reflects the updated state.
Some best practices to keep in mind:
- Define stable and versioned domain events.
- Carefully handle error cases in the Kafka consumer (retries, dead letter queues, structured logs).
- Keep the read model simple and denormalized to optimize reads.
- Monitor consumer lag to understand how up to date the query side is.
10. Possible extensions
Starting from this minimal example, you can extend the architecture in several ways:
- Introduce a real persistent write model (e.g. PostgreSQL or an event store).
- Add other services that consume the same events for different features.
- Implement sagas or process managers to orchestrate distributed workflows.
- Use Avro or Protobuf schemas with a schema registry for evolvable events.
CQRS is not the right solution for every scenario, but in complex systems with high scalability requirements and heavy read throughput it can help cleanly separate responsibilities, as you have seen in this example with Go, Kafka, and Docker.