Distributed Systems
How do you scale Elixir applications across multiple machines? This guide walks through distributed systems patterns, from the single-node BEAM through distributed Erlang clustering to production solutions: libcluster for automatic node discovery and Horde for distributed process management.
Why Distributed Systems Matter
Single-node applications have fundamental limitations:
- Scaling ceiling - Single machine CPU and memory limits
- Single point of failure - Hardware failure stops entire application
- Geographic distribution - Cannot place nodes close to users
- Load distribution - Cannot spread work across machines
- Hot upgrades - Difficult without redundancy
- Availability requirements - Cannot achieve high availability on single node
Production systems require distribution for scalability, fault tolerance, and high availability.
Financial Domain Examples
Examples use distributed donation processing scenarios:
- Distributed donation processing - Process donations across multiple nodes
- Global process registry - Track donation processors across cluster
- Partition tolerance - Handle network splits between donation centers
- Consistent hashing - Distribute donations predictably across nodes (sketched below)
These demonstrate distributed patterns with real financial operations.
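The consistent-hashing item above is not demonstrated elsewhere in this guide, so here is a minimal sketch of the idea, using a hypothetical module name: hash the donation ID over the list of connected nodes so every node agrees on which one owns a given donation.

```elixir
# Hypothetical sketch: deterministic donation placement across cluster nodes.
defmodule DonationSharding do
  def owner_node(donation_id) do
    nodes = Enum.sort([Node.self() | Node.list()])
    # => All connected nodes, sorted so every node computes the same ordering
    index = :erlang.phash2(donation_id, length(nodes))
    # => Deterministic index in 0..length(nodes) - 1
    Enum.at(nodes, index)
  end

  def route(donation) do
    owner = owner_node(donation.id)

    if owner == Node.self() do
      {:local, donation}            # => This node owns the donation; process here
    else
      {:forward, owner, donation}   # => Hand off to the owning node
    end
  end
end
```

Note that this simple modulo-style placement reshuffles many keys whenever the node list changes; a true consistent-hash ring keeps most keys in place when nodes join or leave, which is what you would reach for in production.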
Standard Library - Single Node BEAM
Process Communication on Single Node
The BEAM provides excellent concurrency on a single node.
# Single-node process communication
defmodule DonationProcessor do
def start_link do
pid = spawn_link(fn -> process_loop() end)
# => Process on local node
# => Type: pid()
{:ok, pid}
end
defp process_loop do
receive do
{:process, donation} ->
zakat = donation.amount * 0.025 # => Calculate 2.5% Zakat
# => Type: float()
IO.puts("Processed: $#{donation.amount}, Zakat: $#{zakat}")
# => Output to console
process_loop() # => Continue processing
end
end
end
# Usage - single node only
{:ok, pid} = DonationProcessor.start_link()
# => pid: Process identifier (local)
# => Only accessible on this node
send(pid, {:process, %{amount: 1000}})
# => Sends message to process
# => Output: Processed: $1000, Zakat: $25.0
# => Works only on same node
Process communication works perfectly within a single node.
Named Processes with Registry
# Register process with name on single node
defmodule DonationRegistry do
def start_link do
Registry.start_link(keys: :unique, name: __MODULE__)
# => Local registry on this node
# => Type: {:ok, pid()}
end
def register_processor(donation_id) do
pid = spawn(fn ->
receive do
{:process, amount} ->
zakat = amount * 0.025 # => Calculate Zakat
IO.puts("Donation #{donation_id}: $#{amount}, Zakat: $#{zakat}")
# => Process donation
end
end)
Registry.register(__MODULE__, donation_id, pid)
# => Registers the calling process under donation_id, storing the worker pid as the value
# => Only visible on local node
{:ok, pid}
end
def lookup(donation_id) do
case Registry.lookup(__MODULE__, donation_id) do
[{_owner, pid}] -> {:ok, pid} # => Worker pid stored as the entry value
[] -> {:error, :not_found} # => Not found
end
end
end
# Usage
{:ok, _} = DonationRegistry.start_link()
{:ok, pid} = DonationRegistry.register_processor("DON-001")
# => Registered on local node only
{:ok, found_pid} = DonationRegistry.lookup("DON-001")
# => found_pid: Process on local node
# => Works only on this node
Registry works perfectly on a single node but doesn't distribute.
Limitations of Single Node
Problem 1: No Distribution
# Cannot access processes on other nodes
# Node A:
{:ok, pid} = DonationProcessor.start_link()
Process.register(pid, :donation_processor) # => Registered on Node A only
# => Not visible to Node B
# Node B (different machine):
send(:donation_processor, {:process, %{amount: 1000}})
# => Raises ArgumentError: no process is registered locally under that name
# => Cannot find process on Node A
# => No cross-node communication
Processes registered on one machine are invisible to others.
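Plain distributed Erlang can reach a locally registered name on a specific remote node by addressing it as `{name, node}`, but only if the caller already knows which node owns the process; the node name below is a placeholder. That bookkeeping burden is exactly what a cluster-wide registry removes.

```elixir
# Node B, assuming we somehow know the process lives on Node A:
send({:donation_processor, :"node_a@hostname"}, {:process, %{amount: 1000}})
# => Delivered to the process registered locally as :donation_processor on Node A
# => The caller must track which node owns each name and handle that node going down
```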
Problem 2: Single Point of Failure
# Single node crash stops all processing
# Only one node running DonationProcessor
{:ok, pid} = DonationProcessor.start_link()
# If this node crashes:
# - All donation processing stops
# - No automatic failover
# - Manual intervention required
# - Downtime until restart
Hardware failure means complete system failure.
Problem 3: Scaling Limits
# CPU and memory bounded by single machine
# Processing 10,000 donations:
{:ok, pid} = DonationProcessor.start_link()
Enum.each(1..10_000, fn i ->
send(pid, {:process, %{amount: i * 100}})
# => All on single machine
# => Limited by single CPU
# => Cannot scale horizontally
end)
# => Hits single machine limits
# => Cannot add more machines to help
Cannot scale beyond single machine capacity.
Problem 4: No Geographic Distribution
# All processing on single datacenter
# Users in Asia, Europe, North America
# All requests go to single US datacenter
# - High latency for distant users
# - Cannot place nodes near users
# - No geographic redundancy
Single location means poor global performance.
Distributed Erlang - Built-in Clustering
Connecting Nodes
BEAM includes distributed Erlang for node clustering.
# Start nodes with distributed names
# Terminal 1:
iex --sname node1 --cookie secret_token
# Terminal 2:
iex --sname node2 --cookie secret_token
# Connect nodes (in node2):
Node.connect(:"node1@hostname") # => Connects to node1
# => Returns: true
# => Forms cluster
Node.list() # => [:"node1@hostname"]
# => Shows connected nodes
# => Type: [atom()]
# Now nodes can communicate
Node.spawn(:"node1@hostname", fn ->
IO.puts("Running on node1") # => Executes on remote node
end)
# => Spawns process on node1
# => Cross-node process creation
Distributed Erlang enables cross-node communication.
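Node.spawn/2 is fire-and-forget. When you need the result of a remote computation back, the :erpc module (OTP 23+) runs a synchronous call on the remote node; a small sketch reusing the node name from above:

```elixir
# Synchronous remote call: executes on node1, result returns to the caller
:erpc.call(:"node1@hostname", :erlang, :node, [])
# => :"node1@hostname" (confirms where the code ran)

:erpc.call(:"node1@hostname", fn -> 1000 * 0.025 end)
# => 25.0, computed on node1
```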
Global Process Registry
# :global registry spans cluster
defmodule DistributedProcessor do
def start_link(node_name) do
pid = spawn_link(fn -> process_loop(node_name) end)
# => Local process
# Register globally across cluster
:global.register_name(:donation_processor, pid)
# => Visible to all nodes
# => Type: :yes | :no
{:ok, pid}
end
defp process_loop(node_name) do
receive do
{:process, donation} ->
zakat = donation.amount * 0.025
IO.puts("[#{node_name}] Processed: $#{donation.amount}, Zakat: $#{zakat}")
# => Shows which node processed
process_loop(node_name)
end
end
def send_to_processor(donation) do
case :global.whereis_name(:donation_processor) do
:undefined ->
{:error, :not_found} # => No registered processor
pid ->
send(pid, {:process, donation}) # => Send to registered process
# => Works across nodes
:ok
end
end
end
# Node1:
{:ok, _pid} = DistributedProcessor.start_link("Node1")
# => Registered globally
# Node2 (different machine):
DistributedProcessor.send_to_processor(%{amount: 1000})
# => Finds process on Node1
# => Sends message across network
# => Output on Node1: [Node1] Processed: $1000, Zakat: $25.0
:global registry provides cluster-wide process discovery.
pg Module for Process Groups
# pg (Process Groups) for distributed groups
# Requires the scope to be running first, e.g. :pg.start_link(:donation_processors) in the supervision tree
defmodule DonationGroup do
def start_processor(region) do
pid = spawn(fn ->
receive do
{:process, donation} ->
zakat = donation.amount * 0.025
IO.puts("[#{region}] Processed: $#{donation.amount}")
# => Regional processing
end
end)
# Join process group
:pg.join(:donation_processors, region, pid)
# => Add to regional group
# => Distributed across cluster
{:ok, pid}
end
def broadcast_to_region(region, donation) do
members = :pg.get_members(:donation_processors, region)
# => Get all processes in region
# => Across all nodes
# => Type: [pid()]
Enum.each(members, fn pid ->
send(pid, {:process, donation}) # => Send to each processor
end)
{:ok, length(members)} # => Count of processors
end
end
# Node1:
{:ok, _} = DonationGroup.start_processor("US-East")
# => Processor joins US-East group
# Node2:
{:ok, _} = DonationGroup.start_processor("US-East")
# => Another processor in same region
# Node3:
DonationGroup.broadcast_to_region("US-East", %{amount: 1000})
# => Broadcasts to all US-East processors
# => Both Node1 and Node2 receive message
# => Type: {:ok, 2}
The pg module enables distributed process groups.
libcluster - Automatic Node Discovery
Problem with Manual Connection
# Manual Node.connect() has issues:
# - Must know node names in advance
# - Hardcoded hostnames
# - No automatic discovery
# - Manual reconnection on failure
# - Doesn't handle dynamic cloud environments
Production needs automatic node discovery.
libcluster Solution
# mix.exs
defp deps do
[
{:libcluster, "~> 3.3"} # => Automatic clustering library
]
end
# application.ex
defmodule Finance.Application do
use Application
def start(_type, _args) do
topologies = [
donation_cluster: [
strategy: Cluster.Strategy.Epmd, # => Strategy for node discovery
# => Epmd: Erlang Port Mapper Daemon
config: [
hosts: [
:"donation@node1.example.com", # => Known nodes
:"donation@node2.example.com",
:"donation@node3.example.com"
]
]
]
]
children = [
{Cluster.Supervisor, [topologies, [name: Finance.ClusterSupervisor]]},
# => Starts cluster supervisor
# => Automatically connects nodes
# Other children...
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
libcluster automatically discovers and connects nodes.
Kubernetes Strategy
# libcluster with Kubernetes for cloud environments
topologies = [
k8s_donation_cluster: [
strategy: Cluster.Strategy.Kubernetes, # => Kubernetes-aware strategy
config: [
mode: :dns, # => DNS-based discovery
kubernetes_node_basename: "donation", # => Service name prefix
kubernetes_selector: "app=donation-processor",
# => Label selector
kubernetes_namespace: "finance", # => Namespace
polling_interval: 10_000 # => Check every 10 seconds
]
]
]
# libcluster queries Kubernetes API
# - Discovers pods matching selector
# - Extracts pod IPs
# - Forms erlang node names
# - Automatically connects
# - Reconnects on pod changes
Kubernetes strategy handles dynamic cloud environments.
Gossip Strategy for DNS-less
# Gossip strategy for environments without DNS
topologies = [
gossip_cluster: [
strategy: Cluster.Strategy.Gossip, # => UDP multicast gossip
config: [
port: 45892, # => UDP port for gossip
multicast_addr: "230.1.1.251", # => Multicast address
multicast_ttl: 1, # => Time-to-live
secret: "secret_token" # => Shared secret for security
]
]
]
# Nodes broadcast presence via UDP multicast
# - No DNS required
# - No configuration of node names
# - Automatic peer discovery
# - Good for local development
Gossip strategy works without DNS infrastructure.
Horde - Distributed Registry and Supervisor
Problem with :global
# :global registry has limitations:
# 1. No supervision (processes not restarted)
# 2. No consistent hashing (uneven distribution)
# 3. Manual failover on node crash
# 4. Network partition causes split-brain
Production needs distributed supervision.
Horde.Registry - Distributed Registry
# mix.exs
defp deps do
[
{:horde, "~> 0.8"} # => Distributed process registry
]
end
# Start Horde.Registry in application
defmodule Finance.Application do
use Application
def start(_type, _args) do
children = [
{Cluster.Supervisor, [topologies, [name: Finance.ClusterSupervisor]]},
{Horde.Registry, [name: Finance.DonationRegistry, keys: :unique]},
# => Distributed registry
# => Replicated across cluster
# Other children...
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
# Register process in Horde
defmodule DonationWorker do
def start_link(donation_id) do
pid = spawn_link(fn ->
# Register in distributed registry
# Horde.Registry.register/3 registers the calling process, so the worker registers itself
Horde.Registry.register(
Finance.DonationRegistry,
{:donation, donation_id}, # => Unique key
nil # => Optional value
)
# => Registered across entire cluster
# => Any node can lookup
worker_loop(donation_id)
end)
# => Worker process
{:ok, pid}
end
defp worker_loop(donation_id) do
receive do
{:process, amount} ->
zakat = amount * 0.025
IO.puts("Worker #{donation_id}: Processed $#{amount}")
# => Process donation
worker_loop(donation_id)
end
end
def send_to_worker(donation_id, amount) do
case Horde.Registry.lookup(Finance.DonationRegistry, {:donation, donation_id}) do
[{pid, _}] ->
send(pid, {:process, amount}) # => Send to worker
# => Works across nodes
:ok
[] ->
{:error, :not_found} # => Worker not registered
end
end
end
# Node1:
{:ok, _} = DonationWorker.start_link("DON-001")
# => Registered in Horde
# Node2:
DonationWorker.send_to_worker("DON-001", 1000)
# => Finds worker on Node1
# => Sends message across cluster
Horde.Registry provides a cluster-wide process registry with CRDT consistency.
Horde.DynamicSupervisor - Distributed Supervision
# Start Horde.DynamicSupervisor
defmodule Finance.Application do
use Application
def start(_type, _args) do
children = [
{Cluster.Supervisor, [topologies, [name: Finance.ClusterSupervisor]]},
{Horde.Registry, [name: Finance.DonationRegistry, keys: :unique]},
{Horde.DynamicSupervisor,
[
name: Finance.DonationSupervisor, # => Distributed supervisor
strategy: :one_for_one, # => Restart strategy
members: :auto # => Auto-discover cluster members
]},
# Other children...
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
# Start supervised worker
defmodule DonationSupervisedWorker do
use GenServer
def start_link(donation_id) do
GenServer.start_link(__MODULE__, donation_id,
name: via_tuple(donation_id)) # => Named via Horde.Registry
end
defp via_tuple(donation_id) do
{:via, Horde.Registry, {Finance.DonationRegistry, {:donation, donation_id}}}
# => Registry via tuple
end
def init(donation_id) do
{:ok, %{donation_id: donation_id, processed: 0}}
# => Initial state
end
def handle_call({:process, amount}, _from, state) do
zakat = amount * 0.025
new_state = %{state | processed: state.processed + 1}
# => Update processed count
{:reply, {:ok, zakat}, new_state}
end
end
# Start worker under Horde supervisor
defmodule DonationService do
def start_worker(donation_id) do
child_spec = %{
id: donation_id,
start: {DonationSupervisedWorker, :start_link, [donation_id]},
restart: :transient # => Restart on failure
}
Horde.DynamicSupervisor.start_child(
Finance.DonationSupervisor,
child_spec
)
# => Worker started on some node
# => Supervised by distributed supervisor
# => Automatically restarted on failure
end
def process_donation(donation_id, amount) do
GenServer.call(
{:via, Horde.Registry, {Finance.DonationRegistry, {:donation, donation_id}}},
{:process, amount}
)
# => Calls worker wherever it is
# => Transparent cross-node communication
end
end
# Usage - any node:
{:ok, _pid} = DonationService.start_worker("DON-001")
# => Started on some node in cluster
# Different node:
{:ok, zakat} = DonationService.process_donation("DON-001", 1000)
# => zakat: 25.0
# => Called across cluster transparently
Horde.DynamicSupervisor distributes and supervises processes across the cluster.
Partition Tolerance - CAP Theorem
Network Partition Scenarios
# Cluster splits into two partitions:
# Partition 1: [Node1, Node2]
# Partition 2: [Node3, Node4]
# Without partition handling:
# - Each partition thinks it's the full cluster
# - Duplicate processes may start
# - Split-brain scenario
# - Data inconsistency
Production must handle network partitions.
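One common defence is a static quorum check: each node knows the expected cluster size and refuses to accept writes while it can see fewer than a majority of members. A minimal sketch; the module name and the expected size of 4 (matching the scenario above) are assumptions you would configure per deployment.

```elixir
# Static-quorum sketch: only the majority side of a split keeps accepting work.
defmodule PartitionGuard do
  @expected_cluster_size 4   # => Assumed deployment size; configure per environment

  def quorum? do
    visible = length(Node.list()) + 1          # => Nodes I can see, plus myself
    visible > div(@expected_cluster_size, 2)   # => Strict majority required
  end

  def accept_donation(donation) do
    if quorum?() do
      {:ok, donation}                  # => Majority partition: proceed
    else
      {:error, :minority_partition}    # => Minority (or even split): refuse writes
    end
  end
end
```

With the 2-2 split above neither side has a strict majority, so both sides refuse writes until the partition heals, deliberately trading availability for consistency.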
Horde with CRDT Consistency
# Horde uses CRDTs (Conflict-free Replicated Data Types)
# - Eventually consistent
# - Handles network partitions
# - Automatic reconciliation on heal
# - No split-brain
# When partition heals:
# - Horde detects partition
# - Syncs registry state
# - Removes duplicate processes
# - Converges to consistent state
# - No manual intervention
Horde's CRDT approach provides partition tolerance.
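A practical consequence worth handling in code: two nodes, or two sides of a healing partition, can race to start the same worker. GenServer.start_link with a via-registered name returns {:error, {:already_started, pid}} when the name is already taken, and the sketch below assumes Horde.DynamicSupervisor.start_child passes that result through unchanged; the wrapper module name is illustrative.

```elixir
# Sketch: treat "another node already started this worker" as success.
defmodule DonationStarter do
  def ensure_started(donation_id) do
    child_spec = %{
      id: donation_id,
      start: {DonationSupervisedWorker, :start_link, [donation_id]},
      restart: :transient
    }

    case Horde.DynamicSupervisor.start_child(Finance.DonationSupervisor, child_spec) do
      {:ok, pid} -> {:ok, pid}                          # => We won the race
      {:error, {:already_started, pid}} -> {:ok, pid}   # => Someone else did; reuse it
      {:error, reason} -> {:error, reason}
    end
  end
end
```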
Choosing Consistency Strategy
# CAP Theorem: Choose 2 of 3
# - Consistency: All nodes see same data
# - Availability: System responds to requests
# - Partition tolerance: Works despite network split
# :global registry:
# - CP (Consistency + Partition tolerance)
# - Blocks on partition
# - Guarantees single registration
# - May be unavailable during split
# Horde:
# - AP (Availability + Partition tolerance)
# - Continues operating on partition
# - Eventually consistent
# - May have temporary duplicates
# - Converges on heal
Choose a strategy based on your requirements.
Production Deployment Pattern
Complete Distributed System
# Complete production setup
defmodule Finance.Application do
use Application
def start(_type, _args) do
# Configure libcluster
topologies = [
k8s_cluster: [
strategy: Cluster.Strategy.Kubernetes.DNS,
config: [
service: "donation-service", # => Kubernetes service name
application_name: "finance", # => Application name
kubernetes_namespace: "production"
]
]
]
children = [
# 1. Cluster formation
{Cluster.Supervisor, [topologies, [name: Finance.ClusterSupervisor]]},
# 2. Distributed registry
{Horde.Registry, [name: Finance.DonationRegistry, keys: :unique]},
# 3. Distributed supervisor
{Horde.DynamicSupervisor,
[
name: Finance.Supervisor,
strategy: :one_for_one,
members: :auto,
distribution_strategy: Horde.UniformQuorumDistribution
# => Balanced distribution
]},
# 4. Application workers
Finance.DonationRouter, # => Routes requests to workers
# Other children...
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
# Donation router for load balancing
defmodule Finance.DonationRouter do
use GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_) do
{:ok, %{}}
end
def process_donation(donation) do
# Start worker if needed
worker_id = "donation_#{donation.id}"
ensure_worker_started(worker_id)
# Process via distributed registry
GenServer.call(
{:via, Horde.Registry, {Finance.Registry, worker_id}},
{:process, donation}
)
end
defp ensure_worker_started(worker_id) do
case Horde.Registry.lookup(Finance.DonationRegistry, {:donation, worker_id}) do
[] ->
# Start new worker
child_spec = %{
id: worker_id,
start: {DonationSupervisedWorker, :start_link, [worker_id]}
}
Horde.DynamicSupervisor.start_child(Finance.Supervisor, child_spec)
[{_pid, _}] ->
# Worker already exists
:ok
end
end
end
# Usage - transparent distribution:
donation = %{id: "DON-123", amount: 1000}
{:ok, result} = Finance.DonationRouter.process_donation(donation)
# => Worker started on some node
# => Balanced across cluster
# => Supervised for fault tolerance
# => Transparent to caller
Complete distributed system with clustering, registry, and supervision.
Decision Matrix
| Approach | Clustering | Registry | Supervision | Use Case |
|---|---|---|---|---|
| Single Node | ❌ None | Registry (local) | ✅ Local | Development, small apps |
| Distributed Erlang | ✅ Manual | :global | ❌ Manual | Learning, simple clustering |
| libcluster + :global | ✅ Auto | :global | ❌ Manual | Basic auto-clustering |
| libcluster + Horde | ✅ Auto | Horde.Registry | ✅ Horde | Production distributed apps |
Decision Guide
Use Single Node When:
- Development environment
- Low traffic (<10k requests/day)
- No HA requirements
- Single datacenter deployment
Use Distributed Erlang When:
- Learning distribution fundamentals
- Simple clustering needs
- Manual control required
Use libcluster + :global When:
- Need automatic clustering
- Simple process registry sufficient
- Manual supervision acceptable
Use libcluster + Horde When:
- Production systems
- High availability required
- Need distributed supervision
- Partition tolerance critical
- Automatic failover needed
Best Practices
1. Start Single Node, Add Distribution Later
# Good: Start simple
# 1. Build on single node first
# 2. Test thoroughly
# 3. Add distribution when needed
# 4. Distribution adds complexity
# Avoid: Starting distributed
# - Adds unnecessary complexity early
# - Harder to debug
# - May not need distribution
Add distribution only when a single node is insufficient.
2. Use libcluster for Clustering
# Good: libcluster handles connection
topologies = [k8s_cluster: [strategy: Cluster.Strategy.Kubernetes.DNS, ...]]
# Avoid: Manual Node.connect()
Node.connect(:"node1@host") # => Manual, error-prone
Always use libcluster in production.
3. Prefer Horde over :global
# Good: Horde for production
Horde.Registry.register(Finance.Registry, key, pid)
# Avoid: :global for production
:global.register_name(key, pid) # => No partition tolerance
Horde provides better partition tolerance.
4. Monitor Cluster Health
# Monitor cluster state
defmodule ClusterMonitor do
use GenServer
require Logger
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_) do
:net_kernel.monitor_nodes(true) # => Enable node monitoring
{:ok, %{}}
end
def handle_info({:nodeup, node}, state) do
Logger.info("Node joined: #{node}") # => Log joins
{:noreply, state}
end
def handle_info({:nodedown, node}, state) do
Logger.warning("Node left: #{node}") # => Log departures
# Trigger alerts, cleanup, etc.
{:noreply, state}
end
end
Monitor cluster health for operations.
5. Test Partition Scenarios
# Test network partition handling
# Use :partisan for partition simulation
# Test split-brain scenarios
# Verify state convergence on heal
Test partition handling before production; a rough manual rehearsal is sketched below.
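Before reaching for dedicated tooling, you can rehearse the failure mode by hand on a two-node development cluster: force a split with Node.disconnect/1, start the same worker on both sides, then reconnect and confirm the registry converges. A rough manual sketch; node names are placeholders and assume the Horde setup from earlier sections.

```elixir
# On node_a: sever the link to simulate a partition
Node.disconnect(:"node_b@hostname")      # => true; the cluster is now split

# On each side while partitioned: start the "same" worker
{:ok, _} = DonationService.start_worker("DON-001")
# => Both partitions accept the start (temporary duplicate, expected under AP)

# Heal the partition
Node.connect(:"node_b@hostname")         # => true; Horde begins reconciling

# Shortly afterwards, on either node:
Horde.Registry.lookup(Finance.DonationRegistry, {:donation, "DON-001"})
# => Expect exactly one registration once the registries have converged
```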
Common Pitfalls
Pitfall 1: Starting Distributed Too Early
# Wrong: Distribution from day one
# - Added complexity
# - Harder debugging
# - May not need it
# Right: Add when needed
# - Start single node
# - Add distribution at scale
Pitfall 2: Using :global in Production
# Wrong: :global without partition handling
:global.register_name(:worker, pid) # => Vulnerable to split-brain
# Right: Horde with CRDT
Horde.Registry.register(Registry, :worker, pid)
# => Partition tolerant
Pitfall 3: Ignoring Network Partitions
# Wrong: Assuming network always works
# - Partitions happen in production
# - Must handle split-brain
# - Test partition scenarios
# Right: Plan for partitions
# - Use partition-tolerant tools
# - Monitor cluster health
# - Test failure modes
Pitfall 4: No Cluster Monitoring
# Wrong: No visibility into cluster state
# - Cannot detect issues
# - No alerting on node failure
# Right: Monitor actively
# - Log node joins/leaves
# - Alert on unexpected changes
# - Track process distribution
Further Reading
Related distributed topics:
- GenServer Patterns - State management across nodes
- Supervisor Trees - Supervision strategies
Production patterns:
- Deployment Strategies - Deploying distributed apps
- Best Practices - Production OTP patterns
Summary
Distributed systems in Elixir follow clear progression:
- Single Node - BEAM processes with local Registry
- Limitations - No distribution, single point of failure, scaling ceiling
- Distributed Erlang - Node clustering with :global registry
- Production - libcluster for auto-clustering + Horde for distributed registry/supervisor
Use libcluster for automatic node discovery and clustering in production environments.
Use Horde for distributed process registry and supervision with partition tolerance.
Consider CAP theorem when choosing consistency vs. availability trade-offs.
Key insight: Start single-node, add distribution when scaling demands it. Distribution adds complexity but enables horizontal scaling and high availability.