Intermediate
This intermediate guide set extends the beginner hexagonal application — one context, in-memory adapters, composition root — with production-grade adapter implementations. Each guide adds one adapter or wiring concern: Postgres persistence, test doubles, domain event publishing via outbox, Anti-Corruption Layer for cross-context translation, database migrations, integration test harness, structured logging, and multi-environment configuration.
Running domain: procurement-platform-be (PurchaseOrder bounded context). All guides extend the same application skeleton built in the beginner tier.
Guide 7 — PostgreSQL Adapter for PurchaseOrder Repository
The in-memory repository from the beginner tier is a test double — production needs a real database. This guide implements PostgresPORepository that satisfies PurchaseOrderRepository (the domain port). Go uses pgx v5; Rust uses sqlx with the postgres feature and compile-time query macros. The adapter never imports domain logic — it only translates between SQL rows and domain structs. Production persistence must be explicit about query semantics and transaction isolation rather than relying on ORM magic.
Why It Matters
Persistence bugs discovered in production are expensive and often silent — corrupted data survives for weeks before anyone notices the inconsistency. Implementing the adapter with real pgx/sqlx calls (rather than ORM magic) gives explicit control over query performance, transaction isolation, and JSONB serialization behavior.
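The translation work the adapter performs can be tried without a database. The following is a minimal sketch, not the guide's actual domain code: the `POStatusApproved` state, the JSON tags, and the helper names `EncodeLineItems` / `DecodeLineItems` are assumptions made for illustration, and `LineItem` is trimmed to a few fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// POStatus is an assumed enum; the guide only names the Draft state.
type POStatus int

const (
	POStatusDraft    POStatus = iota
	POStatusApproved          // assumed second state, for illustration only
)

// String returns the value written to the status TEXT column.
func (s POStatus) String() string {
	if s == POStatusApproved {
		return "approved"
	}
	return "draft"
}

// POStatusFromString falls back to Draft for any unknown string.
func POStatusFromString(s string) POStatus {
	if s == "approved" {
		return POStatusApproved
	}
	return POStatusDraft
}

// Money and LineItem carry assumed JSON tags; the real domain types may differ.
type Money struct {
	AmountCents int64  `json:"amount_cents"`
	Currency    string `json:"currency"`
}

type LineItem struct {
	Id          string `json:"id"`
	Description string `json:"description"`
	UnitPrice   Money  `json:"unit_price"`
}

// EncodeLineItems produces the bytes written to the line_items JSONB column.
func EncodeLineItems(items []LineItem) ([]byte, error) {
	if items == nil {
		items = []LineItem{} // a nil slice would marshal to JSON null, not []
	}
	raw, err := json.Marshal(items)
	if err != nil {
		return nil, fmt.Errorf("marshal line items: %w", err)
	}
	return raw, nil
}

// DecodeLineItems turns the JSONB bytes back into domain values.
func DecodeLineItems(raw []byte) ([]LineItem, error) {
	var items []LineItem
	if err := json.Unmarshal(raw, &items); err != nil {
		return nil, fmt.Errorf("unmarshal line items: %w", err)
	}
	return items, nil
}
```

One detail worth noticing: `json.Marshal` on a nil slice yields `null`, so a purchase order with no line items would be stored as JSON `null` unless the adapter normalizes it to an empty array first.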
// => PostgresPORepository satisfies the PurchaseOrderRepository port
type PostgresPORepository struct {
pool *pgxpool.Pool // => pgxpool manages connection pool; pool is safe for concurrent use
}
// => NewPostgresPORepository creates the adapter; pool is injected (hexagonal: no direct DB config here)
func NewPostgresPORepository(pool *pgxpool.Pool) *PostgresPORepository {
return &PostgresPORepository{pool: pool}
}
// => Save upserts the PO — INSERT OR UPDATE based on primary key conflict
func (r *PostgresPORepository) Save(ctx context.Context, po *PurchaseOrder) error {
// => JSONB column stores line_items — avoids a separate table for this read pattern
lineItemsJSON, err := json.Marshal(po.LineItems)
if err != nil {
return fmt.Errorf("marshal line items: %w", err)
}
// => ON CONFLICT (id) DO UPDATE makes Save idempotent — safe to retry
_, err = r.pool.Exec(ctx, `
INSERT INTO purchase_orders (id, supplier_id, status, line_items, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (id) DO UPDATE SET
status = EXCLUDED.status,
line_items = EXCLUDED.line_items,
updated_at = EXCLUDED.updated_at
`, po.Id, po.SupplierId, po.Status.String(), lineItemsJSON, po.CreatedAt, po.UpdatedAt)
return err
}
// => FindById maps DB row back to domain PurchaseOrder
func (r *PostgresPORepository) FindById(ctx context.Context, id PurchaseOrderId) (*PurchaseOrder, error) {
row := r.pool.QueryRow(ctx, `
SELECT id, supplier_id, status, line_items, created_at, updated_at
FROM purchase_orders WHERE id = $1
`, id)
var po PurchaseOrder
var statusStr string
var lineItemsJSON []byte
// => Scan into local variables; domain struct populated after scan
if err := row.Scan(&po.Id, &po.SupplierId, &statusStr, &lineItemsJSON, &po.CreatedAt, &po.UpdatedAt); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil // => nil, nil = not found; caller decides if missing is an error
}
return nil, fmt.Errorf("scan: %w", err)
}
po.Status = POStatusFromString(statusStr) // => string → enum; unknown string → Draft (safe default)
if err := json.Unmarshal(lineItemsJSON, &po.LineItems); err != nil {
return nil, fmt.Errorf("unmarshal line items: %w", err)
}
return &po, nil
}
Guide 8 shows how to use the in-memory repository as a test double alongside this Postgres adapter, enabling the entire application service layer to be tested at unit-test speed.
Guide 8 — In-Memory Repository as a Hexagonal Test Double
The hexagonal architecture's payoff is that the entire application service layer can be tested against an in-memory adapter at unit-test speed — no Docker, no migrations, no ports. This guide implements and uses InMemoryPORepository as a drop-in test double for PostgresPORepository. Both adapters satisfy the same PurchaseOrderRepository port; the composition root decides which one to wire at startup. The test uses the in-memory adapter directly without any mocking framework.
Why It Matters
A test suite that requires a running database is fragile and slow. The in-memory adapter enables fast feedback in CI without mocking application service internals — the application service under test runs exactly as it would in production, just with a different adapter behind the port.
// => InMemoryPORepository satisfies the same PurchaseOrderRepository port as PostgresPORepository
type InMemoryPORepository struct {
mu sync.RWMutex
store map[PurchaseOrderId]*PurchaseOrder
}
func NewInMemoryPORepository() *InMemoryPORepository {
return &InMemoryPORepository{store: make(map[PurchaseOrderId]*PurchaseOrder)}
}
func (r *InMemoryPORepository) Save(_ context.Context, po *PurchaseOrder) error {
r.mu.Lock()
defer r.mu.Unlock()
// => Deep copy on save: prevents external mutation of stored state
cloned := *po
cloned.LineItems = append([]LineItem(nil), po.LineItems...)
r.store[po.Id] = &cloned
return nil
}
func (r *InMemoryPORepository) FindById(_ context.Context, id PurchaseOrderId) (*PurchaseOrder, error) {
r.mu.RLock()
defer r.mu.RUnlock()
po, ok := r.store[id]
if !ok {
return nil, nil // => nil, nil = not found; consistent with PostgresPORepository behaviour
}
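// => Return a deep copy; caller cannot mutate stored state through the returned pointer
cloned := *po
cloned.LineItems = append([]LineItem(nil), po.LineItems...) // => copy the slice too; a struct copy alone shares its backing array
return &cloned, nil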
}
// => Unit test using InMemoryPORepository — no database required
func TestCreatePOHandler_Success(t *testing.T) {
repo := NewInMemoryPORepository()
bus := NewInMemoryEventBus()
handler := NewCreatePOHandler(repo, bus)
id, err := handler.Handle(context.Background(), CreatePOCommand{
SupplierId: "SUP-001",
LineItems: []CreateLineItemInput{
{Description: "Paper A4", Qty: 100, Unit: "each", UnitPriceCents: 500, Currency: "USD"},
},
})
// => After Handle, PO should be findable in the repo
require.NoError(t, err)
po, err := repo.FindById(context.Background(), id)
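require.NoError(t, err)
require.NotNil(t, po) // => FindById returns nil, nil when missing; fail here instead of panicking below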
assert.Equal(t, POStatusDraft, po.Status)
assert.Len(t, po.LineItems, 1)
}
Guide 9 shows how to publish domain events reliably using the outbox pattern, ensuring events are never lost even if the application crashes after saving the aggregate.
Guide 9 — Domain Event Publisher with Outbox Pattern
Domain events emitted by aggregates must be published reliably — if the application crashes after saving the aggregate but before publishing, events are lost. The outbox pattern stores events in the same DB transaction as the aggregate, then a separate publisher polls and forwards them. Go uses pgx transactions explicitly; Rust uses a sqlx Transaction handle passed through the same async task. Both implementations follow the same structural pattern: one atomic write, one background poller.
Why It Matters
Lost events cause silent data inconsistency across bounded contexts — Finance never knows a PO was approved; supplier notification is never sent. The outbox pattern provides at-least-once delivery without distributed transactions, making it the production-grade default for domain event publishing.
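The poller in this guide has to turn a stored `event_type` string and JSON payload back into a typed event, and the listing leaves that `EventRegistry` undefined. Here is one possible shape, offered as a sketch only: the `POCreated` event, the `"po.created"` name, and the `Register` / `DecodeJSON` helpers are hypothetical. Unlike the single-value `Deserialize` call in the listing, this version also returns an error so that unknown event types are not silently published as nil.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DomainEvent is the minimal surface the outbox needs from an event.
type DomainEvent interface {
	EventType() string
}

// POCreated is a hypothetical event used only for this sketch.
type POCreated struct {
	PurchaseOrderId string `json:"purchase_order_id"`
}

func (POCreated) EventType() string { return "po.created" }

// decodeFunc rebuilds one concrete event type from its JSON payload.
type decodeFunc func(payload []byte) (DomainEvent, error)

// EventRegistry maps an event type string to its decoder.
type EventRegistry struct {
	decoders map[string]decodeFunc
}

func NewEventRegistry() *EventRegistry {
	return &EventRegistry{decoders: make(map[string]decodeFunc)}
}

// Register associates an event type name with a decoder.
func (r *EventRegistry) Register(eventType string, fn decodeFunc) {
	r.decoders[eventType] = fn
}

// Deserialize looks up the decoder and fails loudly for unknown types.
func (r *EventRegistry) Deserialize(eventType string, payload []byte) (DomainEvent, error) {
	fn, ok := r.decoders[eventType]
	if !ok {
		return nil, fmt.Errorf("unknown event type %q", eventType)
	}
	return fn(payload)
}

// DecodeJSON builds a decoder for any event type T stored as JSON.
func DecodeJSON[T DomainEvent]() decodeFunc {
	return func(payload []byte) (DomainEvent, error) {
		var ev T
		if err := json.Unmarshal(payload, &ev); err != nil {
			return nil, fmt.Errorf("decode event: %w", err)
		}
		return ev, nil
	}
}
```

Registration would happen once in the composition root, for example `reg.Register("po.created", DecodeJSON[POCreated]())`, which keeps the mapping from wire names to Go types in a single place.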
// => OutboxEntry stored in the same DB transaction as the aggregate
type OutboxEntry struct {
ID string
AggregateId string
EventType string
Payload []byte // => JSON-serialized event
PublishedAt *time.Time // => nil = unpublished
RetryCount int
CreatedAt time.Time
}
// => OutboxRepository is a port; same pgx transaction passed to both repo saves
type OutboxRepository interface {
Save(ctx context.Context, tx pgx.Tx, entry OutboxEntry) error
FindUnpublished(ctx context.Context, limit int) ([]OutboxEntry, error)
MarkPublished(ctx context.Context, id string) error
IncrementRetry(ctx context.Context, id string) error
}
// => CreatePOWithOutbox saves PO + outbox entries in one DB transaction
func CreatePOWithOutbox(ctx context.Context, pool *pgxpool.Pool, poRepo *PostgresPORepository, outboxRepo OutboxRepository, po *PurchaseOrder) error {
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx) // => deferred rollback is no-op after commit
// => Save PO within transaction
if err := poRepo.SaveTx(ctx, tx, po); err != nil {
return err
}
// => Save each domain event as outbox entry within SAME transaction
for _, event := range po.DomainEvents() {
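payload, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal event %s: %w", event.EventType(), err) // => abort; deferred rollback discards the PO write
}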
entry := OutboxEntry{
ID: uuid.New().String(),
AggregateId: string(po.Id),
EventType: event.EventType(),
Payload: payload,
CreatedAt: time.Now(),
}
if err := outboxRepo.Save(ctx, tx, entry); err != nil {
return err
}
}
po.ClearEvents()
return tx.Commit(ctx) // => atomic: PO + outbox entries committed together
}
// => OutboxPublisher polls unpublished entries and forwards to event bus
type OutboxPublisher struct {
outboxRepo OutboxRepository
bus DomainEventBus
eventReg EventRegistry // => maps event type string → concrete event struct
interval time.Duration
}
func (p *OutboxPublisher) Run(ctx context.Context) {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.publishBatch(ctx)
case <-ctx.Done():
return
}
}
}
func (p *OutboxPublisher) publishBatch(ctx context.Context) {
entries, _ := p.outboxRepo.FindUnpublished(ctx, 100) // => process up to 100 at a time
for _, entry := range entries {
event := p.eventReg.Deserialize(entry.EventType, entry.Payload)
if err := p.bus.Publish(ctx, event); err != nil {
p.outboxRepo.IncrementRetry(ctx, entry.ID) // => failed entries retried next poll
continue
}
p.outboxRepo.MarkPublished(ctx, entry.ID) // => if this fails, the entry is sent again on the next poll (at-least-once)
}
}
Guide 10 shows how to translate external supplier data through an Anti-Corruption Layer before it enters the domain model, keeping ERP naming conventions out of the bounded context.
Guide 10 — Cross-Context Anti-Corruption Layer (ACL)
The procurement context consumes supplier data from an ERP system that uses a different model — "Vendor" with a "VendorCode" instead of "Supplier" with a "SupplierCode". The Anti-Corruption Layer (ACL) translates at the boundary, keeping the ERP model out of the procurement domain. The ACL is implemented as a separate package (adapter/out/erpacl/) that depends on an ExternalVendorClient port — the domain never imports the HTTP client or ERP DTOs directly. Translation happens at the edge; domain structs always carry procurement vocabulary.
Why It Matters
Without an ACL, ERP naming conventions and data shapes leak into the domain model. A refactor in the ERP triggers domain code changes — tight coupling that makes the bounded context meaningless. The ACL acts as a semantic firewall: ERP changes are contained to the translator, not scattered across the domain.
// => ExternalVendorDTO represents the ERP's vendor model — not the domain's Supplier
type ExternalVendorDTO struct {
VendorCode string // => ERP uses "VendorCode" not "SupplierCode"
LegalName string
PaymentTermsDays int
BankIBAN string
ActiveFlag int // => ERP uses 1/0 instead of bool
}
// => ExternalVendorClient is a port — the ACL depends on this interface, not on the HTTP client directly
type ExternalVendorClient interface {
FetchVendor(ctx context.Context, vendorCode string) (ExternalVendorDTO, error)
}
// => SupplierACL translates ERP vendor data to domain Supplier
type SupplierACL struct {
client ExternalVendorClient // => injected port
}
func NewSupplierACL(client ExternalVendorClient) *SupplierACL {
return &SupplierACL{client: client}
}
// => FetchSupplierByCode fetches from ERP and translates to domain Supplier
func (acl *SupplierACL) FetchSupplierByCode(ctx context.Context, code SupplierCode) (*Supplier, error) {
// => Call ERP using its VendorCode — same format as SupplierCode in domain
dto, err := acl.client.FetchVendor(ctx, string(code))
if err != nil {
return nil, fmt.Errorf("ERP vendor fetch: %w", err)
}
return acl.translate(dto)
}
// => translate maps ERP DTO to domain Supplier — this is the ACL's core responsibility
func (acl *SupplierACL) translate(dto ExternalVendorDTO) (*Supplier, error) {
code, err := ParseSupplierCode(dto.VendorCode)
// => Validation at translation time: invalid ERP data never enters domain
if err != nil {
return nil, fmt.Errorf("invalid vendor code %q: %w", dto.VendorCode, err)
}
return &Supplier{
Id: SupplierId(uuid.New().String()), // => domain generates its own ID
Name: dto.LegalName,
Code: code,
Active: dto.ActiveFlag == 1, // => int → bool translation
CreatedAt: time.Now(),
}, nil
}
Guides 11–14 cover database migrations, integration test harness, structured logging, and production wiring that ties all adapters together under environment-based configuration.
Guide 11 — Database Migrations
The Postgres schema for purchase_orders and outbox tables must be created and versioned before any adapter can run. Go uses goose v3; Rust uses sqlx's built-in sqlx::migrate! macro pointing at a migrations/ folder. Both tools track applied migrations in a version table and apply only the delta on startup. Migrations run once at application boot before the composition root constructs repositories — this guarantees the schema is always at the expected version when adapters start.
Why It Matters
Migrations without version control cause schema drift between environments: staging runs schema version 5 while production is still at version 3, and the deployment fails at runtime with cryptic column-not-found errors. Goose and sqlx-migrate keep all environments synchronized by treating migrations as checked-in artifacts, not manual scripts.
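The idea of "apply only the delta" can be shown in a few lines. This is a conceptual sketch, not how goose or sqlx implement it: the `Migration` type and `Pending` function are hypothetical, and real tools add checks (for example, around out-of-order versions) that are left out here.

```go
package main

import "sort"

// Migration identifies one checked-in migration file.
type Migration struct {
	Version int64
	Name    string
}

// Pending returns the migrations whose version has not been recorded as
// applied, sorted in ascending version order.
func Pending(available []Migration, applied map[int64]bool) []Migration {
	var pending []Migration
	for _, m := range available {
		if !applied[m.Version] {
			pending = append(pending, m)
		}
	}
	sort.Slice(pending, func(i, j int) bool {
		return pending[i].Version < pending[j].Version
	})
	return pending
}
```

With versions 1 and 2 on disk and only version 1 recorded in the version table, `Pending` yields just version 2, which is why rerunning migrations at every boot is harmless.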
// => migrations/001_create_purchase_orders.sql (goose format)
// -- +goose Up
// => Up migration: create the purchase_orders table
// CREATE TABLE purchase_orders (
// id TEXT PRIMARY KEY,
// supplier_id TEXT NOT NULL,
// status TEXT NOT NULL DEFAULT 'draft',
// line_items JSONB NOT NULL DEFAULT '[]',
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
// );
// -- +goose Down
// => Down migration: rollback drops the table
// DROP TABLE IF EXISTS purchase_orders;
// => migrations/002_create_outbox.sql
// -- +goose Up
// CREATE TABLE outbox (
// id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
// aggregate_id TEXT NOT NULL,
// event_type TEXT NOT NULL,
// payload JSONB NOT NULL,
// published_at TIMESTAMPTZ,
// retry_count INT NOT NULL DEFAULT 0,
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
// );
// => partial index: only unpublished entries; keeps the poller query fast
// CREATE INDEX idx_outbox_unpublished ON outbox (created_at) WHERE published_at IS NULL;
// -- +goose Down
// DROP TABLE IF EXISTS outbox;
// => RunMigrations applies all pending migrations at startup
func RunMigrations(ctx context.Context, db *sql.DB, migrationsDir string) error {
// => goose.SetDialect selects the SQL dialect goose uses; it returns an error for unknown dialects
if err := goose.SetDialect("postgres"); err != nil {
return fmt.Errorf("set goose dialect: %w", err)
}
// => goose.Up applies all unapplied migrations in ascending version order
if err := goose.Up(db, migrationsDir); err != nil {
return fmt.Errorf("run migrations: %w", err)
}
return nil
}
// => In main.go: call RunMigrations before constructing repositories
// => db, err := sql.Open("pgx", cfg.DatabaseURL) // "pgx" driver requires a blank import of github.com/jackc/pgx/v5/stdlib
// => if err != nil { log.Fatal(err) }
// => if err := RunMigrations(ctx, db, migrationsDir); err != nil { log.Fatal(err) }
Guide 12 builds on the migration setup to create a full integration test harness using Testcontainers, running real Postgres in Docker for each test run.
Guide 12 — Integration Test Harness with Testcontainers
Integration tests need a real Postgres instance. Testcontainers spins up a Docker Postgres container per test run — isolated, reproducible, and automatically cleaned up after the test. Go uses testcontainers-go; Rust uses testcontainers-rs. Both start the container, wait for the port, run migrations, then return a live connection pool. Cleanup is registered with the test runner so no manual teardown is needed.
Why It Matters
Testing against a mock database validates application logic, not database behavior. Real Postgres integration tests catch constraint violations, index performance issues, and JSONB query bugs that mock tests miss entirely — including the ON CONFLICT upsert behavior from Guide 7.
// => setupTestDB creates a Postgres container and returns a pgxpool for integration tests
func setupTestDB(t *testing.T) *pgxpool.Pool {
t.Helper()
ctx := context.Background()
// => testcontainers starts a real Postgres 16 container
req := testcontainers.ContainerRequest{
Image: "postgres:16-alpine",
ExposedPorts: []string{"5432/tcp"},
Env: map[string]string{
"POSTGRES_USER": "test",
"POSTGRES_PASSWORD": "test",
"POSTGRES_DB": "testdb",
},
WaitingFor: wait.ForListeningPort("5432/tcp"),
// => WaitingFor blocks until the port accepts TCP connections; Postgres itself may need a moment longer
}
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
require.NoError(t, err)
// => t.Cleanup ensures container is stopped after test regardless of pass/fail
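t.Cleanup(func() { _ = container.Terminate(ctx) })
host, err := container.Host(ctx)
require.NoError(t, err)
port, err := container.MappedPort(ctx, "5432")
require.NoError(t, err)
dsn := fmt.Sprintf("postgres://test:test@%s:%s/testdb", host, port.Port())
pool, err := pgxpool.New(ctx, dsn)
require.NoError(t, err)
t.Cleanup(pool.Close) // => cleanups run last-in-first-out: pool closes before the container stops
// => Run migrations against the test container before any test assertions
db, err := sql.Open("pgx", dsn) // => "pgx" driver requires a blank import of github.com/jackc/pgx/v5/stdlib
require.NoError(t, err)
defer db.Close() // => the *sql.DB is only needed for migrations
require.NoError(t, RunMigrations(ctx, db, "../../migrations"))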
return pool
}
// => Integration test using real Postgres via testcontainers
func TestPostgresPORepository_SaveAndFind(t *testing.T) {
pool := setupTestDB(t)
repo := NewPostgresPORepository(pool)
po := NewPurchaseOrder(PurchaseOrderId("po-test-001"), SupplierId("sup-001"))
po.AddLineItem(LineItem{
Id: "li-001",
Description: "Printer paper",
Qty: Quantity{Amount: 10, Unit: UnitEach},
UnitPrice: Money{AmountCents: 500, Currency: "USD"},
})
// => Save PO to real Postgres; exercises ON CONFLICT upsert path
require.NoError(t, repo.Save(context.Background(), po))
found, err := repo.FindById(context.Background(), po.Id)
require.NoError(t, err)
require.NotNil(t, found)
assert.Equal(t, po.Status, found.Status)
assert.Len(t, found.LineItems, 1)
// => line_items round-trips through JSONB without data loss
}
Guide 13 adds structured logging as a port-and-adapter, giving application services observability without importing a concrete logger.
Guide 13 — Structured Logging Adapter
Production applications need structured logs — key-value pairs that log aggregation systems (Datadog, Loki, CloudWatch) can query and alert on. The logging adapter is a port: domain and application layers never import the concrete logger directly. Go uses the standard library slog (Go 1.21+) wrapped behind a thin Logger interface; Rust uses the tracing crate wrapped behind a Logger trait. Both expose the same With(fields) / Info / Error surface so application services remain independent of the log backend.
Why It Matters
Unstructured log strings like "PO 1234 approved by alice" cannot be queried or alerted on — they require fragile regex parsing. Structured logs with discrete fields power dashboards, on-call alerts, and distributed trace correlation without any post-processing.
// => Logger is the domain-level logging port — no slog import in domain/app packages
type Logger interface {
Info(msg string, args ...any)
Error(msg string, args ...any)
With(args ...any) Logger // => With returns a child logger pre-set with fields
}
// => SlogAdapter wraps Go 1.21 slog.Logger as the Logger port implementation
type SlogAdapter struct {
inner *slog.Logger
}
func NewSlogAdapter(inner *slog.Logger) *SlogAdapter {
return &SlogAdapter{inner: inner}
}
func (l *SlogAdapter) Info(msg string, args ...any) {
l.inner.Info(msg, args...) // => slog.Info is structured: alternating key-value pairs
}
func (l *SlogAdapter) Error(msg string, args ...any) {
l.inner.Error(msg, args...)
}
func (l *SlogAdapter) With(args ...any) Logger {
// => slog.Logger.With creates a child logger; returned as Logger port
return &SlogAdapter{inner: l.inner.With(args...)}
}
// => Usage in application service: enrich logger with request-scoped fields
func (h *CreatePOHandler) Handle(ctx context.Context, cmd CreatePOCommand) (PurchaseOrderId, error) {
log := h.logger.With("command", "CreatePO", "supplier_id", cmd.SupplierId)
// => every log line from here carries command + supplier_id automatically
log.Info("handling command")
id, err := h.createPO(ctx, cmd)
if err != nil {
log.Error("command failed", "error", err)
return "", err
}
log.Info("PO created", "po_id", id)
return id, nil
}
Guide 14 ties all adapters together in the composition root, selecting concrete implementations based on the APP_ENV environment variable.
Guide 14 — Multi-Adapter Configuration (Test vs Production)
The composition root in main.go/main.rs wires concrete adapters together based on environment. APP_ENV=test uses in-memory adapters; APP_ENV=production uses Postgres and real external clients. Configuration is loaded from environment variables — no hardcoded values, no config files committed to source control. The Wire function is the single place that knows which adapter satisfies which port; application services and domain packages never see this decision.
Why It Matters
Hardcoding adapter choices in the composition root makes environment-specific testing impossible. Environment-based wiring enables the same binary to run with test doubles in CI and real adapters in production, without any code changes between environments.
// => AppConfig holds environment-specific configuration
type AppConfig struct {
Env string // => "test", "development", "production"
DatabaseURL string
ERPBaseURL string
LogLevel string
}
// => LoadConfig reads from environment variables; defaults to "development"
func LoadConfig() AppConfig {
return AppConfig{
Env: getEnv("APP_ENV", "development"),
DatabaseURL: getEnv("DATABASE_URL", ""),
ERPBaseURL: getEnv("ERP_BASE_URL", ""),
LogLevel: getEnv("LOG_LEVEL", "info"),
}
}
// => Wire builds the application with adapters chosen by config
func Wire(cfg AppConfig) (*Application, error) {
var repo PurchaseOrderRepository
var erpClient ExternalVendorClient
switch cfg.Env {
case "test":
// => test env: in-memory adapters, no external dependencies
repo = NewInMemoryPORepository()
erpClient = NewMockVendorClient()
default:
// => production/development: Postgres + real HTTP client
if cfg.DatabaseURL == "" {
return nil, fmt.Errorf("DATABASE_URL required for env %s", cfg.Env)
}
pool, err := pgxpool.New(context.Background(), cfg.DatabaseURL)
if err != nil {
return nil, err
}
// => LoggingPORepository wraps PostgresPORepository — decorator pattern
repo = NewLoggingPORepository(NewPostgresPORepository(pool), slog.Default())
erpClient = NewHTTPVendorClient(cfg.ERPBaseURL)
}
supplierACL := NewSupplierACL(erpClient)
bus := NewInMemoryEventBus()
createPOHandler := NewCreatePOHandler(repo, bus)
// => all ports wired; application struct holds only the handler surface
return &Application{createPOHandler: createPOHandler, supplierACL: supplierACL}, nil
}
The advanced tier (Guides 15+) covers retry and circuit-breaker decorators, end-to-end domain event flow across four bounded contexts, OpenTelemetry observability, and the Kubernetes deployment topology.
Last updated May 23, 2026