Processes Message Passing
Need to handle concurrent operations? This guide teaches Elixir’s lightweight processes, message passing, spawn patterns, process linking, and building concurrent systems with the actor model.
Problem
Traditional threading is complex and error-prone:
- Shared mutable state causes race conditions
- Locks lead to deadlocks
- Thread pools are resource-intensive
- Difficult to reason about concurrent code
Elixir processes are lightweight (thousands per core), isolated (no shared memory), and communicate via messages.
Prerequisites
Solution Overview
Elixir process model:
- Spawning: Create new processes
- Message Passing:
sendandreceive - Process Linking: Monitor process health
- State Management: Recursive loops
Creating Processes
Basic Spawn
defmodule ProcessExamples do
# Spawn anonymous function
def spawn_basic do
pid = spawn(fn ->
IO.puts("Hello from process #{inspect(self())}")
end)
IO.puts("Spawned process: #{inspect(pid)}")
pid
end
# Spawn named function
def spawn_named do
spawn(__MODULE__, :worker_function, ["argument"])
end
def worker_function(arg) do
IO.puts("Worker received: #{arg}")
end
# Spawn and wait
def spawn_and_wait do
parent = self()
child = spawn(fn ->
:timer.sleep(1000)
send(parent, {:done, "result"})
end)
receive do
{:done, result} ->
IO.puts("Received: #{result}")
after
2000 ->
IO.puts("Timeout waiting for child")
end
child
end
end
ProcessExamples.spawn_basic()
ProcessExamples.spawn_and_wait()How It Works: spawn/1 creates new process executing given function. Returns PID (process identifier). Process runs concurrently with parent.
Spawn vs Spawn Link
defmodule SpawnComparison do
# Regular spawn - processes independent
def spawn_independent do
spawn(fn ->
:timer.sleep(100)
raise "Crash!"
end)
:timer.sleep(200)
IO.puts("Parent still alive")
end
# Spawn link - crash propagates
def spawn_linked do
spawn_link(fn ->
:timer.sleep(100)
raise "Crash!"
end)
:timer.sleep(200)
IO.puts("Parent still alive (you won't see this)")
end
# Trap exits to handle crashes
def spawn_link_trapped do
Process.flag(:trap_exit, true)
pid = spawn_link(fn ->
:timer.sleep(100)
raise "Crash!"
end)
receive do
{:EXIT, ^pid, reason} ->
IO.puts("Child crashed: #{inspect(reason)}")
end
IO.puts("Parent handled crash and continues")
end
end
SpawnComparison.spawn_independent()
SpawnComparison.spawn_link_trapped()Message Passing
Send and Receive
defmodule Messenger do
def ping_pong do
parent = self()
# Spawn child process
child = spawn(fn ->
receive do
{:ping, from} ->
IO.puts("Child: received ping")
send(from, {:pong, self()})
end
end)
# Send message to child
send(child, {:ping, parent})
# Wait for response
receive do
{:pong, from} ->
IO.puts("Parent: received pong from #{inspect(from)}")
end
end
# Multiple message patterns
def message_patterns do
receive do
{:hello, name} ->
IO.puts("Hello, #{name}!")
{:goodbye, name} ->
IO.puts("Goodbye, #{name}!")
:shutdown ->
IO.puts("Shutting down")
:shutdown
other ->
IO.puts("Unknown message: #{inspect(other)}")
after
5000 ->
IO.puts("No message received in 5 seconds")
end
end
end
Messenger.ping_pong()
pid = spawn(Messenger, :message_patterns, [])
send(pid, {:hello, "Alice"})Mailbox and Message Queue
defmodule MailboxExamples do
# Messages queue up in mailbox
def queue_messages do
pid = spawn(fn ->
:timer.sleep(2000) # Delay before processing
receive do
msg1 -> IO.puts("Received 1: #{inspect(msg1)}")
end
receive do
msg2 -> IO.puts("Received 2: #{inspect(msg2)}")
end
receive do
msg3 -> IO.puts("Received 3: #{inspect(msg3)}")
end
end)
# Send messages immediately
send(pid, "First")
send(pid, "Second")
send(pid, "Third")
:timer.sleep(3000)
pid
end
# Selective receive
def selective_receive do
spawn(fn ->
send(self(), {:low, "unimportant"})
send(self(), {:high, "urgent"})
send(self(), {:low, "also unimportant"})
# Process high priority first
receive do
{:high, msg} -> IO.puts("Priority: #{msg}")
end
# Then process remaining
receive do
{:low, msg} -> IO.puts("Normal: #{msg}")
end
receive do
{:low, msg} -> IO.puts("Normal: #{msg}")
end
end)
end
endStateful Processes
Recursive Loop Pattern
defmodule Counter do
# Server loop
def start(initial_value \\ 0) do
spawn(fn -> loop(initial_value) end)
end
defp loop(current_value) do
receive do
{:increment, from} ->
new_value = current_value + 1
send(from, {:value, new_value})
loop(new_value)
{:get, from} ->
send(from, {:value, current_value})
loop(current_value)
:stop ->
IO.puts("Counter stopping with value: #{current_value}")
# Don't recurse - process terminates
end
end
# Client functions
def increment(pid) do
send(pid, {:increment, self()})
receive do
{:value, value} -> value
after
1000 -> {:error, :timeout}
end
end
def get(pid) do
send(pid, {:get, self()})
receive do
{:value, value} -> value
after
1000 -> {:error, :timeout}
end
end
def stop(pid) do
send(pid, :stop)
end
end
counter = Counter.start(0)
Counter.increment(counter) # => 1
Counter.increment(counter) # => 2
Counter.get(counter) # => 2
Counter.increment(counter) # => 3
Counter.stop(counter)Key-Value Store
defmodule KVStore do
def start do
spawn(fn -> loop(%{}) end)
end
defp loop(state) do
receive do
{:put, key, value, from} ->
new_state = Map.put(state, key, value)
send(from, :ok)
loop(new_state)
{:get, key, from} ->
value = Map.get(state, key)
send(from, {:ok, value})
loop(state)
{:delete, key, from} ->
new_state = Map.delete(state, key)
send(from, :ok)
loop(new_state)
{:keys, from} ->
keys = Map.keys(state)
send(from, {:ok, keys})
loop(state)
:stop ->
:ok # Terminate
end
end
# Client API
def put(pid, key, value) do
send(pid, {:put, key, value, self()})
receive do
:ok -> :ok
after
1000 -> {:error, :timeout}
end
end
def get(pid, key) do
send(pid, {:get, key, self()})
receive do
{:ok, value} -> {:ok, value}
after
1000 -> {:error, :timeout}
end
end
def delete(pid, key) do
send(pid, {:delete, key, self()})
receive do
:ok -> :ok
after
1000 -> {:error, :timeout}
end
end
def keys(pid) do
send(pid, {:keys, self()})
receive do
{:ok, keys} -> {:ok, keys}
after
1000 -> {:error, :timeout}
end
end
def stop(pid) do
send(pid, :stop)
end
end
store = KVStore.start()
KVStore.put(store, :name, "Alice")
KVStore.put(store, :age, 30)
KVStore.get(store, :name) # => {:ok, "Alice"}
KVStore.keys(store) # => {:ok, [:name, :age]}
KVStore.delete(store, :age)
KVStore.keys(store) # => {:ok, [:name]}Process Monitoring
Monitor vs Link
defmodule ProcessMonitoring do
# Monitoring (one-way)
def monitor_example do
pid = spawn(fn ->
:timer.sleep(1000)
raise "Crash!"
end)
ref = Process.monitor(pid)
receive do
{:DOWN, ^ref, :process, ^pid, reason} ->
IO.puts("Process #{inspect(pid)} crashed: #{inspect(reason)}")
end
IO.puts("Monitor continues running")
end
# Check if process is alive
def check_alive do
pid = spawn(fn -> :timer.sleep(5000) end)
IO.puts("Process alive? #{Process.alive?(pid)}")
:timer.sleep(6000)
IO.puts("Process alive? #{Process.alive?(pid)}")
end
# Named processes
def named_process do
# Register process with name
pid = spawn(fn ->
receive do
msg -> IO.puts("Named process received: #{inspect(msg)}")
end
end)
Process.register(pid, :my_worker)
# Send to named process
send(:my_worker, "Hello!")
# Find PID by name
Process.whereis(:my_worker)
end
end
ProcessMonitoring.monitor_example()Concurrent Patterns
Parallel Map
defmodule ParallelProcessing do
def parallel_map(collection, fun) do
parent = self()
# Spawn process for each item
processes = Enum.map(collection, fn item ->
spawn(fn ->
result = fun.(item)
send(parent, {self(), result})
end)
end)
# Collect results
Enum.map(processes, fn pid ->
receive do
{^pid, result} -> result
end
end)
end
# Example usage
def expensive_computation(n) do
:timer.sleep(1000) # Simulate work
n * n
end
end
Enum.map([1, 2, 3, 4, 5], &ParallelProcessing.expensive_computation/1)
ParallelProcessing.parallel_map([1, 2, 3, 4, 5],
&ParallelProcessing.expensive_computation/1)Worker Pool
defmodule WorkerPool do
def start(num_workers) do
parent = self()
workers = Enum.map(1..num_workers, fn id ->
spawn(fn -> worker_loop(id, parent) end)
end)
coordinator(workers, [])
end
defp worker_loop(id, coordinator) do
send(coordinator, {:ready, self()})
receive do
{:work, work_id, fun} ->
IO.puts("Worker #{id} processing work #{work_id}")
result = fun.()
send(coordinator, {:done, self(), work_id, result})
worker_loop(id, coordinator)
:stop ->
IO.puts("Worker #{id} stopping")
:ok
end
end
defp coordinator(workers, ready_workers) do
receive do
{:ready, pid} ->
coordinator(workers, [pid | ready_workers])
{:schedule, work_id, fun, from} ->
case ready_workers do
[worker | rest] ->
send(worker, {:work, work_id, fun})
send(from, {:scheduled, work_id})
coordinator(workers, rest)
[] ->
send(from, {:error, :no_workers_available})
coordinator(workers, ready_workers)
end
{:done, pid, work_id, result} ->
IO.puts("Work #{work_id} completed: #{inspect(result)}")
coordinator(workers, [pid | ready_workers])
:stop ->
Enum.each(workers, fn pid -> send(pid, :stop) end)
:ok
end
end
endBest Practices
Do: Keep Processes Small and Focused
defmodule Cache do
def start, do: spawn(fn -> loop(%{}) end)
defp loop(state), do: # ... cache operations
end
defmodule Logger do
def start, do: spawn(fn -> loop([]) end)
defp loop(logs), do: # ... logging operations
endDo: Use Timeout on Receive
receive do
msg -> handle(msg)
after
5000 -> {:error, :timeout}
end
receive do
msg -> handle(msg)
endDo: Match Specific Messages
receive do
{:ok, value} -> handle_success(value)
{:error, reason} -> handle_error(reason)
:shutdown -> :ok
after
1000 -> :timeout
end
receive do
msg -> handle(msg)
endSee Also
Last updated