Cookbook

Need quick solutions to common Elixir problems? This cookbook provides 35+ tested recipes covering data structures, concurrency, OTP patterns, Phoenix development, Ecto queries, testing strategies, and performance optimization.

How to Use This Cookbook

Each recipe follows a consistent format:

  1. Problem: What challenge you’re solving
  2. Solution: Complete working code
  3. How It Works: Explanation of the solution
  4. Use Cases: When to apply this pattern
  5. See Also: Related tutorials and guides

All code examples are tested with Elixir 1.14+ and OTP 25+.

Categories


Data Structures

Recipe 1: Transform Map Keys (Atoms to Strings)

Problem: Convert map with atom keys to string keys for JSON encoding.

Solution:

defmodule MapHelper do
  def atomize_keys(map) when is_map(map) do
    Map.new(map, fn {key, value} ->
      {String.to_atom(key), atomize_keys(value)}
    end)
  end

  def atomize_keys(value), do: value

  def stringify_keys(map) when is_map(map) do
    Map.new(map, fn {key, value} ->
      {to_string(key), stringify_keys(value)}
    end)
  end

  def stringify_keys(value), do: value
end

user = %{name: "Alice", age: 30, address: %{city: "NYC"}}
MapHelper.stringify_keys(user)

json_user = %{"name" => "Bob", "age" => 25}
MapHelper.atomize_keys(json_user)

How It Works: Map.new/2 reconstructs the map with transformed keys. Recursively processes nested maps.

Use Cases: JSON serialization, API responses, database records.

See Also: Beginner Tutorial - Data Structures, Pattern Matching Guide


Recipe 2: Merge Nested Maps Deeply

Problem: Merge two maps including nested maps (not just top level).

Solution:

defmodule DeepMerge do
  def deep_merge(left, right) do
    Map.merge(left, right, &deep_resolve/3)
  end

  defp deep_resolve(_key, left = %{}, right = %{}) do
    deep_merge(left, right)
  end

  defp deep_resolve(_key, _left, right) do
    right
  end
end

config1 = %{
  database: %{host: "localhost", port: 5432},
  cache: %{ttl: 300}
}

config2 = %{
  database: %{port: 5433, pool_size: 10},
  logging: %{level: :info}
}

DeepMerge.deep_merge(config1, config2)

How It Works: Map.merge/3 accepts resolver function. Recursively merges when both values are maps.

Use Cases: Configuration merging, data aggregation, API responses.

See Also: Beginner Tutorial - Maps


Recipe 3: Group List Items by Criteria

Problem: Group list elements by a specific attribute or function.

Solution:

defmodule ListHelper do
  def group_by(list, fun) do
    Enum.group_by(list, fun)
  end
end

users = [
  %{name: "Alice", role: :admin},
  %{name: "Bob", role: :user},
  %{name: "Charlie", role: :admin},
  %{name: "Diana", role: :user}
]

Enum.group_by(users, & &1.role)

words = ["apple", "apricot", "banana", "blueberry", "cherry"]
Enum.group_by(words, &String.first/1)

How It Works: Enum.group_by/2 applies function to each element, groups by result.

Use Cases: Data categorization, reporting, aggregation.

See Also: Beginner Tutorial - Enum


Recipe 4: Build Keyword List from Map

Problem: Convert map to keyword list for function options.

Solution:

defmodule KeywordHelper do
  def from_map(map) when is_map(map) do
    Enum.map(map, fn {k, v} -> {k, v} end)
  end
end

options = %{timeout: 5000, retry: 3, async: true}
KeywordHelper.from_map(options)

Map.to_list(options)

How It Works: Maps to list of tuples, maintaining keyword list format.

Use Cases: Function options, configuration, API parameters.

See Also: Beginner Tutorial - Keyword Lists


Recipe 5: Update Nested Structure Safely

Problem: Update deeply nested map or keyword list without raising errors.

Solution:

defmodule SafeUpdate do
  def update_in_safe(data, keys, default, fun) do
    update_in(data, keys, fn
      nil -> fun.(default)
      value -> fun.(value)
    end)
  rescue
    _ -> data
  end

  # Using get_and_update_in
  def safe_update(data, keys, fun) do
    get_and_update_in(data, keys, fn
      nil -> :pop
      value -> {value, fun.(value)}
    end)
  end
end

config = %{database: %{pool: %{size: 10}}}

put_in(config, [:database, :pool, :size],
  get_in(config, [:database, :pool, :size]) + 5)

update_in(config, [:cache, :ttl], &(&1 || 300))

{old_val, new_config} = get_and_update_in(config, [:cache, :ttl], fn
  nil -> {nil, 300}
  val -> {val, val * 2}
end)

How It Works: get_and_update_in/3 allows conditional updates. Returns tuple with old value and new structure.

Use Cases: Configuration updates, state management, data migration.

See Also: Beginner Tutorial - Data Structures


Pattern Matching

Recipe 6: Extract Values from Complex Structures

Problem: Extract multiple values from nested data structure.

Solution:

defmodule Extractor do
  def extract_user_info(user) do
    %{
      name: name,
      profile: %{age: age, city: city},
      preferences: %{theme: theme}
    } = user

    {name, age, city, theme}
  end

  # With default values
  def extract_with_defaults(user) do
    %{
      name: name,
      profile: %{age: age} = profile,
      preferences: prefs
    } = user

    city = Map.get(profile, :city, "Unknown")
    theme = Map.get(prefs, :theme, :light)

    {name, age, city, theme}
  end
end

user = %{
  name: "Alice",
  profile: %{age: 30, city: "NYC"},
  preferences: %{theme: :dark}
}

Extractor.extract_user_info(user)

How It Works: Pattern matching binds variables from nested structures in single expression.

Use Cases: Data extraction, API response parsing, validation.

See Also: Pattern Matching Guide, Beginner Tutorial


Recipe 7: Match and Transform with Case

Problem: Handle multiple patterns with different transformations.

Solution:

defmodule ResponseHandler do
  def handle_response(response) do
    case response do
      {:ok, %{status: 200, body: body}} ->
        {:success, Jason.decode!(body)}

      {:ok, %{status: status, body: body}} when status in 400..499 ->
        {:client_error, status, body}

      {:ok, %{status: status, body: body}} when status in 500..599 ->
        {:server_error, status, body}

      {:error, %{reason: :timeout}} ->
        {:timeout, "Request timed out"}

      {:error, reason} ->
        {:error, reason}

      _ ->
        {:unknown, response}
    end
  end
end

ResponseHandler.handle_response({:ok, %{status: 200, body: ~s({"id": 1})}})

ResponseHandler.handle_response({:ok, %{status: 404, body: "Not found"}})

ResponseHandler.handle_response({:error, %{reason: :timeout}})

How It Works: case evaluates patterns sequentially with guards. First match wins.

Use Cases: HTTP response handling, state machines, error handling.

See Also: Error Handling Guide, Beginner Tutorial - Control Flow


Recipe 8: Pin Operator for Exact Matching

Problem: Match against existing variable value, not rebind.

Solution:

defmodule Matcher do
  def find_user(users, target_id) do
    Enum.find(users, fn %{id: ^target_id} -> true; _ -> false end)
  end

  # Pattern matching in function heads
  def process_event(event, expected_type) do
    case event do
      %{type: ^expected_type, data: data} ->
        {:ok, data}

      %{type: other_type} ->
        {:error, "Expected #{expected_type}, got #{other_type}"}
    end
  end
end

users = [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]
search_id = 2

Matcher.find_user(users, search_id)

event = %{type: :user_created, data: %{name: "Charlie"}}
Matcher.process_event(event, :user_created)

Matcher.process_event(event, :user_updated)

How It Works: Pin operator ^ matches existing value instead of rebinding variable.

Use Cases: List filtering, event processing, state validation.

See Also: Pattern Matching Guide


Functions and Pipes

Recipe 9: Compose Functions with Pipe Operator

Problem: Chain multiple transformations readably.

Solution:

defmodule DataPipeline do
  def process_user_input(input) do
    input
    |> String.trim()
    |> String.downcase()
    |> String.split(",")
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
    |> Enum.uniq()
    |> Enum.sort()
  end

  # With error handling
  def safe_process(input) do
    with {:ok, trimmed} <- validate_string(input),
         words <- String.split(trimmed, " "),
         {:ok, processed} <- transform_words(words) do
      {:ok, processed}
    else
      {:error, reason} -> {:error, reason}
      _ -> {:error, :unknown}
    end
  end

  defp validate_string(str) when is_binary(str) and byte_size(str) > 0 do
    {:ok, String.trim(str)}
  end
  defp validate_string(_), do: {:error, :invalid_input}

  defp transform_words(words) do
    {:ok, Enum.map(words, &String.upcase/1)}
  end
end

input = "  apple, banana,  apple, cherry,  , banana  "
DataPipeline.process_user_input(input)

How It Works: Pipe operator |> passes result as first argument to next function. with chains operations with error handling.

Use Cases: Data transformation, validation pipelines, ETL processes.

See Also: Beginner Tutorial - Pipe Operator


Recipe 10: Create Higher-Order Functions

Problem: Write functions that accept or return functions.

Solution:

defmodule FunctionHelper do
  # Function that returns function
  def multiplier(factor) do
    fn x -> x * factor end
  end

  # Function that accepts function
  def apply_twice(value, fun) do
    value |> fun.() |> fun.()
  end

  # Function composition
  def compose(f, g) do
    fn x -> f.(g.(x)) end
  end

  # Partial application
  def partial(fun, arg1) do
    fn arg2 -> fun.(arg1, arg2) end
  end
end

double = FunctionHelper.multiplier(2)
triple = FunctionHelper.multiplier(3)

double.(5)   # => 10
triple.(5)   # => 15

FunctionHelper.apply_twice(5, double)  # => 20

add_one = fn x -> x + 1 end
square = fn x -> x * x end
add_one_then_square = FunctionHelper.compose(square, add_one)

add_one_then_square.(4)  # => 25 (4+1=5, 5*5=25)

add = fn a, b -> a + b end
add_5 = FunctionHelper.partial(add, 5)
add_5.(3)  # => 8

How It Works: Functions are first-class values. Closures capture surrounding scope.

Use Cases: Functional composition, callbacks, strategy pattern, middleware.

See Also: Beginner Tutorial - Functions


Recipe 11: Use Capture Syntax for Conciseness

Problem: Simplify anonymous function syntax.

Solution:

defmodule CaptureExamples do
  # Long form vs capture
  def example_transforms(list) do
    # Long form
    long = Enum.map(list, fn x -> x * 2 end)

    # Capture syntax
    short = Enum.map(list, &(&1 * 2))

    {long, short}
  end

  # Capturing named functions
  def string_operations(words) do
    # Capture module function
    uppercased = Enum.map(words, &String.upcase/1)

    # Capture with multiple args
    prefixed = Enum.map(words, &("Item: " <> &1))

    {uppercased, prefixed}
  end

  # Complex captures
  def complex_example(users) do
    # Extract field
    names = Enum.map(users, & &1.name)

    # Method call
    lengths = Enum.map(names, &String.length/1)

    # Operator as function
    sum = Enum.reduce([1, 2, 3, 4], &+/2)

    {names, lengths, sum}
  end
end

CaptureExamples.example_transforms([1, 2, 3])

CaptureExamples.string_operations(["hello", "world"])

users = [%{name: "Alice"}, %{name: "Bob"}]
CaptureExamples.complex_example(users)

How It Works: & creates anonymous function. &1, &2, ... are positional arguments.

Use Cases: Enum operations, transformations, filtering.

See Also: Beginner Tutorial - Anonymous Functions


String Manipulation

Recipe 12: Parse and Format Strings

Problem: Extract data from formatted strings and reconstruct.

Solution:

defmodule StringParser do
  # Parse structured string
  def parse_log_line(line) do
    ~r/\[(?<level>\w+)\] (?<timestamp>.+) - (?<message>.+)/
    |> Regex.named_captures(line)
  end

  # Format with interpolation
  def format_user(user) do
    "User: #{user.name} (ID: #{user.id}, Age: #{user.age})"
  end

  # Parse CSV line
  def parse_csv(line) do
    line
    |> String.split(",")
    |> Enum.map(&String.trim/1)
  end

  # Build query string
  def build_query_string(params) do
    params
    |> Enum.map(fn {k, v} -> "#{k}=#{URI.encode(to_string(v))}" end)
    |> Enum.join("&")
  end
end

log = "[ERROR] 2024-12-21 10:30:45 - Database connection failed"
StringParser.parse_log_line(log)

user = %{name: "Alice", id: 123, age: 30}
StringParser.format_user(user)

StringParser.parse_csv("apple, banana,  cherry  ")

params = %{query: "elixir programming", page: 2, limit: 10}
StringParser.build_query_string(params)

How It Works: Regex for pattern extraction. String interpolation for formatting. URI encoding for safe URLs.

Use Cases: Log parsing, CSV processing, URL building, data extraction.

See Also: Beginner Tutorial - Strings, String Handling Guide


Recipe 13: String Validation and Sanitization

Problem: Validate and clean user input strings.

Solution:

defmodule StringValidator do
  # Email validation (simple)
  def valid_email?(email) do
    Regex.match?(~r/^[\w._%+-]+@[\w.-]+\.[a-zA-Z]{2,}$/, email)
  end

  # Sanitize HTML
  def sanitize_html(input) do
    input
    |> String.replace(~r/<[^>]*>/, "")
    |> String.replace("&", "&amp;")
    |> String.replace("<", "&lt;")
    |> String.replace(">", "&gt;")
  end

  # Validate length
  def valid_length?(str, min, max) do
    length = String.length(str)
    length >= min and length <= max
  end

  # Remove special characters
  def alphanumeric_only(str) do
    Regex.replace(~r/[^a-zA-Z0-9]/, str, "")
  end

  # Validate phone number
  def valid_phone?(phone) do
    # US format: (123) 456-7890 or 123-456-7890
    cleaned = Regex.replace(~r/[^\d]/, phone, "")
    String.length(cleaned) == 10
  end
end

StringValidator.valid_email?("user@example.com")  # => true
StringValidator.valid_email?("invalid.email")     # => false

html = "<script>alert('xss')</script>Hello <b>World</b>"
StringValidator.sanitize_html(html)

StringValidator.valid_length?("password123", 8, 20)  # => true
StringValidator.alphanumeric_only("hello-world_123!")  # => "helloworld123"

StringValidator.valid_phone?("(123) 456-7890")  # => true
StringValidator.valid_phone?("123-456-7890")    # => true

How It Works: Regex for pattern matching. String functions for cleaning and validation.

Use Cases: Form validation, user input sanitization, data cleaning.

See Also: Beginner Tutorial - Strings


Concurrency

Recipe 14: Parallel Map with Task

Problem: Process list items concurrently for better performance.

Solution:

defmodule ParallelProcessor do
  def parallel_map(collection, fun) do
    collection
    |> Enum.map(&Task.async(fn -> fun.(&1) end))
    |> Enum.map(&Task.await(&1, 10_000))
  end

  # With timeout handling
  def safe_parallel_map(collection, fun, timeout \\ 5000) do
    collection
    |> Task.async_stream(fun, timeout: timeout, max_concurrency: System.schedulers_online())
    |> Enum.map(fn
      {:ok, result} -> result
      {:exit, reason} -> {:error, reason}
    end)
  end

  # Process in chunks
  def parallel_chunk(collection, chunk_size, fun) do
    collection
    |> Enum.chunk_every(chunk_size)
    |> Task.async_stream(fn chunk -> Enum.map(chunk, fun) end)
    |> Enum.flat_map(fn {:ok, results} -> results end)
  end
end

fetch_user = fn id ->
  :timer.sleep(100)  # Simulate delay
  %{id: id, name: "User #{id}"}
end

Enum.map(1..10, fetch_user)

ParallelProcessor.parallel_map(1..10, fetch_user)

compute = fn x ->
  :timer.sleep(x * 100)
  x * x
end

ParallelProcessor.safe_parallel_map([1, 2, 3, 4, 5], compute, 1000)

How It Works: Task.async/1 spawns concurrent process. Task.await/2 waits for result. Task.async_stream/3 provides built-in timeout and concurrency control.

Use Cases: HTTP requests, file processing, data transformation, I/O operations.

See Also: Concurrency Guide, Intermediate Tutorial - Task


Recipe 15: Process Registry Pattern

Problem: Find and communicate with named processes.

Solution:

defmodule UserCache do
  use GenServer

  # Start with name registration
  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, %{}, name: name)
  end

  def put(server \\ __MODULE__, key, value) do
    GenServer.call(server, {:put, key, value})
  end

  def get(server \\ __MODULE__, key) do
    GenServer.call(server, {:get, key})
  end

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  @impl true
  def handle_call({:put, key, value}, _from, state) do
    {:reply, :ok, Map.put(state, key, value)}
  end

  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end

defmodule DynamicCache do
  def start_cache(user_id) do
    name = {:via, Registry, {MyApp.Registry, {:cache, user_id}}}
    GenServer.start_link(UserCache, %{}, name: name)
  end

  def get_cache(user_id) do
    name = {:via, Registry, {MyApp.Registry, {:cache, user_id}}}
    GenServer.whereis(name)
  end
end


{:ok, _pid} = UserCache.start_link(name: :my_cache)
UserCache.put(:my_cache, "user_1", %{name: "Alice"})
UserCache.get(:my_cache, "user_1")

{:ok, _pid} = DynamicCache.start_cache(123)
cache_pid = DynamicCache.get_cache(123)
GenServer.call(cache_pid, {:put, "key", "value"})

How It Works: Named processes registered globally or via Registry. {:via, Registry, {registry_name, key}} for dynamic naming.

Use Cases: Process discovery, distributed caching, session management.

See Also: GenServer Guide, Intermediate Tutorial - GenServer


Recipe 16: Debounce and Throttle Operations

Problem: Rate-limit operations to prevent overwhelming system.

Solution:

defmodule RateLimiter do
  use GenServer

  defmodule State do
    defstruct [:last_call, :interval, :pending_timer]
  end

  # Start with minimum interval (milliseconds)
  def start_link(interval) do
    GenServer.start_link(__MODULE__, interval, name: __MODULE__)
  end

  # Throttle: Execute immediately if interval passed, else ignore
  def throttle(fun) do
    GenServer.call(__MODULE__, {:throttle, fun})
  end

  # Debounce: Delay execution, reset timer on new call
  def debounce(fun) do
    GenServer.cast(__MODULE__, {:debounce, fun})
  end

  @impl true
  def init(interval) do
    {:ok, %State{last_call: 0, interval: interval, pending_timer: nil}}
  end

  @impl true
  def handle_call({:throttle, fun}, _from, state) do
    now = System.monotonic_time(:millisecond)
    elapsed = now - state.last_call

    if elapsed >= state.interval do
      result = fun.()
      {:reply, {:ok, result}, %{state | last_call: now}}
    else
      {:reply, {:throttled, state.interval - elapsed}, state}
    end
  end

  @impl true
  def handle_cast({:debounce, fun}, state) do
    # Cancel pending timer
    if state.pending_timer do
      Process.cancel_timer(state.pending_timer)
    end

    # Schedule new execution
    timer = Process.send_after(self(), {:execute, fun}, state.interval)
    {:noreply, %{state | pending_timer: timer}}
  end

  @impl true
  def handle_info({:execute, fun}, state) do
    fun.()
    {:noreply, %{state | pending_timer: nil}}
  end
end

{:ok, _} = RateLimiter.start_link(1000)  # 1 second interval

RateLimiter.throttle(fn -> IO.puts("Call 1") end)  # Executes

RateLimiter.throttle(fn -> IO.puts("Call 2") end)  # Throttled

:timer.sleep(1000)
RateLimiter.throttle(fn -> IO.puts("Call 3") end)  # Executes

RateLimiter.debounce(fn -> IO.puts("Debounced 1") end)
RateLimiter.debounce(fn -> IO.puts("Debounced 2") end)
RateLimiter.debounce(fn -> IO.puts("Debounced 3") end)

How It Works: Throttle checks elapsed time since last execution. Debounce cancels pending timer and reschedules.

Use Cases: Search autocomplete, API rate limiting, event handling, scroll events.

See Also: GenServer Guide


OTP Patterns

Recipe 17: Simple State Machine with GenServer

Problem: Model stateful behavior with transitions.

Solution:

defmodule OrderStateMachine do
  use GenServer

  @states [:pending, :confirmed, :shipped, :delivered, :cancelled]

  defmodule State do
    defstruct [:order_id, :current_state, :history]
  end

  # Client API
  def start_link(order_id) do
    GenServer.start_link(__MODULE__, order_id)
  end

  def get_state(pid) do
    GenServer.call(pid, :get_state)
  end

  def transition(pid, new_state) do
    GenServer.call(pid, {:transition, new_state})
  end

  # Server callbacks
  @impl true
  def init(order_id) do
    state = %State{
      order_id: order_id,
      current_state: :pending,
      history: [{:pending, DateTime.utc_now()}]
    }
    {:ok, state}
  end

  @impl true
  def handle_call(:get_state, _from, state) do
    {:reply, state.current_state, state}
  end

  def handle_call({:transition, new_state}, _from, state) do
    if valid_transition?(state.current_state, new_state) do
      new_history = [{new_state, DateTime.utc_now()} | state.history]
      new_state_struct = %{state | current_state: new_state, history: new_history}
      {:reply, {:ok, new_state}, new_state_struct}
    else
      {:reply, {:error, :invalid_transition}, state}
    end
  end

  defp valid_transition?(:pending, :confirmed), do: true
  defp valid_transition?(:pending, :cancelled), do: true
  defp valid_transition?(:confirmed, :shipped), do: true
  defp valid_transition?(:confirmed, :cancelled), do: true
  defp valid_transition?(:shipped, :delivered), do: true
  defp valid_transition?(_, _), do: false
end

{:ok, order} = OrderStateMachine.start_link("ORD-123")

OrderStateMachine.get_state(order)

OrderStateMachine.transition(order, :confirmed)

OrderStateMachine.transition(order, :delivered)

OrderStateMachine.transition(order, :shipped)

OrderStateMachine.transition(order, :delivered)

How It Works: GenServer maintains state. Validates transitions before applying. Stores history of state changes.

Use Cases: Order processing, workflow management, game state, approval flows.

See Also: GenServer Guide, Intermediate Tutorial


Recipe 18: Supervision Tree with Dynamic Children

Problem: Supervise dynamically created processes.

Solution:

defmodule Worker do
  use GenServer

  def start_link(id) do
    GenServer.start_link(__MODULE__, id, name: via_tuple(id))
  end

  def get_state(id) do
    GenServer.call(via_tuple(id), :get_state)
  end

  defp via_tuple(id) do
    {:via, Registry, {WorkerRegistry, id}}
  end

  @impl true
  def init(id) do
    {:ok, %{id: id, data: %{}, started_at: DateTime.utc_now()}}
  end

  @impl true
  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end
end

defmodule WorkerSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def start_worker(id) do
    spec = {Worker, id}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def stop_worker(id) do
    case Registry.lookup(WorkerRegistry, id) do
      [{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
      [] -> {:error, :not_found}
    end
  end

  def list_workers do
    DynamicSupervisor.which_children(__MODULE__)
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Registry, keys: :unique, name: WorkerRegistry},
      WorkerSupervisor
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

{:ok, _pid} = WorkerSupervisor.start_worker("worker_1")
{:ok, _pid} = WorkerSupervisor.start_worker("worker_2")

Worker.get_state("worker_1")

WorkerSupervisor.list_workers()

WorkerSupervisor.stop_worker("worker_1")

How It Works: DynamicSupervisor manages children added at runtime. Registry for process discovery. :one_for_one strategy restarts only failed child.

Use Cases: Connection pools, user sessions, background jobs, WebSocket handlers.

See Also: Supervision Guide, Intermediate Tutorial - Supervisor


Recipe 19: Periodic Tasks with GenServer

Problem: Execute recurring tasks at intervals.

Solution:

defmodule PeriodicWorker do
  use GenServer
  require Logger

  defmodule State do
    defstruct [:interval, :task, :timer_ref]
  end

  # Start with task and interval (milliseconds)
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    interval = Keyword.fetch!(opts, :interval)
    task = Keyword.fetch!(opts, :task)

    state = %State{interval: interval, task: task}
    {:ok, schedule_work(state)}
  end

  @impl true
  def handle_info(:work, state) do
    # Execute task
    try do
      state.task.()
    rescue
      e -> Logger.error("Task failed: #{inspect(e)}")
    end

    # Schedule next execution
    {:noreply, schedule_work(state)}
  end

  defp schedule_work(state) do
    timer_ref = Process.send_after(self(), :work, state.interval)
    %{state | timer_ref: timer_ref}
  end
end

defmodule SimplePeriodicWorker do
  use GenServer

  def start_link(interval, task) do
    GenServer.start_link(__MODULE__, {interval, task})
  end

  @impl true
  def init({interval, task}) do
    {:ok, _timer} = :timer.send_interval(interval, :tick)
    {:ok, task}
  end

  @impl true
  def handle_info(:tick, task) do
    task.()
    {:noreply, task}
  end
end

task = fn ->
  Logger.info("Periodic task executed at #{DateTime.utc_now()}")
  # Cleanup, health checks, metrics collection, etc.
end

{:ok, _pid} = PeriodicWorker.start_link(interval: 5000, task: task)

{:ok, _pid} = SimplePeriodicWorker.start_link(10_000, task)

How It Works: Process.send_after/3 schedules message to self. Reschedules after execution for continuous loop. :timer.send_interval/2 simpler but less flexible.

Use Cases: Cache cleanup, health checks, metrics collection, scheduled reports.

See Also: GenServer Guide


Error Handling

Recipe 20: Result Tuple Pattern

Problem: Handle success and error cases explicitly.

Solution:

defmodule UserService do
  def create_user(attrs) do
    with {:ok, validated} <- validate_attrs(attrs),
         {:ok, user} <- insert_user(validated),
         {:ok, _email} <- send_welcome_email(user) do
      {:ok, user}
    else
      {:error, :invalid_attrs} = error -> error
      {:error, :db_error} = error -> error
      {:error, :email_failed} ->
        # User created but email failed - still success
        # Could also rollback here if needed
        {:ok, user}
      error ->
        {:error, {:unexpected, error}}
    end
  end

  defp validate_attrs(%{email: email, name: name})
    when is_binary(email) and is_binary(name) do
    {:ok, %{email: email, name: name}}
  end
  defp validate_attrs(_), do: {:error, :invalid_attrs}

  defp insert_user(attrs) do
    # Simulate DB insert
    if String.contains?(attrs.email, "@") do
      {:ok, Map.put(attrs, :id, :rand.uniform(1000))}
    else
      {:error, :db_error}
    end
  end

  defp send_welcome_email(user) do
    # Simulate email sending
    {:ok, "Email sent to #{user.email}"}
  end
end

defmodule UserController do
  def create(params) do
    case UserService.create_user(params) do
      {:ok, user} ->
        {:json, %{status: "success", user: user}}

      {:error, :invalid_attrs} ->
        {:json, %{status: "error", message: "Invalid attributes"}, status: 400}

      {:error, :db_error} ->
        {:json, %{status: "error", message: "Database error"}, status: 500}

      {:error, reason} ->
        {:json, %{status: "error", message: "Unknown error: #{inspect(reason)}"},
         status: 500}
    end
  end
end

UserService.create_user(%{email: "alice@example.com", name: "Alice"})

UserService.create_user(%{email: "invalid", name: "Bob"})

UserService.create_user(%{name: "Charlie"})

How It Works: with chains operations, short-circuits on first {:error, _}. Pattern match in else clause for specific error handling.

Use Cases: Business logic, API handlers, data pipelines, validation flows.

See Also: Error Handling Guide, Beginner Tutorial - Error Handling


Recipe 21: Supervision for Fault Tolerance

Problem: Recover automatically from crashes.

Solution:

defmodule RiskyWorker do
  use GenServer
  require Logger

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_risky_work(data) do
    GenServer.call(__MODULE__, {:risky, data})
  end

  @impl true
  def init(_opts) do
    Logger.info("RiskyWorker started")
    {:ok, %{restarts: 0}}
  end

  @impl true
  def handle_call({:risky, data}, _from, state) do
    # Simulate random crashes
    if :rand.uniform() > 0.7 do
      raise "Simulated crash!"
    end

    {:reply, {:ok, "Processed: #{data}"}, state}
  end
end

defmodule MyApp.Supervisor do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    children = [
      # Restart strategy: restart worker on crash
      {RiskyWorker, []}
    ]

    # :one_for_one - only restart crashed child
    # max_restarts: 3, max_seconds: 5 - restart up to 3 times in 5 seconds
    Supervisor.init(children, strategy: :one_for_one, max_restarts: 3, max_seconds: 5)
  end
end

{:ok, _sup} = MyApp.Supervisor.start_link([])

for i <- 1..10 do
  try do
    case RiskyWorker.do_risky_work("Task #{i}") do
      {:ok, result} -> IO.puts("Success: #{result}")
      error -> IO.puts("Error: #{inspect(error)}")
    end
  catch
    :exit, reason -> IO.puts("Process crashed: #{inspect(reason)}")
  end

  :timer.sleep(100)
end

How It Works: Supervisor monitors child processes. Restarts crashed children per strategy. Max restarts prevents crash loops.

Use Cases: Network connections, external services, unreliable operations, background jobs.

See Also: Supervision Guide, Intermediate Tutorial - Supervisor


File I/O

Recipe 22: Read and Write Files Safely

Problem: Handle file operations with proper error handling.

Solution:

defmodule FileHelper do
  # Read file with error handling
  def read_file(path) do
    case File.read(path) do
      {:ok, content} -> {:ok, content}
      {:error, :enoent} -> {:error, :file_not_found}
      {:error, reason} -> {:error, reason}
    end
  end

  # Read lines lazily (memory efficient)
  def read_lines(path) do
    File.stream!(path)
    |> Stream.map(&String.trim/1)
    |> Enum.to_list()
  end

  # Write with automatic directory creation
  def write_file(path, content) do
    dir = Path.dirname(path)

    with :ok <- File.mkdir_p(dir),
         :ok <- File.write(path, content) do
      {:ok, path}
    else
      {:error, reason} -> {:error, reason}
    end
  end

  # Append to file
  def append_to_file(path, content) do
    File.write(path, content, [:append])
  end

  # Read JSON file
  def read_json(path) do
    with {:ok, content} <- File.read(path),
         {:ok, decoded} <- Jason.decode(content) do
      {:ok, decoded}
    end
  end

  # Write JSON file (pretty-printed)
  def write_json(path, data) do
    with {:ok, json} <- Jason.encode(data, pretty: true),
         :ok <- write_file(path, json) do
      {:ok, path}
    end
  end
end

{:ok, content} = FileHelper.read_file("config.txt")

FileHelper.write_file("output/results.txt", "Hello, World!")

lines = FileHelper.read_lines("large_file.txt")

data = %{users: [%{name: "Alice", age: 30}, %{name: "Bob", age: 25}]}
FileHelper.write_json("data/users.json", data)

{:ok, loaded} = FileHelper.read_json("data/users.json")

FileHelper.append_to_file("app.log", "#{DateTime.utc_now()} - Server started\n")

How It Works: Pattern match File module results. Stream API for memory-efficient large file processing. Path module for directory operations.

Use Cases: Configuration files, data import/export, logging, file processing.

See Also: File I/O Guide, Beginner Tutorial - File I/O


Recipe 23: CSV Processing

Problem: Parse and generate CSV files efficiently.

Solution:

defmodule CSVHelper do
  # Simple CSV parser (no library)
  def parse_csv(path) do
    path
    |> File.stream!()
    |> Stream.map(&String.trim/1)
    |> Stream.map(&parse_csv_line/1)
    |> Enum.to_list()
  end

  defp parse_csv_line(line) do
    line
    |> String.split(",")
    |> Enum.map(&String.trim/1)
  end

  # Parse CSV to maps (first row as headers)
  def parse_csv_to_maps(path) do
    [headers | rows] = parse_csv(path)

    rows
    |> Enum.map(fn row ->
      headers
      |> Enum.zip(row)
      |> Map.new(fn {h, v} -> {String.to_atom(h), v} end)
    end)
  end

  # Write CSV from list of maps
  def write_csv(path, data, headers) do
    csv_content =
      [headers | Enum.map(data, fn row ->
        Enum.map(headers, &Map.get(row, &1, ""))
      end)]
      |> Enum.map(&Enum.join(&1, ","))
      |> Enum.join("\n")

    File.write(path, csv_content)
  end

  # Stream large CSV (memory efficient)
  def stream_csv(path, fun) do
    path
    |> File.stream!()
    |> Stream.drop(1)  # Skip header
    |> Stream.map(&parse_csv_line/1)
    |> Stream.each(fun)
    |> Stream.run()
  end
end

rows = CSVHelper.parse_csv("users.csv")

users = CSVHelper.parse_csv_to_maps("users.csv")

data = [
  %{name: "Alice", age: 30, city: "NYC"},
  %{name: "Bob", age: 25, city: "LA"}
]
headers = [:name, :age, :city]
CSVHelper.write_csv("output.csv", data, headers)

CSVHelper.stream_csv("large_data.csv", fn row ->
  # Process each row without loading entire file
  IO.inspect(row)
end)

How It Works: Stream API for lazy evaluation. Map headers to values for structured data. String operations for parsing.

Use Cases: Data import/export, reporting, data migration, ETL.

See Also: File I/O Guide


Phoenix Web Development

Recipe 24: Build RESTful API Endpoints

Problem: Create standard CRUD endpoints with proper error handling.

Solution:

defmodule MyAppWeb.UserController do
  use MyAppWeb, :controller
  alias MyApp.Accounts

  # Index - List all users
  def index(conn, params) do
    page = Map.get(params, "page", "1") |> String.to_integer()
    limit = Map.get(params, "limit", "20") |> String.to_integer()

    users = Accounts.list_users(page: page, limit: limit)
    render(conn, :index, users: users)
  end

  # Show - Get single user
  def show(conn, %{"id" => id}) do
    case Accounts.get_user(id) do
      nil ->
        conn
        |> put_status(:not_found)
        |> json(%{error: "User not found"})

      user ->
        render(conn, :show, user: user)
    end
  end

  # Create - Add new user
  def create(conn, %{"user" => user_params}) do
    case Accounts.create_user(user_params) do
      {:ok, user} ->
        conn
        |> put_status(:created)
        |> put_resp_header("location", ~p"/api/users/#{user.id}")
        |> render(:show, user: user)

      {:error, changeset} ->
        conn
        |> put_status(:unprocessable_entity)
        |> json(%{errors: translate_errors(changeset)})
    end
  end

  # Update - Modify existing user
  def update(conn, %{"id" => id, "user" => user_params}) do
    case Accounts.get_user(id) do
      nil ->
        conn
        |> put_status(:not_found)
        |> json(%{error: "User not found"})

      user ->
        case Accounts.update_user(user, user_params) do
          {:ok, updated_user} ->
            render(conn, :show, user: updated_user)

          {:error, changeset} ->
            conn
            |> put_status(:unprocessable_entity)
            |> json(%{errors: translate_errors(changeset)})
        end
    end
  end

  # Delete - Remove user
  def delete(conn, %{"id" => id}) do
    case Accounts.get_user(id) do
      nil ->
        conn
        |> put_status(:not_found)
        |> json(%{error: "User not found"})

      user ->
        {:ok, _user} = Accounts.delete_user(user)
        send_resp(conn, :no_content, "")
    end
  end

  defp translate_errors(changeset) do
    Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
      Enum.reduce(opts, msg, fn {key, value}, acc ->
        String.replace(acc, "%{#{key}}", to_string(value))
      end)
    end)
  end
end

defmodule MyAppWeb.UserJSON do
  def index(%{users: users}) do
    %{data: Enum.map(users, &user_json/1)}
  end

  def show(%{user: user}) do
    %{data: user_json(user)}
  end

  defp user_json(user) do
    %{
      id: user.id,
      name: user.name,
      email: user.email,
      inserted_at: user.inserted_at,
      updated_at: user.updated_at
    }
  end
end

defmodule MyAppWeb.Router do
  use MyAppWeb, :router

  pipeline :api do
    plug :accepts, ["json"]
  end

  scope "/api", MyAppWeb do
    pipe_through :api

    resources "/users", UserController, except: [:new, :edit]
  end
end

How It Works: Controller handles request/response. Pattern match params. Return appropriate HTTP status codes. JSON view formats data.

Use Cases: REST APIs, mobile backends, microservices.

See Also: Phoenix REST API Guide, Intermediate Tutorial - Phoenix


Recipe 25: Authentication with Plugs

Problem: Protect routes with authentication.

Solution:

defmodule MyAppWeb.Plugs.Auth do
  import Plug.Conn
  import Phoenix.Controller

  def init(opts), do: opts

  def call(conn, _opts) do
    case get_req_header(conn, "authorization") do
      ["Bearer " <> token] ->
        verify_token(conn, token)

      _ ->
        conn
        |> put_status(:unauthorized)
        |> json(%{error: "Missing authorization header"})
        |> halt()
    end
  end

  defp verify_token(conn, token) do
    case MyApp.Auth.verify_token(token) do
      {:ok, user_id} ->
        conn
        |> assign(:current_user_id, user_id)
        |> assign(:authenticated, true)

      {:error, _reason} ->
        conn
        |> put_status(:unauthorized)
        |> json(%{error: "Invalid token"})
        |> halt()
    end
  end
end

defmodule MyAppWeb.Plugs.OptionalAuth do
  import Plug.Conn

  def init(opts), do: opts

  def call(conn, _opts) do
    case get_req_header(conn, "authorization") do
      ["Bearer " <> token] ->
        case MyApp.Auth.verify_token(token) do
          {:ok, user_id} ->
            assign(conn, :current_user_id, user_id)

          {:error, _} ->
            conn
        end

      _ ->
        conn
    end
  end
end

defmodule MyAppWeb.Router do
  use MyAppWeb, :router

  pipeline :api do
    plug :accepts, ["json"]
  end

  pipeline :authenticated do
    plug MyAppWeb.Plugs.Auth
  end

  pipeline :optional_auth do
    plug MyAppWeb.Plugs.OptionalAuth
  end

  scope "/api", MyAppWeb do
    pipe_through [:api, :authenticated]

    resources "/users", UserController
    get "/profile", ProfileController, :show
  end

  scope "/api", MyAppWeb do
    pipe_through [:api, :optional_auth]

    get "/posts", PostController, :index
  end

  scope "/api", MyAppWeb do
    pipe_through :api

    post "/login", SessionController, :create
    post "/register", RegistrationController, :create
  end
end

defmodule MyApp.Auth do
  @secret "your-secret-key"

  def verify_token(token) do
    case Phoenix.Token.verify(MyAppWeb.Endpoint, @secret, token, max_age: 86400) do
      {:ok, user_id} -> {:ok, user_id}
      {:error, reason} -> {:error, reason}
    end
  end

  def generate_token(user_id) do
    Phoenix.Token.sign(MyAppWeb.Endpoint, @secret, user_id)
  end
end

How It Works: Plug intercepts request before controller. Checks authorization header. Verifies token and assigns user. halt() prevents further processing.

Use Cases: API authentication, role-based access, session management.

See Also: Phoenix REST API Guide


LiveView Patterns

Recipe 26: Real-Time Form with Validation

Problem: Build interactive form with live validation.

Solution:

defmodule MyAppWeb.UserLive.Form do
  use MyAppWeb, :live_view
  alias MyApp.Accounts
  alias MyApp.Accounts.User

  def mount(_params, _session, socket) do
    changeset = Accounts.change_user(%User{})

    {:ok,
     socket
     |> assign(:changeset, changeset)
     |> assign(:submitted, false)}
  end

  def handle_event("validate", %{"user" => user_params}, socket) do
    changeset =
      %User{}
      |> Accounts.change_user(user_params)
      |> Map.put(:action, :validate)

    {:noreply, assign(socket, :changeset, changeset)}
  end

  def handle_event("save", %{"user" => user_params}, socket) do
    case Accounts.create_user(user_params) do
      {:ok, user} ->
        {:noreply,
         socket
         |> put_flash(:info, "User created successfully")
         |> push_navigate(to: ~p"/users/#{user.id}")}

      {:error, %Ecto.Changeset{} = changeset} ->
        {:noreply,
         socket
         |> assign(:changeset, changeset)
         |> assign(:submitted, true)}
    end
  end

  def render(assigns) do
    ~H"""
    <div class="max-w-md mx-auto">
      <h1 class="text-2xl font-bold mb-4">Create User</h1>

      <.form
        for={@changeset}
        phx-change="validate"
        phx-submit="save"
        class="space-y-4"
      >
        <div>
          <.input
            field={@changeset[:name]}
            type="text"
            label="Name"
            placeholder="Enter your name"
          />
        </div>

        <div>
          <.input
            field={@changeset[:email]}
            type="email"
            label="Email"
            placeholder="user@example.com"
          />
        </div>

        <div>
          <.input
            field={@changeset[:age]}
            type="number"
            label="Age"
            placeholder="18"
          />
        </div>

        <.button type="submit" class="w-full">
          Create User
        </.button>
      </.form>
    </div>
    """
  end
end

How It Works: phx-change triggers validation on input. phx-submit saves form. Changeset updates live in socket. Errors display immediately.

Use Cases: Forms, user input, data entry, interactive UIs.

See Also: LiveView Guide, Intermediate Tutorial - LiveView


Recipe 27: WebSocket Communication with Phoenix Channels

Problem: Implement real-time bidirectional communication using WebSockets.

Solution:

defmodule MyAppWeb.RoomChannel do
  use MyAppWeb, :channel
  require Logger

  @impl true
  def join("room:" <> room_id, params, socket) do
    case authorized?(socket, room_id, params) do
      true ->
        send(self(), :after_join)
        {:ok, %{room_id: room_id}, assign(socket, :room_id, room_id)}

      false ->
        {:error, %{reason: "unauthorized"}}
    end
  end

  @impl true
  def handle_info(:after_join, socket) do
    # Broadcast user joined
    broadcast!(socket, "user_joined", %{
      user_id: socket.assigns.user_id,
      joined_at: DateTime.utc_now()
    })

    # Push initial room state
    push(socket, "room_state", %{
      users: get_room_users(socket.assigns.room_id),
      messages: get_recent_messages(socket.assigns.room_id)
    })

    {:noreply, socket}
  end

  @impl true
  def handle_in("new_message", %{"body" => body}, socket) do
    message = %{
      id: generate_id(),
      body: body,
      user_id: socket.assigns.user_id,
      inserted_at: DateTime.utc_now()
    }

    # Save message to database
    case create_message(socket.assigns.room_id, message) do
      {:ok, saved_message} ->
        # Broadcast to all users in room
        broadcast!(socket, "new_message", saved_message)
        {:reply, {:ok, saved_message}, socket}

      {:error, changeset} ->
        {:reply, {:error, %{errors: changeset}}, socket}
    end
  end

  @impl true
  def handle_in("typing", %{"typing" => is_typing}, socket) do
    broadcast_from!(socket, "user_typing", %{
      user_id: socket.assigns.user_id,
      typing: is_typing
    })

    {:noreply, socket}
  end

  @impl true
  def handle_in("read_receipt", %{"message_id" => message_id}, socket) do
    broadcast_from!(socket, "message_read", %{
      message_id: message_id,
      user_id: socket.assigns.user_id,
      read_at: DateTime.utc_now()
    })

    {:noreply, socket}
  end

  @impl true
  def terminate(reason, socket) do
    Logger.info("User #{socket.assigns.user_id} left room #{socket.assigns.room_id}: #{inspect(reason)}")

    broadcast!(socket, "user_left", %{
      user_id: socket.assigns.user_id,
      left_at: DateTime.utc_now()
    })

    :ok
  end

  defp authorized?(socket, room_id, _params) do
    # Check if user has access to room
    user_id = socket.assigns[:user_id]
    user_id != nil && has_room_access?(user_id, room_id)
  end

  defp has_room_access?(_user_id, _room_id), do: true
  defp get_room_users(_room_id), do: []
  defp get_recent_messages(_room_id), do: []
  defp create_message(_room_id, message), do: {:ok, message}
  defp generate_id, do: :crypto.strong_rand_bytes(16) |> Base.encode16()
end

defmodule MyAppWeb.UserSocket do
  use Phoenix.Socket

  channel "room:*", MyAppWeb.RoomChannel
  channel "notifications:*", MyAppWeb.NotificationChannel

  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    case verify_token(token) do
      {:ok, user_id} ->
        {:ok, assign(socket, :user_id, user_id)}

      {:error, _reason} ->
        :error
    end
  end

  def connect(_params, _socket, _connect_info), do: :error

  @impl true
  def id(socket), do: "users_socket:#{socket.assigns.user_id}"

  defp verify_token(token) do
    # Verify JWT or session token
    {:ok, token}
  end
end

#
#
#
#
#
#

How It Works: Phoenix Channels provide WebSocket abstraction. join/3 authorizes connection. handle_in/3 processes incoming messages. broadcast!/3 sends to all subscribers. push/3 sends to specific client. Socket maintains user state.

Use Cases: Chat applications, collaborative editing, real-time dashboards, multiplayer games, live notifications.

See Also: Phoenix REST API Guide, Intermediate Tutorial - Phoenix


Recipe 28: Session-Based Authentication

Problem: Implement secure session-based authentication in Phoenix.

Solution:

defmodule MyAppWeb.SessionController do
  use MyAppWeb, :controller
  alias MyApp.Accounts

  def new(conn, _params) do
    render(conn, "new.html")
  end

  def create(conn, %{"session" => %{"email" => email, "password" => password}}) do
    case Accounts.authenticate_user(email, password) do
      {:ok, user} ->
        conn
        |> put_flash(:info, "Welcome back!")
        |> put_session(:user_id, user.id)
        |> configure_session(renew: true)  # Prevent session fixation
        |> redirect(to: ~p"/dashboard")

      {:error, :invalid_credentials} ->
        conn
        |> put_flash(:error, "Invalid email or password")
        |> render("new.html")
    end
  end

  def delete(conn, _params) do
    conn
    |> configure_session(drop: true)
    |> put_flash(:info, "Logged out successfully")
    |> redirect(to: ~p"/")
  end
end

defmodule MyApp.Accounts do
  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.Accounts.User

  def authenticate_user(email, password) do
    user = Repo.get_by(User, email: email)

    cond do
      user && Bcrypt.verify_pass(password, user.password_hash) ->
        {:ok, user}

      user ->
        # Prevent timing attacks
        Bcrypt.no_user_verify()
        {:error, :invalid_credentials}

      true ->
        Bcrypt.no_user_verify()
        {:error, :invalid_credentials}
    end
  end

  def register_user(attrs) do
    %User{}
    |> User.registration_changeset(attrs)
    |> Repo.insert()
  end
end

defmodule MyApp.Accounts.User do
  use Ecto.Schema
  import Ecto.Changeset

  schema "users" do
    field :email, :string
    field :password, :string, virtual: true
    field :password_hash, :string
    field :confirmed_at, :naive_datetime

    timestamps()
  end

  def registration_changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :password])
    |> validate_required([:email, :password])
    |> validate_format(:email, ~r/^[^\s]+@[^\s]+$/, message: "must be a valid email")
    |> validate_length(:password, min: 8)
    |> validate_password_strength()
    |> unique_constraint(:email)
    |> put_password_hash()
  end

  defp validate_password_strength(changeset) do
    password = get_change(changeset, :password)

    if password && String.length(password) >= 8 do
      # Check for at least one number and one uppercase
      has_number = String.match?(password, ~r/\d/)
      has_uppercase = String.match?(password, ~r/[A-Z]/)

      if has_number && has_uppercase do
        changeset
      else
        add_error(changeset, :password, "must contain at least one number and one uppercase letter")
      end
    else
      changeset
    end
  end

  defp put_password_hash(changeset) do
    case changeset do
      %Ecto.Changeset{valid?: true, changes: %{password: password}} ->
        put_change(changeset, :password_hash, Bcrypt.hash_pwd_salt(password))

      _ ->
        changeset
    end
  end
end

defmodule MyAppWeb.Plugs.RequireAuth do
  import Plug.Conn
  import Phoenix.Controller

  def init(opts), do: opts

  def call(conn, _opts) do
    user_id = get_session(conn, :user_id)

    cond do
      user_id && (user = MyApp.Accounts.get_user(user_id)) ->
        assign(conn, :current_user, user)

      true ->
        conn
        |> put_flash(:error, "You must be logged in")
        |> redirect(to: ~p"/login")
        |> halt()
    end
  end
end

#

How It Works: put_session/3 stores user ID in encrypted cookie. configure_session(renew: true) prevents session fixation. Bcrypt hashes passwords with salt. Plug checks session on each request. halt/1 stops pipeline on auth failure.

Use Cases: User login, protected routes, session management, authentication flows.

See Also: Authentication Guide, Phoenix REST API Guide, Intermediate Tutorial - Phoenix


Recipe 29: Real-Time Updates with PubSub

Problem: Push updates to multiple connected clients.

Solution:

defmodule MyAppWeb.DashboardLive do
  use MyAppWeb, :live_view

  @topic "dashboard:updates"

  def mount(_params, _session, socket) do
    if connected?(socket) do
      Phoenix.PubSub.subscribe(MyApp.PubSub, @topic)
    end

    {:ok,
     socket
     |> assign(:metrics, load_metrics())
     |> assign(:last_update, DateTime.utc_now())}
  end

  # Handle broadcasts from other processes
  def handle_info({:metric_updated, metric}, socket) do
    metrics = update_metric(socket.assigns.metrics, metric)

    {:noreply,
     socket
     |> assign(:metrics, metrics)
     |> assign(:last_update, DateTime.utc_now())}
  end

  def handle_info(:refresh, socket) do
    {:noreply, assign(socket, :metrics, load_metrics())}
  end

  # Broadcast update to all connected clients
  def broadcast_update(metric) do
    Phoenix.PubSub.broadcast(MyApp.PubSub, @topic, {:metric_updated, metric})
  end

  defp load_metrics do
    %{
      users_online: 0,
      requests_per_min: 0,
      avg_response_time: 0
    }
  end

  defp update_metric(metrics, {key, value}) do
    Map.put(metrics, key, value)
  end

  def render(assigns) do
    ~H"""
    <div class="p-8">
      <h1 class="text-3xl font-bold mb-6">Dashboard</h1>

      <div class="grid grid-cols-3 gap-4 mb-4">
        <div class="bg-blue-100 p-4 rounded">
          <h2 class="text-xl">Users Online</h2>
          <p class="text-3xl font-bold"><%= @metrics.users_online %></p>
        </div>

        <div class="bg-green-100 p-4 rounded">
          <h2 class="text-xl">Requests/min</h2>
          <p class="text-3xl font-bold"><%= @metrics.requests_per_min %></p>
        </div>

        <div class="bg-yellow-100 p-4 rounded">
          <h2 class="text-xl">Avg Response (ms)</h2>
          <p class="text-3xl font-bold"><%= @metrics.avg_response_time %></p>
        </div>
      </div>

      <p class="text-sm text-gray-600">
        Last updated: <%= Calendar.strftime(@last_update, "%H:%M:%S") %>
      </p>
    </div>
    """
  end
end

defmodule MyApp.MetricsCollector do
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def init(state) do
    schedule_collection()
    {:ok, state}
  end

  def handle_info(:collect, state) do
    # Collect metrics
    users_online = :rand.uniform(100)
    requests_per_min = :rand.uniform(1000)
    avg_response_time = :rand.uniform(200)

    # Broadcast to all LiveView clients
    MyAppWeb.DashboardLive.broadcast_update({:users_online, users_online})
    MyAppWeb.DashboardLive.broadcast_update({:requests_per_min, requests_per_min})
    MyAppWeb.DashboardLive.broadcast_update({:avg_response_time, avg_response_time})

    schedule_collection()
    {:noreply, state}
  end

  defp schedule_collection do
    Process.send_after(self(), :collect, 5000)  # Every 5 seconds
  end
end

How It Works: PubSub broadcasts messages to topic. LiveView subscribes on mount. handle_info receives broadcasts and updates assigns.

Use Cases: Dashboards, chat apps, notifications, collaborative editing, live feeds.

See Also: LiveView Guide, Intermediate Tutorial - LiveView


Ecto Database

Recipe 30: Complex Queries with Joins

Problem: Query across multiple tables with associations.

Solution:

defmodule MyApp.Queries.UserQueries do
  import Ecto.Query
  alias MyApp.Accounts.User
  alias MyApp.Blog.Post
  alias MyApp.Blog.Comment

  # Join with associations
  def users_with_posts do
    from u in User,
      join: p in assoc(u, :posts),
      preload: [posts: p],
      where: p.published == true
  end

  # Left join (include users without posts)
  def all_users_with_post_count do
    from u in User,
      left_join: p in assoc(u, :posts),
      group_by: u.id,
      select: {u, count(p.id)}
  end

  # Multiple joins
  def posts_with_author_and_comments do
    from p in Post,
      join: u in assoc(p, :user),
      left_join: c in assoc(p, :comments),
      preload: [user: u, comments: c],
      order_by: [desc: p.inserted_at]
  end

  # Filtered joins
  def users_with_recent_posts(days) do
    cutoff = DateTime.add(DateTime.utc_now(), -days * 24 * 3600, :second)

    from u in User,
      join: p in assoc(u, :posts),
      where: p.inserted_at >= ^cutoff,
      distinct: true,
      preload: [posts: p]
  end

  # Aggregations across joins
  def user_post_statistics do
    from u in User,
      left_join: p in assoc(u, :posts),
      left_join: c in assoc(p, :comments),
      group_by: u.id,
      select: %{
        user_id: u.id,
        name: u.name,
        post_count: count(p.id, :distinct),
        comment_count: count(c.id)
      }
  end
end

import Ecto.Query
alias MyApp.Repo

users = MyApp.Queries.UserQueries.users_with_posts() |> Repo.all()

results = MyApp.Queries.UserQueries.all_users_with_post_count() |> Repo.all()

posts = MyApp.Queries.UserQueries.posts_with_author_and_comments() |> Repo.all()

active_users = MyApp.Queries.UserQueries.users_with_recent_posts(7) |> Repo.all()

stats = MyApp.Queries.UserQueries.user_post_statistics() |> Repo.all()

How It Works: join combines tables. assoc uses schema associations. preload loads related data. group_by and aggregate functions for statistics.

Use Cases: Reports, analytics, complex data retrieval, dashboards.

See Also: Ecto Guide, Intermediate Tutorial - Ecto


Recipe 31: Transactions and Rollbacks

Problem: Execute multiple database operations atomically.

Solution:

defmodule MyApp.Orders do
  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.Orders.Order
  alias MyApp.Inventory.Product
  alias MyApp.Billing.Payment

  # Multi-step transaction
  def create_order(user, cart_items) do
    Repo.transaction(fn ->
      # Step 1: Create order
      order_params = %{user_id: user.id, status: :pending}
      {:ok, order} = create_order_record(order_params)

      # Step 2: Reserve inventory
      case reserve_inventory(cart_items) do
        :ok -> :ok
        {:error, reason} -> Repo.rollback(reason)
      end

      # Step 3: Process payment
      total = calculate_total(cart_items)
      case process_payment(user, total) do
        {:ok, payment} ->
          update_order_payment(order, payment)

        {:error, reason} ->
          Repo.rollback({:payment_failed, reason})
      end

      # Step 4: Create order items
      create_order_items(order, cart_items)

      # Return completed order
      order |> Repo.preload([:items, :payment])
    end)
  end

  # Using Ecto.Multi for explicit transaction steps
  def create_order_with_multi(user, cart_items) do
    Ecto.Multi.new()
    |> Ecto.Multi.insert(:order, order_changeset(user))
    |> Ecto.Multi.run(:inventory, fn _repo, %{order: order} ->
      reserve_inventory(cart_items)
    end)
    |> Ecto.Multi.run(:payment, fn _repo, %{order: order} ->
      total = calculate_total(cart_items)
      process_payment(user, total)
    end)
    |> Ecto.Multi.run(:order_items, fn _repo, %{order: order} ->
      create_order_items(order, cart_items)
    end)
    |> Ecto.Multi.update(:complete_order, fn %{order: order, payment: payment} ->
      Ecto.Changeset.change(order, status: :completed, payment_id: payment.id)
    end)
    |> Repo.transaction()
    |> case do
      {:ok, %{complete_order: order}} ->
        {:ok, order}

      {:error, :inventory, reason, _changes} ->
        {:error, {:inventory_failed, reason}}

      {:error, :payment, reason, _changes} ->
        {:error, {:payment_failed, reason}}

      {:error, step, reason, _changes} ->
        {:error, {step, reason}}
    end
  end

  defp order_changeset(user) do
    %Order{}
    |> Ecto.Changeset.cast(%{user_id: user.id, status: :pending}, [:user_id, :status])
    |> Ecto.Changeset.validate_required([:user_id, :status])
  end

  defp create_order_record(params) do
    %Order{}
    |> Order.changeset(params)
    |> Repo.insert()
  end

  defp reserve_inventory(cart_items) do
    # Check and update inventory
    :ok
  end

  defp process_payment(user, total) do
    # Process payment
    {:ok, %Payment{amount: total}}
  end

  defp calculate_total(cart_items) do
    Enum.reduce(cart_items, 0, fn item, acc ->
      acc + item.price * item.quantity
    end)
  end

  defp create_order_items(order, cart_items) do
    {:ok, []}
  end

  defp update_order_payment(order, payment) do
    order
    |> Ecto.Changeset.change(payment_id: payment.id, status: :completed)
    |> Repo.update()
  end
end

user = Repo.get!(User, 1)
cart_items = [
  %{product_id: 1, price: 10.00, quantity: 2},
  %{product_id: 2, price: 15.00, quantity: 1}
]

case MyApp.Orders.create_order_with_multi(user, cart_items) do
  {:ok, order} ->
    IO.puts("Order created: #{order.id}")

  {:error, {:payment_failed, reason}} ->
    IO.puts("Payment failed: #{inspect(reason)}")

  {:error, reason} ->
    IO.puts("Order failed: #{inspect(reason)}")
end

How It Works: Repo.transaction/1 wraps operations. Repo.rollback/1 aborts transaction. Ecto.Multi builds transaction pipeline with named steps and error handling.

Use Cases: Orders, payments, multi-step workflows, data consistency.

See Also: Ecto Guide, Intermediate Tutorial - Ecto


Testing

Recipe 32: Test GenServer Behavior

Problem: Test stateful GenServer operations.

Solution:

defmodule MyApp.CacheTest do
  use ExUnit.Case, async: true
  alias MyApp.Cache

  setup do
    # Start fresh cache for each test
    {:ok, pid} = Cache.start_link([])
    %{cache: pid}
  end

  test "stores and retrieves values", %{cache: cache} do
    assert :ok = Cache.put(cache, :key1, "value1")
    assert "value1" = Cache.get(cache, :key1)
  end

  test "returns nil for missing keys", %{cache: cache} do
    assert nil == Cache.get(cache, :nonexistent)
  end

  test "overwrites existing keys", %{cache: cache} do
    Cache.put(cache, :key1, "value1")
    Cache.put(cache, :key1, "value2")
    assert "value2" = Cache.get(cache, :key1)
  end

  test "handles concurrent operations", %{cache: cache} do
    # Spawn multiple tasks writing to cache
    tasks = for i <- 1..100 do
      Task.async(fn ->
        Cache.put(cache, "key_#{i}", "value_#{i}")
      end)
    end

    # Wait for all writes
    Task.await_many(tasks)

    # Verify all writes succeeded
    for i <- 1..100 do
      assert "value_#{i}" = Cache.get(cache, "key_#{i}")
    end
  end

  test "cache survives crashes with supervision" do
    # This requires supervisor setup
    # See supervision tree tests
  end
end

defmodule MyApp.AsyncWorkerTest do
  use ExUnit.Case, async: true
  alias MyApp.AsyncWorker

  test "completes async task" do
    {:ok, pid} = AsyncWorker.start_link([])

    # Start async operation
    :ok = AsyncWorker.process_async(pid, "data")

    # Wait for completion message
    assert_receive {:completed, "data"}, 1000
  end

  test "handles task timeout" do
    {:ok, pid} = AsyncWorker.start_link(timeout: 100)

    # Start slow operation
    :ok = AsyncWorker.process_async(pid, fn ->
      :timer.sleep(200)
      "result"
    end)

    # Should receive timeout
    assert_receive {:timeout, _}, 500
  end
end

How It Works: setup creates isolated GenServer per test. async: true runs tests concurrently. assert_receive waits for messages.

Use Cases: GenServer testing, concurrent behavior, async operations.

See Also: Testing Guide, Beginner Tutorial - Testing


Recipe 33: Mock External Services

Problem: Test code that depends on external APIs without hitting them.

Solution:

defmodule MyApp.EmailService do
  @callback send_email(to :: String.t(), subject :: String.t(), body :: String.t()) ::
              {:ok, term()} | {:error, term()}
end

defmodule MyApp.EmailService.SMTP do
  @behaviour MyApp.EmailService

  @impl true
  def send_email(to, subject, body) do
    # Actual SMTP logic
    {:ok, "Email sent to #{to}"}
  end
end

defmodule MyApp.EmailService.Mock do
  @behaviour MyApp.EmailService

  @impl true
  def send_email(to, subject, body) do
    # Store for verification in tests
    send(self(), {:email_sent, to, subject, body})
    {:ok, "Mocked email"}
  end
end

defmodule MyApp.UserNotifier do
  def notify_welcome(user) do
    email_service().send_email(
      user.email,
      "Welcome!",
      "Welcome to our app, #{user.name}!"
    )
  end

  defp email_service do
    Application.get_env(:my_app, :email_service, MyApp.EmailService.SMTP)
  end
end

defmodule MyApp.UserNotifierTest do
  use ExUnit.Case

  setup do
    # Configure mock
    Application.put_env(:my_app, :email_service, MyApp.EmailService.Mock)

    on_exit(fn ->
      Application.delete_env(:my_app, :email_service)
    end)
  end

  test "sends welcome email" do
    user = %{email: "test@example.com", name: "Alice"}

    {:ok, _} = MyApp.UserNotifier.notify_welcome(user)

    assert_received {:email_sent, "test@example.com", "Welcome!", body}
    assert body =~ "Alice"
  end
end

defmodule MyApp.EmailServiceTest do
  use ExUnit.Case, async: true
  import Mox

  # Define mock module
  Mox.defmock(MyApp.EmailService.MockMox, for: MyApp.EmailService)

  setup :verify_on_exit!

  test "sends email via mox" do
    # Set expectations
    expect(MyApp.EmailService.MockMox, :send_email, fn to, subject, body ->
      assert to == "user@example.com"
      assert subject == "Test"
      {:ok, "sent"}
    end)

    # Call code under test
    MyApp.EmailService.MockMox.send_email("user@example.com", "Test", "Body")
  end
end

How It Works: Define @behaviour for interface. Implement real and mock versions. Configure via Application env. Mox provides stricter mocking with expectations.

Use Cases: External API testing, service integration, CI/CD testing.

See Also: Testing Guide


Performance

Recipe 34: Profile and Benchmark Code

Problem: Identify performance bottlenecks and measure improvements.

Solution:

defmodule MyApp.Performance do
  # Benchmarking with :timer.tc
  def benchmark(name, fun) do
    {time_us, result} = :timer.tc(fun)
    IO.puts("#{name}: #{time_us / 1_000} ms")
    result
  end

  # Benchmarking with Benchee
  def compare_implementations do
    Benchee.run(
      %{
        "Enum.map" => fn input -> Enum.map(input, &(&1 * 2)) end,
        "for comprehension" => fn input -> for x <- input, do: x * 2 end,
        "Stream.map" => fn input -> input |> Stream.map(&(&1 * 2)) |> Enum.to_list() end
      },
      inputs: %{
        "Small (100)" => Enum.to_list(1..100),
        "Medium (10_000)" => Enum.to_list(1..10_000),
        "Large (1_000_000)" => Enum.to_list(1..1_000_000)
      },
      time: 5,
      memory_time: 2
    )
  end

  # Profiling with :fprof
  def profile_function(fun) do
    :fprof.trace([:start, {:procs, self()}])
    result = fun.()
    :fprof.trace(:stop)

    :fprof.profile()
    :fprof.analyse([:totals, {:sort, :acc}, {:dest, 'fprof_analysis.txt'}])

    IO.puts("Profile saved to fprof_analysis.txt")
    result
  end

  # Memory profiling
  def measure_memory(fun) do
    :erlang.garbage_collect()
    memory_before = :erlang.memory(:total)

    result = fun.()

    :erlang.garbage_collect()
    memory_after = :erlang.memory(:total)
    memory_used = memory_after - memory_before

    IO.puts("Memory used: #{memory_used / 1024 / 1024} MB")
    result
  end
end

MyApp.Performance.benchmark("List processing", fn ->
  Enum.map(1..10_000, &(&1 * 2))
end)

MyApp.Performance.compare_implementations()

MyApp.Performance.profile_function(fn ->
  # Your expensive operation
  expensive_operation()
end)

MyApp.Performance.measure_memory(fn ->
  # Operation that allocates memory
  large_list = Enum.map(1..1_000_000, &(&1 * 2))
end)

defmodule MyApp.Expensive do
  import ExProf.Macro

  def run do
    profile do
      expensive_operation()
    end
  end

  defp expensive_operation do
    1..10_000
    |> Enum.map(&heavy_computation/1)
    |> Enum.sum()
  end

  defp heavy_computation(n) do
    :timer.sleep(1)
    n * n
  end
end

How It Works: :timer.tc/1 measures execution time. Benchee provides statistical analysis. :fprof profiles function calls. Memory tracking via :erlang.memory/1.

Use Cases: Performance optimization, bottleneck identification, regression testing.

See Also: Performance Guide, Advanced Tutorial - Performance


Recipe 35: Optimize Database Queries

Problem: Speed up slow Ecto queries.

Solution:

defmodule MyApp.OptimizedQueries do
  import Ecto.Query
  alias MyApp.Repo

  # Bad: N+1 query problem
  def users_with_posts_slow do
    users = Repo.all(User)

    Enum.map(users, fn user ->
      posts = Repo.all(from p in Post, where: p.user_id == ^user.id)
      {user, posts}
    end)
  end

  # Good: Preload associations
  def users_with_posts_fast do
    User
    |> Repo.all()
    |> Repo.preload(:posts)
  end

  # Good: Join and preload in single query
  def users_with_posts_optimized do
    from(u in User,
      join: p in assoc(u, :posts),
      preload: [posts: p]
    )
    |> Repo.all()
  end

  # Use select to fetch only needed fields
  def user_names_and_emails do
    from(u in User,
      select: %{name: u.name, email: u.email}
    )
    |> Repo.all()
  end

  # Pagination with limit/offset
  def paginated_users(page, per_page) do
    offset = (page - 1) * per_page

    from(u in User,
      limit: ^per_page,
      offset: ^offset,
      order_by: [desc: u.inserted_at]
    )
    |> Repo.all()
  end

  # Use indexes (in migration)
  def create_indexes do
    """
    defmodule MyApp.Repo.Migrations.AddIndexes do
      use Ecto.Migration

      def change do
        # Index foreign keys
        create index(:posts, [:user_id])
        create index(:comments, [:post_id])

        # Index frequently queried fields
        create index(:users, [:email])
        create index(:posts, [:published_at])

        # Composite index for common query patterns
        create index(:posts, [:user_id, :published_at])

        # Unique index
        create unique_index(:users, [:email])
      end
    end
    """
  end

  # Batch processing to avoid memory issues
  def process_all_users_in_batches(batch_size \\ 1000) do
    stream = Repo.stream(User)

    Repo.transaction(fn ->
      stream
      |> Stream.chunk_every(batch_size)
      |> Enum.each(fn batch ->
        # Process batch
        process_batch(batch)
      end)
    end)
  end

  defp process_batch(users) do
    # Process users in batch
    IO.puts("Processing #{length(users)} users")
  end
end

How It Works: Preloading avoids N+1 queries. Selective fields reduce data transfer. Indexes speed up lookups. Streaming processes large datasets without loading all into memory.

Use Cases: Database optimization, query performance, large dataset processing.

See Also: Ecto Guide, Performance Guide


Configuration

Recipe 36: Environment-Based Configuration

Problem: Manage configuration across environments (dev, test, prod).

Solution:

import Config

config :my_app,
  ecto_repos: [MyApp.Repo]

config :my_app, MyApp.Repo,
  database: "my_app_dev",
  hostname: "localhost",
  pool_size: 10

import_config "#{config_env()}.exs"

import Config

config :my_app, MyApp.Repo,
  database: "my_app_dev",
  hostname: "localhost",
  show_sensitive_data_on_connection_error: true,
  pool_size: 10

config :my_app, MyAppWeb.Endpoint,
  http: [port: 4000],
  debug_errors: true,
  code_reloader: true

import Config

config :my_app, MyApp.Repo,
  database: "my_app_test#{System.get_env("MIX_TEST_PARTITION")}",
  hostname: "localhost",
  pool: Ecto.Adapters.SQL.Sandbox,
  pool_size: 10

config :my_app, MyAppWeb.Endpoint,
  http: [port: 4002],
  server: false

import Config

import Config

if config_env() == :prod do
  database_url =
    System.get_env("DATABASE_URL") ||
      raise """
      environment variable DATABASE_URL is missing.
      For example: ecto://USER:PASS@HOST/DATABASE
      """

  config :my_app, MyApp.Repo,
    url: database_url,
    pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10"),
    ssl: true

  secret_key_base =
    System.get_env("SECRET_KEY_BASE") ||
      raise """
      environment variable SECRET_KEY_BASE is missing.
      """

  config :my_app, MyAppWeb.Endpoint,
    http: [port: String.to_integer(System.get_env("PORT") || "4000")],
    secret_key_base: secret_key_base,
    server: true
end

defmodule MyApp.Config do
  def database_url do
    Application.get_env(:my_app, MyApp.Repo)[:url]
  end

  def pool_size do
    Application.get_env(:my_app, MyApp.Repo)[:pool_size]
  end

  def feature_enabled?(feature) do
    Application.get_env(:my_app, :features, %{})
    |> Map.get(feature, false)
  end
end

MyApp.Config.database_url()

MyApp.Config.pool_size()

MyApp.Config.feature_enabled?(:new_dashboard)

How It Works: config.exs loads base config. Environment-specific files override. runtime.exs loads after compilation (for env vars). Application.get_env/3 retrieves config.

Use Cases: Multi-environment deployment, secret management, feature flags.

See Also: Configuration Guide, Intermediate Tutorial - Configuration


Debugging

Recipe 37: Debug Techniques and Tools

Problem: Identify and fix bugs efficiently.

Solution:

defmodule MyApp.Debugging do
  require Logger

  # 1. IO.inspect with labels
  def inspect_pipeline(data) do
    data
    |> IO.inspect(label: "Input")
    |> transform_step1()
    |> IO.inspect(label: "After step 1")
    |> transform_step2()
    |> IO.inspect(label: "After step 2")
  end

  # 2. IEx.pry for breakpoints (development only)
  def debug_with_pry(data) do
    require IEx

    transformed = transform_data(data)
    IEx.pry()  # Execution pauses here, opens IEx shell

    process_result(transformed)
  end

  # 3. Logger for production debugging
  def log_operation(user, action) do
    Logger.info("User #{user.id} performing #{action}")

    case perform_action(user, action) do
      {:ok, result} ->
        Logger.info("Action #{action} succeeded",
          user_id: user.id,
          result: inspect(result))
        {:ok, result}

      {:error, reason} ->
        Logger.error("Action #{action} failed",
          user_id: user.id,
          reason: inspect(reason))
        {:error, reason}
    end
  end

  # 4. Using dbg for Elixir 1.14+
  def debug_with_dbg(data) do
    data
    |> transform_step1()
    |> dbg()  # Shows transformation result
    |> transform_step2()
  end

  # 5. Pattern matching for debugging
  def debug_pattern_match(result) do
    case result do
      {:ok, value} = success ->
        IO.inspect(success, label: "Success case")
        value

      {:error, reason} = error ->
        IO.inspect(error, label: "Error case")
        raise "Operation failed: #{inspect(reason)}"
    end
  end

  # 6. Observer for runtime inspection
  def start_observer do
    :observer.start()
  end

  # 7. Recon for production debugging
  def check_memory_usage do
    :recon.proc_count(:memory, 10)
    |> IO.inspect(label: "Top 10 processes by memory")
  end

  def check_message_queue do
    :recon.proc_count(:message_queue_len, 10)
    |> IO.inspect(label: "Top 10 processes by message queue")
  end

  defp transform_step1(data), do: data
  defp transform_step2(data), do: data
  defp transform_data(data), do: data
  defp process_result(data), do: data
  defp perform_action(_user, _action), do: {:ok, "result"}
end

# # Execution pauses, you can inspect variables:

How It Works: IO.inspect/2 shows values without breaking flow. IEx.pry/0 pauses execution for inspection. Logger records events. dbg/1 traces pipe operations. Observer visualizes runtime state.

Use Cases: Development debugging, production troubleshooting, performance analysis.

See Also: Debugging Guide, Beginner Tutorial - Debugging


Recipe 38: Background Job Processing with GenServer

Problem: Process long-running tasks asynchronously without blocking requests.

Solution:

defmodule MyApp.JobQueue do
  use GenServer
  require Logger

  # Client API
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def enqueue(job_type, params) do
    job = %{
      id: generate_job_id(),
      type: job_type,
      params: params,
      enqueued_at: DateTime.utc_now(),
      status: :pending
    }

    GenServer.cast(__MODULE__, {:enqueue, job})
    {:ok, job.id}
  end

  def get_status(job_id) do
    GenServer.call(__MODULE__, {:get_status, job_id})
  end

  # Server Callbacks
  @impl true
  def init(_opts) do
    state = %{
      queue: :queue.new(),
      processing: %{},
      completed: %{},
      max_concurrent: 5
    }

    schedule_process_jobs()
    {:ok, state}
  end

  @impl true
  def handle_cast({:enqueue, job}, state) do
    new_queue = :queue.in(job, state.queue)
    {:noreply, %{state | queue: new_queue}}
  end

  @impl true
  def handle_call({:get_status, job_id}, _from, state) do
    status = cond do
      Map.has_key?(state.processing, job_id) -> {:processing, state.processing[job_id]}
      Map.has_key?(state.completed, job_id) -> {:completed, state.completed[job_id]}
      true -> {:not_found, nil}
    end

    {:reply, status, state}
  end

  @impl true
  def handle_info(:process_jobs, state) do
    state = process_pending_jobs(state)
    schedule_process_jobs()
    {:noreply, state}
  end

  @impl true
  def handle_info({:job_completed, job_id, result}, state) do
    Logger.info("Job #{job_id} completed: #{inspect(result)}")

    completed_job = Map.get(state.processing, job_id)
    |> Map.put(:result, result)
    |> Map.put(:completed_at, DateTime.utc_now())
    |> Map.put(:status, :completed)

    new_state = %{state |
      processing: Map.delete(state.processing, job_id),
      completed: Map.put(state.completed, job_id, completed_job)
    }

    {:noreply, new_state}
  end

  @impl true
  def handle_info({:job_failed, job_id, error}, state) do
    Logger.error("Job #{job_id} failed: #{inspect(error)}")

    failed_job = Map.get(state.processing, job_id)
    |> Map.put(:error, error)
    |> Map.put(:failed_at, DateTime.utc_now())
    |> Map.put(:status, :failed)

    new_state = %{state |
      processing: Map.delete(state.processing, job_id),
      completed: Map.put(state.completed, job_id, failed_job)
    }

    {:noreply, new_state}
  end

  # Private Functions
  defp process_pending_jobs(state) do
    concurrent_count = map_size(state.processing)
    available_slots = state.max_concurrent - concurrent_count

    if available_slots > 0 && !:queue.is_empty(state.queue) do
      {jobs_to_process, remaining_queue} = dequeue_n(state.queue, available_slots)

      processing = Enum.reduce(jobs_to_process, state.processing, fn job, acc ->
        spawn_job_worker(job)
        Map.put(acc, job.id, Map.put(job, :status, :processing))
      end)

      %{state | queue: remaining_queue, processing: processing}
    else
      state
    end
  end

  defp dequeue_n(queue, n) do
    dequeue_n(queue, n, [])
  end

  defp dequeue_n(queue, 0, acc) do
    {Enum.reverse(acc), queue}
  end

  defp dequeue_n(queue, n, acc) do
    case :queue.out(queue) do
      {{:value, item}, new_queue} ->
        dequeue_n(new_queue, n - 1, [item | acc])

      {:empty, queue} ->
        {Enum.reverse(acc), queue}
    end
  end

  defp spawn_job_worker(job) do
    parent = self()

    Task.start(fn ->
      try do
        result = execute_job(job)
        send(parent, {:job_completed, job.id, result})
      rescue
        error ->
          send(parent, {:job_failed, job.id, error})
      end
    end)
  end

  defp execute_job(%{type: :send_email, params: params}) do
    # Simulate email sending
    Process.sleep(1000)
    {:ok, "Email sent to #{params[:to]}"}
  end

  defp execute_job(%{type: :process_upload, params: params}) do
    # Simulate file processing
    Process.sleep(2000)
    {:ok, "Processed file #{params[:filename]}"}
  end

  defp execute_job(%{type: :generate_report, params: params}) do
    # Simulate report generation
    Process.sleep(3000)
    {:ok, "Generated report #{params[:report_id]}"}
  end

  defp execute_job(%{type: type}) do
    {:error, "Unknown job type: #{type}"}
  end

  defp schedule_process_jobs do
    Process.send_after(self(), :process_jobs, 1000)
  end

  defp generate_job_id do
    :crypto.strong_rand_bytes(16) |> Base.encode16()
  end
end

MyApp.JobQueue.start_link()

{:ok, job_id} = MyApp.JobQueue.enqueue(:send_email, %{to: "user@example.com", subject: "Welcome"})

MyApp.JobQueue.get_status(job_id)

MyApp.JobQueue.get_status(job_id)

How It Works: GenServer maintains queue of pending jobs. :process_jobs message triggers batch processing. max_concurrent limits parallel jobs. Task spawns worker process for each job. Worker sends completion/failure message back to GenServer.

Use Cases: Email sending, file processing, report generation, data exports, webhook delivery.

See Also: GenServer Guide, Task Guide, Intermediate Tutorial - OTP


Recipe 39: Database Migrations with Ecto

Problem: Manage schema changes and data migrations safely.

Solution:


defmodule MyApp.Repo.Migrations.CreateUsersTable do
  use Ecto.Migration

  def change do
    create table(:users) do
      add :email, :string, null: false
      add :name, :string
      add :age, :integer
      add :role, :string, default: "user"
      add :active, :boolean, default: true
      add :metadata, :map, default: %{}

      timestamps()
    end

    create unique_index(:users, [:email])
    create index(:users, [:role])
    create index(:users, [:active])
  end
end


defmodule MyApp.Repo.Migrations.AddConfirmedAtToUsers do
  use Ecto.Migration

  def change do
    alter table(:users) do
      add :confirmed_at, :naive_datetime
    end

    create index(:users, [:confirmed_at])
  end
end


defmodule MyApp.Repo.Migrations.MigrateUserRoles do
  use Ecto.Migration
  import Ecto.Query

  def up do
    # Rename role values using raw SQL
    execute """
    UPDATE users
    SET role = CASE
      WHEN role = 'admin' THEN 'administrator'
      WHEN role = 'mod' THEN 'moderator'
      ELSE role
    END
    """

    # Or using Ecto query (safer for complex logic)
    repo().update_all(
      from(u in "users", where: u.role == "admin"),
      set: [role: "administrator"]
    )
  end

  def down do
    repo().update_all(
      from(u in "users", where: u.role == "administrator"),
      set: [role: "admin"]
    )
  end
end

defmodule MyApp.Repo.Migrations.RefactorUserProfiles do
  use Ecto.Migration

  def up do
    # Create new table
    create table(:user_profiles) do
      add :user_id, references(:users, on_delete: :delete_all), null: false
      add :bio, :text
      add :avatar_url, :string
      add :preferences, :map, default: %{}

      timestamps()
    end

    create unique_index(:user_profiles, [:user_id])

    # Migrate data from users to user_profiles
    execute """
    INSERT INTO user_profiles (user_id, bio, inserted_at, updated_at)
    SELECT id, bio, inserted_at, updated_at
    FROM users
    WHERE bio IS NOT NULL
    """

    # Remove column from users table
    alter table(:users) do
      remove :bio
    end
  end

  def down do
    # Add column back
    alter table(:users) do
      add :bio, :text
    end

    # Migrate data back
    execute """
    UPDATE users u
    SET bio = up.bio
    FROM user_profiles up
    WHERE u.id = up.user_id
    """

    # Drop table
    drop table(:user_profiles)
  end
end

How It Works: change/0 defines reversible migrations. up/0 and down/0 for custom logic. execute/1 runs raw SQL. repo().update_all/2 for data migrations. Migrations run in transaction by default (except for some DDL operations).

Use Cases: Schema evolution, data migrations, index management, constraint additions, table refactoring.

See Also: Ecto Guide, Intermediate Tutorial - Ecto


Recipe 40: API Versioning Strategies

Problem: Support multiple API versions simultaneously.

Solution:

defmodule MyAppWeb.Router do
  use MyAppWeb, :router

  pipeline :api do
    plug :accepts, ["json"]
  end

  # Version 1
  scope "/api/v1", MyAppWeb.V1, as: :v1 do
    pipe_through :api

    resources "/users", UserController, except: [:new, :edit]
    resources "/posts", PostController, except: [:new, :edit]
  end

  # Version 2
  scope "/api/v2", MyAppWeb.V2, as: :v2 do
    pipe_through :api

    resources "/users", UserController, except: [:new, :edit]
    resources "/posts", PostController, except: [:new, :edit]
  end
end

defmodule MyAppWeb.V1.UserController do
  use MyAppWeb, :controller

  def index(conn, _params) do
    users = MyApp.Accounts.list_users()
    render(conn, "index.json", users: users)
  end

  def show(conn, %{"id" => id}) do
    user = MyApp.Accounts.get_user!(id)
    render(conn, "show.json", user: user)
  end
end

defmodule MyAppWeb.V1.UserView do
  use MyAppWeb, :view

  def render("index.json", %{users: users}) do
    %{data: Enum.map(users, &user_json/1)}
  end

  def render("show.json", %{user: user}) do
    %{data: user_json(user)}
  end

  defp user_json(user) do
    %{
      id: user.id,
      email: user.email,
      name: user.name
    }
  end
end

defmodule MyAppWeb.V2.UserController do
  use MyAppWeb, :controller

  def index(conn, params) do
    page = Map.get(params, "page", 1)
    per_page = Map.get(params, "per_page", 20)

    users = MyApp.Accounts.list_users_paginated(page, per_page)
    total = MyApp.Accounts.count_users()

    render(conn, "index.json", users: users, total: total, page: page, per_page: per_page)
  end
end

defmodule MyAppWeb.V2.UserView do
  use MyAppWeb, :view

  def render("index.json", %{users: users, total: total, page: page, per_page: per_page}) do
    %{
      data: Enum.map(users, &user_json/1),
      pagination: %{
        page: page,
        per_page: per_page,
        total: total,
        total_pages: ceil(total / per_page)
      }
    }
  end

  defp user_json(user) do
    %{
      id: user.id,
      email: user.email,
      name: user.name,
      created_at: user.inserted_at,  # New field in v2
      profile_url: "/users/#{user.id}/profile"  # HATEOAS link
    }
  end
end

defmodule MyAppWeb.Plugs.APIVersion do
  import Plug.Conn

  def init(opts), do: opts

  def call(conn, _opts) do
    version = get_req_header(conn, "accept")
    |> List.first()
    |> parse_version()

    assign(conn, :api_version, version)
  end

  defp parse_version("application/vnd.myapp.v1+json"), do: :v1
  defp parse_version("application/vnd.myapp.v2+json"), do: :v2
  defp parse_version(_), do: :v2  # Default to latest
end

defmodule MyAppWeb.UserController do
  use MyAppWeb, :controller

  def index(conn, params) do
    case conn.assigns.api_version do
      :v1 -> index_v1(conn, params)
      :v2 -> index_v2(conn, params)
    end
  end

  defp index_v1(conn, _params) do
    users = MyApp.Accounts.list_users()
    render(conn, "index_v1.json", users: users)
  end

  defp index_v2(conn, params) do
    page = Map.get(params, "page", 1)
    users = MyApp.Accounts.list_users_paginated(page, 20)
    render(conn, "index_v2.json", users: users, page: page)
  end
end

defmodule MyAppWeb.Shared.UserController do
  # Common functionality
  def get_user(id) do
    MyApp.Accounts.get_user!(id)
  end
end

defmodule MyAppWeb.V1.UserController do
  use MyAppWeb, :controller
  import MyAppWeb.Shared.UserController

  def show(conn, %{"id" => id}) do
    user = get_user(id)  # Shared function
    render(conn, "show.json", user: user)
  end
end

defmodule MyAppWeb.V2.UserController do
  use MyAppWeb, :controller
  import MyAppWeb.Shared.UserController

  def show(conn, %{"id" => id}) do
    user = get_user(id)  # Same shared function
    user = MyApp.Accounts.preload_associations(user, :v2)  # Version-specific enhancement
    render(conn, "show.json", user: user)
  end
end

How It Works: URL versioning uses scoped routes with version namespace. Header versioning checks Accept header. Shared modules reduce duplication. Each version has dedicated controllers/views. Migration guide helps clients upgrade.

Use Cases: Public APIs, mobile app backends, third-party integrations, gradual feature rollouts.

See Also: Phoenix REST API Guide, Intermediate Tutorial - Phoenix


Recipe 41: Rate Limiting Patterns

Problem: Prevent API abuse with request rate limiting.

Solution:

defmodule MyApp.RateLimiter do
  use GenServer

  @table :rate_limiter_buckets
  @refill_interval 1000  # 1 second
  @max_tokens 10  # 10 requests per second

  # Client API
  def start_link(opts \ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def check_rate(identifier, cost \ 1) do
    GenServer.call(__MODULE__, {:check_rate, identifier, cost})
  end

  # Server Callbacks
  @impl true
  def init(_opts) do
    :ets.new(@table, [:named_table, :set, :public, read_concurrency: true])
    schedule_refill()
    {:ok, %{}}
  end

  @impl true
  def handle_call({:check_rate, identifier, cost}, _from, state) do
    now = System.system_time(:millisecond)
    bucket = get_or_create_bucket(identifier, now)

    if bucket.tokens >= cost do
      new_bucket = %{bucket | tokens: bucket.tokens - cost}
      :ets.insert(@table, {identifier, new_bucket})
      {:reply, {:ok, new_bucket.tokens}, state}
    else
      retry_after = calculate_retry_after(bucket)
      {:reply, {:error, :rate_limited, retry_after}, state}
    end
  end

  @impl true
  def handle_info(:refill_tokens, state) do
    now = System.system_time(:millisecond)

    :ets.foldl(fn {identifier, bucket}, _acc ->
      elapsed = now - bucket.last_refill
      tokens_to_add = div(elapsed, @refill_interval) * @max_tokens

      if tokens_to_add > 0 do
        new_tokens = min(bucket.tokens + tokens_to_add, @max_tokens)
        new_bucket = %{bucket | tokens: new_tokens, last_refill: now}
        :ets.insert(@table, {identifier, new_bucket})
      end

      :ok
    end, :ok, @table)

    schedule_refill()
    {:noreply, state}
  end

  defp get_or_create_bucket(identifier, now) do
    case :ets.lookup(@table, identifier) do
      [{^identifier, bucket}] -> bucket
      [] ->
        bucket = %{
          tokens: @max_tokens,
          last_refill: now
        }
        :ets.insert(@table, {identifier, bucket})
        bucket
    end
  end

  defp calculate_retry_after(_bucket) do
    @refill_interval
  end

  defp schedule_refill do
    Process.send_after(self(), :refill_tokens, @refill_interval)
  end
end

defmodule MyAppWeb.Plugs.RateLimit do
  import Plug.Conn
  import Phoenix.Controller

  def init(opts) do
    Keyword.merge([
      max_requests: 100,
      interval_seconds: 60,
      identifier: :ip
    ], opts)
  end

  def call(conn, opts) do
    identifier = get_identifier(conn, opts[:identifier])
    cost = 1

    case MyApp.RateLimiter.check_rate(identifier, cost) do
      {:ok, remaining} ->
        conn
        |> put_resp_header("x-ratelimit-limit", to_string(opts[:max_requests]))
        |> put_resp_header("x-ratelimit-remaining", to_string(remaining))

      {:error, :rate_limited, retry_after} ->
        conn
        |> put_resp_header("retry-after", to_string(retry_after))
        |> put_status(:too_many_requests)
        |> json(%{error: "Rate limit exceeded. Try again later."})
        |> halt()
    end
  end

  defp get_identifier(conn, :ip) do
    conn.remote_ip
    |> :inet.ntoa()
    |> to_string()
  end

  defp get_identifier(conn, :user) do
    conn.assigns[:current_user] && conn.assigns.current_user.id || get_identifier(conn, :ip)
  end
end

defmodule MyApp.SlidingWindowRateLimiter do
  @window_size 60_000  # 60 seconds
  @max_requests 100

  def check_rate(identifier) do
    now = System.system_time(:millisecond)
    window_start = now - @window_size

    # Get requests in current window
    requests = get_requests(identifier, window_start, now)

    if length(requests) < @max_requests do
      record_request(identifier, now)
      {:ok, @max_requests - length(requests) - 1}
    else
      oldest_request = List.first(requests)
      retry_after = oldest_request + @window_size - now
      {:error, :rate_limited, retry_after}
    end
  end

  defp get_requests(identifier, window_start, window_end) do
    # Implementation using ETS or Redis
    # Return list of request timestamps
    []
  end

  defp record_request(identifier, timestamp) do
    # Store request timestamp
    :ok
  end
end

#

How It Works: Token bucket algorithm grants tokens at fixed rate. Each request consumes tokens. ETS stores per-user buckets. Refill process adds tokens periodically. Plug checks rate before controller execution. Returns 429 status when limit exceeded.

Use Cases: API rate limiting, DDoS prevention, resource protection, fair usage enforcement.

See Also: GenServer Guide, Phoenix REST API Guide


Recipe 42: Internationalization (i18n) with Gettext

Problem: Support multiple languages in Phoenix application.

Solution:


config :my_app, MyAppWeb.Gettext,
  default_locale: "en",
  locales: ~w(en id es fr)


defmodule MyAppWeb.Gettext do
  use Gettext, otp_app: :my_app
end

defmodule MyAppWeb.Plugs.SetLocale do
  import Plug.Conn

  @supported_locales Gettext.known_locales(MyAppWeb.Gettext)

  def init(opts), do: opts

  def call(conn, _opts) do
    locale = determine_locale(conn)

    if locale in @supported_locales do
      Gettext.put_locale(MyAppWeb.Gettext, locale)
      assign(conn, :locale, locale)
    else
      conn
    end
  end

  defp determine_locale(conn) do
    # Priority: URL param > Cookie > Accept-Language header > Default
    conn.params["locale"] ||
      get_session(conn, :locale) ||
      parse_accept_language(conn) ||
      Gettext.get_locale(MyAppWeb.Gettext)
  end

  defp parse_accept_language(conn) do
    case get_req_header(conn, "accept-language") do
      [value | _] ->
        value
        |> String.split(",")
        |> Enum.map(&parse_language_option/1)
        |> Enum.sort(&(&1.quality > &2.quality))
        |> Enum.find(&(&1.tag in @supported_locales))
        |> case do
          nil -> nil
          language_option -> language_option.tag
        end

      [] ->
        nil
    end
  end

  defp parse_language_option(string) do
    captures = ~r/^(?<tag>[\w\-]+)(?:;q=(?<quality>[\d\.]+))?$/i
    |> Regex.named_captures(String.trim(string))

    quality = case captures["quality"] do
      "" -> 1.0
      q -> String.to_float(q)
    end

    %{tag: captures["tag"], quality: quality}
  end
end

msgid "Welcome"
msgstr "Welcome"

msgid "Hello %{name}!"
msgstr "Hello %{name}!"

msgid "You have %{count} unread messages"
msgid_plural "You have %{count} unread messages"
msgstr[0] "You have one unread message"
msgstr[1] "You have %{count} unread messages"

msgid "Welcome"
msgstr "Selamat Datang"

msgid "Hello %{name}!"
msgstr "Halo %{name}!"

msgid "You have %{count} unread messages"
msgid_plural "You have %{count} unread messages"
msgstr[0] "Anda memiliki satu pesan belum dibaca"
msgstr[1] "Anda memiliki %{count} pesan belum dibaca"

defmodule MyAppWeb.PageController do
  use MyAppWeb, :controller
  import MyAppWeb.Gettext

  def index(conn, _params) do
    welcome_message = gettext("Welcome")
    greeting = gettext("Hello %{name}!", name: "Alice")

    render(conn, "index.html",
      welcome: welcome_message,
      greeting: greeting
    )
  end
end


msgid "Invalid email"
msgstr "Invalid email address"

msgid "Password too short"
msgstr "Password must be at least 8 characters"

msgid "Invalid email"
msgstr "Alamat email tidak valid"

msgid "Password too short"
msgstr "Kata sandi harus minimal 8 karakter"

defmodule MyApp.Accounts.User do
  import MyAppWeb.Gettext

  def changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :password])
    |> validate_required([:email, :password])
    |> validate_format(:email, ~r/@/, message: dgettext("errors", "Invalid email"))
    |> validate_length(:password, min: 8, message: dgettext("errors", "Password too short"))
  end
end

How It Works: Gettext extracts translation strings from code. .po files store translations. Locale determined from request (params, headers, cookies). gettext/1 and dgettext/2 (domain-specific) fetch translated strings. ngettext/3 handles pluralization.

Use Cases: Multi-language websites, internationalized apps, localized error messages, region-specific content.

See Also: Phoenix Guide, Intermediate Tutorial - Phoenix


Recipe 43: File Uploads with Phoenix

Problem: Handle file uploads securely with validation.

Solution:

defmodule MyAppWeb.UploadController do
  use MyAppWeb, :controller
  require Logger

  @upload_dir "priv/static/uploads"
  @max_file_size 10 * 1024 * 1024  # 10 MB
  @allowed_extensions ~w(.jpg .jpeg .png .gif .pdf .docx)

  def new(conn, _params) do
    render(conn, "new.html")
  end

  def create(conn, %{"upload" => %{"file" => upload}}) do
    case validate_and_save_upload(upload) do
      {:ok, file_path} ->
        conn
        |> put_flash(:info, "File uploaded successfully")
        |> redirect(to: ~p"/uploads")

      {:error, reason} ->
        conn
        |> put_flash(:error, "Upload failed: #{reason}")
        |> render("new.html")
    end
  end

  defp validate_and_save_upload(upload) do
    with :ok <- validate_file_size(upload),
         :ok <- validate_file_type(upload),
         :ok <- validate_content_type(upload),
         {:ok, destination} <- generate_destination(upload),
         :ok <- save_file(upload, destination) do
      {:ok, destination}
    else
      {:error, reason} -> {:error, reason}
    end
  end

  defp validate_file_size(upload) do
    case upload.path |> File.stat!() |> Map.get(:size) do
      size when size <= @max_file_size -> :ok
      size -> {:error, "File too large: #{size} bytes (max: #{@max_file_size})"}
    end
  end

  defp validate_file_type(upload) do
    extension = Path.extname(upload.filename) |> String.downcase()

    if extension in @allowed_extensions do
      :ok
    else
      {:error, "File type not allowed: #{extension}"}
    end
  end

  defp validate_content_type(upload) do
    # Additional validation: check actual file content (magic numbers)
    # This prevents malicious files with fake extensions
    case File.read(upload.path) do
      {:ok, <<0xFF, 0xD8, 0xFF, _::binary>>} -> :ok  # JPEG
      {:ok, <<0x89, 0x50, 0x4E, 0x47, _::binary>>} -> :ok  # PNG
      {:ok, <<0x47, 0x49, 0x46, 0x38, _::binary>>} -> :ok  # GIF
      {:ok, <<0x25, 0x50, 0x44, 0x46, _::binary>>} -> :ok  # PDF
      _ ->
        # For other types, trust content_type from upload
        if upload.content_type in ~w(image/jpeg image/png image/gif application/pdf) do
          :ok
        else
          {:error, "Invalid file content"}
        end
    end
  end

  defp generate_destination(upload) do
    # Generate unique filename to prevent collisions
    timestamp = DateTime.utc_now() |> DateTime.to_unix()
    random = :crypto.strong_rand_bytes(8) |> Base.url_encode64(padding: false)
    extension = Path.extname(upload.filename)

    filename = "#{timestamp}_#{random}#{extension}"
    destination = Path.join(@upload_dir, filename)

    # Ensure upload directory exists
    File.mkdir_p!(@upload_dir)

    {:ok, destination}
  end

  defp save_file(upload, destination) do
    case File.cp(upload.path, destination) do
      :ok -> :ok
      {:error, reason} -> {:error, "Failed to save file: #{inspect(reason)}"}
    end
  end
end

defmodule MyAppWeb.UploadLive do
  use MyAppWeb, :live_view

  @impl true
  def mount(_params, _session, socket) do
    {:ok,
     socket
     |> assign(:uploaded_files, [])
     |> allow_upload(:avatar,
       accept: ~w(.jpg .jpeg .png),
       max_entries: 1,
       max_file_size: 10_000_000,
       auto_upload: true
     )}
  end

  @impl true
  def handle_event("validate", _params, socket) do
    {:noreply, socket}
  end

  @impl true
  def handle_event("save", _params, socket) do
    uploaded_files =
      consume_uploaded_entries(socket, :avatar, fn %{path: path}, entry ->
        dest = Path.join("priv/static/uploads", entry.client_name)
        File.cp!(path, dest)
        {:ok, ~p"/uploads/#{Path.basename(dest)}"}
      end)

    {:noreply,
     socket
     |> update(:uploaded_files, &(&1 ++ uploaded_files))
     |> put_flash(:info, "File uploaded!")}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div>
      <h1>Upload File</h1>

      <.form for={:upload} phx-change="validate" phx-submit="save">
        <.live_file_input upload={@uploads.avatar} />

        <%= for entry <- @uploads.avatar.entries do %>
          <div>
            <.live_img_preview entry={entry} />
            <p><%= entry.progress %>%</p>

            <%= for err <- upload_errors(@uploads.avatar, entry) do %>
              <p class="error"><%= error_to_string(err) %></p>
            <% end %>
          </div>
        <% end %>

        <.button type="submit">Upload</.button>
      </.form>

      <h2>Uploaded Files</h2>
      <%= for file <- @uploaded_files do %>
        <div>
          <img src={file} width="200" />
        </div>
      <% end %>
    </div>
    """
  end

  defp error_to_string(:too_large), do: "File is too large"
  defp error_to_string(:not_accepted), do: "File type not accepted"
  defp error_to_string(:too_many_files), do: "Too many files"
end

<div>
  <h1>Upload File</h1>

  <%= form_for :upload, ~p"/uploads", [multipart: true], fn f -> %>
    <div>
      <%= file_input f, :file, required: true %>
    </div>

    <div>
      <%= submit "Upload" %>
    </div>
  <% end %>
</div>

defmodule MyApp.Accounts.User do
  use Ecto.Schema
  import Ecto.Changeset

  schema "users" do
    field :email, :string
    field :name, :string
    field :avatar_url, :string

    timestamps()
  end

  def changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :name, :avatar_url])
    |> validate_required([:email, :name])
  end

  def avatar_changeset(user, upload_path) do
    change(user, avatar_url: upload_path)
  end
end

How It Works: Phoenix uploads provide Plug.Upload struct. LiveView’s allow_upload/3 handles client-side validation and progress. consume_uploaded_entries/3 processes files. Validation checks size, extension, and content type (magic numbers). Unique filenames prevent collisions.

Use Cases: Profile pictures, document uploads, media sharing, file attachments.

See Also: Phoenix Guide, LiveView Guide


Recipe 44: Email Sending with Swoosh

Problem: Send transactional emails from Phoenix application.

Solution:

defp deps do
  [
    {:swoosh, "~> 1.11"},
    {:gen_smtp, "~> 1.2"},  # For SMTP adapter
    {:finch, "~> 0.13"}     # For HTTP adapters
  ]
end

config :my_app, MyApp.Mailer,
  adapter: Swoosh.Adapters.SMTP,
  relay: "smtp.gmail.com",
  port: 587,
  username: System.get_env("SMTP_USERNAME"),
  password: System.get_env("SMTP_PASSWORD"),
  tls: :always,
  auth: :always

config :swoosh, :api_client, Swoosh.ApiClient.Finch

if Mix.env() == :dev do
  config :my_app, MyApp.Mailer, adapter: Swoosh.Adapters.Local
end

defmodule MyApp.Mailer do
  use Swoosh.Mailer, otp_app: :my_app
end

defmodule MyApp.Emails.UserEmail do
  import Swoosh.Email

  def welcome_email(user) do
    new()
    |> to({user.name, user.email})
    |> from({"My App", "noreply@myapp.com"})
    |> subject("Welcome to My App!")
    |> html_body("<h1>Hello #{user.name}!</h1><p>Welcome to My App.</p>")
    |> text_body("Hello #{user.name}!\n\nWelcome to My App.")
  end

  def password_reset_email(user, reset_token) do
    reset_url = "https://myapp.com/reset-password?token=#{reset_token}"

    new()
    |> to(user.email)
    |> from({"My App", "noreply@myapp.com"})
    |> subject("Reset your password")
    |> html_body("""
      <h1>Password Reset</h1>
      <p>Click the link below to reset your password:</p>
      <p><a href="#{reset_url}">Reset Password</a></p>
      <p>This link expires in 1 hour.</p>
    """)
    |> text_body("""
      Password Reset

      Visit this link to reset your password:
      #{reset_url}

      This link expires in 1 hour.
    """)
  end

  def notification_email(user, notification) do
    new()
    |> to(user.email)
    |> from({"My App Notifications", "notifications@myapp.com"})
    |> subject(notification.title)
    |> html_body(notification.body_html)
    |> text_body(notification.body_text)
    |> put_provider_option(:track_opens, true)
    |> put_provider_option(:track_links, true)
  end

  def attachment_email(user, file_path) do
    new()
    |> to(user.email)
    |> from({"My App", "noreply@myapp.com"})
    |> subject("Your report is ready")
    |> html_body("<p>Please find your report attached.</p>")
    |> attachment(Swoosh.Attachment.new(file_path))
  end
end

defmodule MyApp.Accounts do
  alias MyApp.Mailer
  alias MyApp.Emails.UserEmail

  def register_user(attrs) do
    with {:ok, user} <- create_user(attrs) do
      # Send welcome email asynchronously
      user
      |> UserEmail.welcome_email()
      |> Mailer.deliver()

      {:ok, user}
    end
  end

  def request_password_reset(email) do
    case get_user_by_email(email) do
      nil ->
        # Don't reveal whether email exists
        :ok

      user ->
        reset_token = generate_reset_token(user)

        user
        |> UserEmail.password_reset_email(reset_token)
        |> Mailer.deliver()

        :ok
    end
  end

  defp create_user(_attrs), do: {:ok, %{}}
  defp get_user_by_email(_email), do: nil
  defp generate_reset_token(_user), do: "token123"
end

defmodule MyApp.Workers.EmailWorker do
  use Oban.Worker, queue: :emails, max_attempts: 3

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"email_type" => "welcome", "user_id" => user_id}}) do
    user = MyApp.Accounts.get_user!(user_id)

    user
    |> MyApp.Emails.UserEmail.welcome_email()
    |> MyApp.Mailer.deliver()

    :ok
  end

  def perform(%Oban.Job{args: %{"email_type" => "password_reset", "user_id" => user_id, "token" => token}}) do
    user = MyApp.Accounts.get_user!(user_id)

    user
    |> MyApp.Emails.UserEmail.password_reset_email(token)
    |> MyApp.Mailer.deliver()

    :ok
  end
end

%{email_type: "welcome", user_id: user.id}
|> MyApp.Workers.EmailWorker.new()
|> Oban.insert()

defmodule MyApp.Emails.UserEmailTest do
  use ExUnit.Case, async: true
  import Swoosh.TestAssertions

  alias MyApp.Emails.UserEmail

  test "welcome email" do
    user = %{name: "Alice", email: "alice@example.com"}
    email = UserEmail.welcome_email(user)

    assert email.to == [{"Alice", "alice@example.com"}]
    assert email.from == {"My App", "noreply@myapp.com"}
    assert email.subject == "Welcome to My App!"
    assert email.html_body =~ "Hello Alice!"
  end

  test "sends welcome email" do
    user = %{name: "Bob", email: "bob@example.com"}

    user
    |> UserEmail.welcome_email()
    |> MyApp.Mailer.deliver()

    assert_email_sent subject: "Welcome to My App!"
  end
end

How It Works: Swoosh provides unified email interface. Adapters handle different providers (SMTP, SendGrid, Mailgun). deliver/1 sends emails. HTML and text bodies supported. Attachments via Swoosh.Attachment. Background processing recommended for reliability.

Use Cases: Welcome emails, password resets, notifications, newsletters, transactional receipts.

See Also: GenServer Guide, Intermediate Tutorial - Phoenix


Recipe 45: Scheduled Tasks with Quantum

Problem: Run periodic tasks (cron jobs) in Elixir application.

Solution:

defp deps do
  [
    {:quantum, "~> 3.5"},
    {:timex, "~> 3.7"}
  ]
end

config :my_app, MyApp.Scheduler,
  jobs: [
    # Every minute
    {"* * * * *", {MyApp.Tasks, :cleanup_old_sessions, []}},
    # Every day at 2 AM
    {"0 2 * * *", {MyApp.Tasks, :daily_report, []}},
    # Every Monday at 9 AM
    {"0 9 * * 1", {MyApp.Tasks, :weekly_summary, []}},
    # Every 15 minutes
    {"*/15 * * * *", {MyApp.Tasks, :check_health, []}},
  ]

defmodule MyApp.Scheduler do
  use Quantum, otp_app: :my_app
end

defmodule MyApp.Tasks do
  require Logger

  def cleanup_old_sessions do
    Logger.info("Starting session cleanup")

    # Delete sessions older than 30 days
    cutoff_date = DateTime.utc_now() |> DateTime.add(-30, :day)

    MyApp.Repo.delete_all(
      from s in MyApp.Session,
      where: s.inserted_at < ^cutoff_date
    )

    Logger.info("Session cleanup completed")
  end

  def daily_report do
    Logger.info("Generating daily report")

    # Aggregate data from yesterday
    yesterday = DateTime.utc_now() |> DateTime.add(-1, :day)

    stats = %{
      new_users: count_new_users(yesterday),
      active_users: count_active_users(yesterday),
      revenue: calculate_revenue(yesterday)
    }

    # Send report email
    MyApp.Emails.ReportEmail.daily_report(stats)
    |> MyApp.Mailer.deliver()

    Logger.info("Daily report sent")
  end

  def weekly_summary do
    Logger.info("Generating weekly summary")

    # Aggregate data from last 7 days
    week_ago = DateTime.utc_now() |> DateTime.add(-7, :day)

    summary = generate_weekly_summary(week_ago)

    # Send to all admins
    MyApp.Accounts.list_admins()
    |> Enum.each(fn admin ->
      MyApp.Emails.ReportEmail.weekly_summary(admin, summary)
      |> MyApp.Mailer.deliver()
    end)

    Logger.info("Weekly summary sent")
  end

  def check_health do
    # Check application health
    checks = [
      check_database(),
      check_redis(),
      check_external_api()
    ]

    failed = Enum.filter(checks, &match?({:error, _}, &1))

    if length(failed) > 0 do
      Logger.error("Health check failed: #{inspect(failed)}")
      alert_ops_team(failed)
    end
  end

  defp count_new_users(_date), do: 0
  defp count_active_users(_date), do: 0
  defp calculate_revenue(_date), do: 0
  defp generate_weekly_summary(_date), do: %{}
  defp check_database, do: :ok
  defp check_redis, do: :ok
  defp check_external_api, do: :ok
  defp alert_ops_team(_failures), do: :ok
end

defmodule MyApp.JobManager do
  alias MyApp.Scheduler

  def add_job(name, schedule, task) do
    Scheduler.new_job()
    |> Quantum.Job.set_name(name)
    |> Quantum.Job.set_schedule(Crontab.CronExpression.Parser.parse!(schedule))
    |> Quantum.Job.set_task(task)
    |> Scheduler.add_job()
  end

  def remove_job(name) do
    Scheduler.delete_job(name)
  end

  def list_jobs do
    Scheduler.jobs()
  end

  def pause_job(name) do
    Scheduler.deactivate_job(name)
  end

  def resume_job(name) do
    Scheduler.activate_job(name)
  end
end

MyApp.JobManager.add_job(
  :custom_task,
  "0 3 * * *",  # Every day at 3 AM
  fn -> IO.puts("Custom task running") end
)

MyApp.JobManager.list_jobs()

MyApp.JobManager.pause_job(:cleanup_old_sessions)

MyApp.JobManager.resume_job(:cleanup_old_sessions)

MyApp.JobManager.remove_job(:custom_task)

How It Works: Quantum schedules tasks using cron syntax. Jobs run in separate processes. Quantum.Job defines task configuration. Dynamic job management allows runtime scheduling. Tasks should be idempotent (safe to retry).

Use Cases: Data cleanup, report generation, health checks, cache warming, backup tasks.

See Also: GenServer Guide, Intermediate Tutorial - OTP


Recipe 46: GraphQL API with Absinthe

Problem: Build GraphQL API for flexible client queries.

Solution:

defp deps do
  [
    {:absinthe, "~> 1.7"},
    {:absinthe_plug, "~> 1.5"},
    {:absinthe_phoenix, "~> 2.0"}
  ]
end

defmodule MyAppWeb.Schema do
  use Absinthe.Schema
  import Absinthe.Resolution.Helpers

  # Define types
  object :user do
    field :id, non_null(:id)
    field :email, non_null(:string)
    field :name, :string
    field :posts, list_of(:post) do
      resolve(dataloader(MyApp.Content))
    end
  end

  object :post do
    field :id, non_null(:id)
    field :title, non_null(:string)
    field :body, :string
    field :published, :boolean
    field :author, :user do
      resolve(dataloader(MyApp.Accounts))
    end
    field :inserted_at, :string
  end

  # Input types for mutations
  input_object :user_input do
    field :email, non_null(:string)
    field :name, non_null(:string)
    field :password, non_null(:string)
  end

  input_object :post_input do
    field :title, non_null(:string)
    field :body, non_null(:string)
    field :published, :boolean, default_value: false
  end

  # Queries
  query do
    field :user, :user do
      arg :id, non_null(:id)
      resolve(&MyAppWeb.Resolvers.Accounts.get_user/3)
    end

    field :users, list_of(:user) do
      arg :limit, :integer, default_value: 10
      arg :offset, :integer, default_value: 0
      resolve(&MyAppWeb.Resolvers.Accounts.list_users/3)
    end

    field :post, :post do
      arg :id, non_null(:id)
      resolve(&MyAppWeb.Resolvers.Content.get_post/3)
    end

    field :posts, list_of(:post) do
      arg :published, :boolean
      arg :limit, :integer
      resolve(&MyAppWeb.Resolvers.Content.list_posts/3)
    end
  end

  # Mutations
  mutation do
    field :create_user, :user do
      arg :input, non_null(:user_input)
      resolve(&MyAppWeb.Resolvers.Accounts.create_user/3)
    end

    field :create_post, :post do
      arg :input, non_null(:post_input)
      middleware(MyAppWeb.Middleware.Authenticate)
      resolve(&MyAppWeb.Resolvers.Content.create_post/3)
    end

    field :update_post, :post do
      arg :id, non_null(:id)
      arg :input, non_null(:post_input)
      middleware(MyAppWeb.Middleware.Authenticate)
      resolve(&MyAppWeb.Resolvers.Content.update_post/3)
    end

    field :delete_post, :boolean do
      arg :id, non_null(:id)
      middleware(MyAppWeb.Middleware.Authenticate)
      resolve(&MyAppWeb.Resolvers.Content.delete_post/3)
    end
  end

  # Subscriptions (real-time updates)
  subscription do
    field :post_created, :post do
      config(fn _args, _info ->
        {:ok, topic: "posts"}
      end)

      trigger(:create_post, topic: fn _post ->
        "posts"
      end)
    end
  end

  # Dataloader for efficient N+1 prevention
  def context(ctx) do
    loader =
      Dataloader.new()
      |> Dataloader.add_source(MyApp.Accounts, MyApp.Accounts.data())
      |> Dataloader.add_source(MyApp.Content, MyApp.Content.data())

    Map.put(ctx, :loader, loader)
  end

  def plugins do
    [Absinthe.Middleware.Dataloader] ++ Absinthe.Plugin.defaults()
  end
end

defmodule MyAppWeb.Resolvers.Accounts do
  alias MyApp.Accounts

  def get_user(_parent, %{id: id}, _resolution) do
    case Accounts.get_user(id) do
      nil -> {:error, "User not found"}
      user -> {:ok, user}
    end
  end

  def list_users(_parent, args, _resolution) do
    limit = Map.get(args, :limit, 10)
    offset = Map.get(args, :offset, 0)

    users = Accounts.list_users(limit, offset)
    {:ok, users}
  end

  def create_user(_parent, %{input: input}, _resolution) do
    case Accounts.create_user(input) do
      {:ok, user} -> {:ok, user}
      {:error, changeset} -> {:error, format_changeset_errors(changeset)}
    end
  end

  defp format_changeset_errors(changeset) do
    Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
      Enum.reduce(opts, msg, fn {key, value}, acc ->
        String.replace(acc, "%{#{key}}", to_string(value))
      end)
    end)
  end
end

defmodule MyAppWeb.Middleware.Authenticate do
  @behaviour Absinthe.Middleware

  def call(resolution, _opts) do
    case resolution.context do
      %{current_user: _user} ->
        resolution

      _ ->
        resolution
        |> Absinthe.Resolution.put_result({:error, "Unauthenticated"})
    end
  end
end

scope "/api" do
  pipe_through :api

  forward "/graphql", Absinthe.Plug,
    schema: MyAppWeb.Schema

  if Mix.env() == :dev do
    forward "/graphiql", Absinthe.Plug.GraphiQL,
      schema: MyAppWeb.Schema,
      interface: :playground
  end
end

How It Works: Absinthe provides GraphQL implementation. Schema defines types, queries, mutations, subscriptions. Resolvers fetch data. Dataloader prevents N+1 queries. Middleware handles authentication. Subscriptions enable real-time updates via Phoenix Channels.

Use Cases: Flexible APIs, mobile apps, SPAs, real-time dashboards, data aggregation.

See Also: Phoenix REST API Guide, Intermediate Tutorial - Phoenix


Recipe 47: Dependency Injection for Testing

Problem: Make code testable by injecting dependencies.

Solution:

defmodule MyApp.EmailService do
  @callback send_email(recipient :: String.t(), subject :: String.t(), body :: String.t()) :: :ok | {:error, term()}
end

defmodule MyApp.EmailService.Swoosh do
  @behaviour MyApp.EmailService

  def send_email(recipient, subject, body) do
    import Swoosh.Email

    new()
    |> to(recipient)
    |> from("noreply@myapp.com")
    |> subject(subject)
    |> html_body(body)
    |> MyApp.Mailer.deliver()

    :ok
  end
end

defmodule MyApp.EmailService.Mock do
  @behaviour MyApp.EmailService

  def send_email(recipient, subject, body) do
    # Store in process dictionary for assertion in tests
    emails = Process.get(:sent_emails, [])
    Process.put(:sent_emails, emails ++ [{recipient, subject, body}])
    :ok
  end

  def get_sent_emails do
    Process.get(:sent_emails, [])
  end

  def clear_sent_emails do
    Process.put(:sent_emails, [])
  end
end

config :my_app, :email_service, MyApp.EmailService.Swoosh

config :my_app, :email_service, MyApp.EmailService.Mock

defmodule MyApp.Accounts do
  def email_service do
    Application.get_env(:my_app, :email_service)
  end

  def register_user(attrs) do
    with {:ok, user} <- create_user(attrs) do
      # Use configured implementation
      email_service().send_email(
        user.email,
        "Welcome!",
        "Welcome to My App"
      )

      {:ok, user}
    end
  end

  defp create_user(_attrs), do: {:ok, %{email: "user@example.com"}}
end

defmodule MyApp.AccountsTest do
  use ExUnit.Case, async: true
  alias MyApp.EmailService.Mock

  setup do
    Mock.clear_sent_emails()
    :ok
  end

  test "register_user sends welcome email" do
    {:ok, user} = MyApp.Accounts.register_user(%{email: "test@example.com"})

    emails = Mock.get_sent_emails()
    assert length(emails) == 1
    assert {"test@example.com", "Welcome!", _body} = hd(emails)
  end
end


Mox.defmock(MyApp.EmailServiceMock, for: MyApp.EmailService)

config :my_app, :email_service, MyApp.EmailServiceMock

defmodule MyApp.AccountsTest do
  use ExUnit.Case, async: true
  import Mox

  # Set up Mox expectations
  setup :verify_on_exit!

  test "register_user sends welcome email" do
    MyApp.EmailServiceMock
    |> expect(:send_email, fn recipient, subject, _body ->
      assert recipient == "test@example.com"
      assert subject == "Welcome!"
      :ok
    end)

    {:ok, _user} = MyApp.Accounts.register_user(%{email: "test@example.com"})
  end

  test "register_user handles email failure" do
    MyApp.EmailServiceMock
    |> expect(:send_email, fn _recipient, _subject, _body ->
      {:error, :smtp_error}
    end)

    # Should still create user even if email fails
    {:ok, _user} = MyApp.Accounts.register_user(%{email: "test@example.com"})
  end
end

defmodule MyApp.UserCache do
  use GenServer

  # Accept dependencies in start_link
  def start_link(opts) do
    repo = Keyword.get(opts, :repo, MyApp.Repo)
    cache = Keyword.get(opts, :cache, :ets)

    GenServer.start_link(__MODULE__, %{repo: repo, cache: cache}, name: __MODULE__)
  end

  @impl true
  def init(deps) do
    {:ok, deps}
  end

  @impl true
  def handle_call({:get_user, id}, _from, %{repo: repo, cache: cache} = state) do
    case cache_lookup(cache, id) do
      {:ok, user} ->
        {:reply, {:ok, user}, state}

      :miss ->
        case repo.get(MyApp.User, id) do
          nil ->
            {:reply, {:error, :not_found}, state}

          user ->
            cache_put(cache, id, user)
            {:reply, {:ok, user}, state}
        end
    end
  end

  defp cache_lookup(:ets, id), do: :miss  # Simplified
  defp cache_put(:ets, _id, _user), do: :ok
end

test "UserCache fetches from repo on cache miss" do
  # Create mock implementations
  mock_repo = fn ->
    %{
      get: fn _module, id ->
        if id == 1, do: %MyApp.User{id: 1, name: "Alice"}, else: nil
      end
    }
  end

  mock_cache = :mock_ets

  # Start GenServer with mocks
  {:ok, _pid} = MyApp.UserCache.start_link(repo: mock_repo.(), cache: mock_cache)

  # Test
  assert {:ok, user} = GenServer.call(MyApp.UserCache, {:get_user, 1})
  assert user.name == "Alice"
end

How It Works: Behaviors define contracts. Configuration selects implementation. Mox provides compile-time mock generation. Process dictionary stores test state. GenServer accepts dependencies in start_link. Tests inject mock implementations.

Use Cases: Unit testing, integration testing, external service mocking, test isolation.

See Also: Testing Guide, GenServer Guide, Beginner Tutorial - Testing


Recipe 48: Distributed Systems with libcluster

Problem: Build distributed Elixir cluster for high availability.

Solution:

defp deps do
  [
    {:libcluster, "~> 3.3"}
  ]
end

config :libcluster,
  topologies: [
    local: [
      strategy: Cluster.Strategy.Gossip,
      config: [
        port: 45892,
        if_addr: "0.0.0.0",
        multicast_addr: "230.1.1.251",
        multicast_ttl: 1,
        secret: "my_secret"
      ]
    ]
  ]

config :libcluster,
  topologies: [
    k8s: [
      strategy: Elixir.Cluster.Strategy.Kubernetes,
      config: [
        mode: :dns,
        kubernetes_node_basename: "myapp",
        kubernetes_selector: "app=myapp",
        kubernetes_namespace: "production",
        polling_interval: 10_000
      ]
    ]
  ]

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    topologies = Application.get_env(:libcluster, :topologies, [])

    children = [
      # Start libcluster supervisor
      {Cluster.Supervisor, [topologies, [name: MyApp.ClusterSupervisor]]},

      # Start your application
      MyApp.Repo,
      MyAppWeb.Endpoint,

      # Distributed registry
      {Registry, keys: :unique, name: MyApp.Registry, partitions: System.schedulers_online()},

      # Distributed supervisor
      {Horde.Registry, [name: MyApp.HordeRegistry, keys: :unique]},
      {Horde.DynamicSupervisor, [name: MyApp.HordeSupervisor, strategy: :one_for_one]}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

defmodule MyApp.DistributedCache do
  use GenServer

  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, %{}, name: {:global, name})
  end

  def get(key) do
    case :global.whereis_name(__MODULE__) do
      :undefined -> {:error, :not_found}
      pid -> GenServer.call(pid, {:get, key})
    end
  end

  def put(key, value) do
    case :global.whereis_name(__MODULE__) do
      :undefined -> {:error, :not_found}
      pid -> GenServer.call(pid, {:put, key, value})
    end
  end

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end

  @impl true
  def handle_call({:put, key, value}, _from, state) do
    {:reply, :ok, Map.put(state, key, value)}
  end
end

defmodule MyApp.DistributedTasks do
  # Execute function on all nodes
  def run_on_all_nodes(func) do
    nodes = [Node.self() | Node.list()]

    tasks = Enum.map(nodes, fn node ->
      Task.Supervisor.async({MyApp.TaskSupervisor, node}, func)
    end)

    Task.await_many(tasks, 5000)
  end

  # Execute function on specific node
  def run_on_node(node, func) do
    Task.Supervisor.async({MyApp.TaskSupervisor, node}, func)
    |> Task.await(5000)
  end

  # Execute function on least loaded node
  def run_on_least_loaded_node(func) do
    node = find_least_loaded_node()
    run_on_node(node, func)
  end

  defp find_least_loaded_node do
    nodes = [Node.self() | Node.list()]

    nodes
    |> Enum.map(fn node ->
      load = :rpc.call(node, :cpu_sup, :avg1, []) / 256
      {node, load}
    end)
    |> Enum.min_by(fn {_node, load} -> load end)
    |> elem(0)
  end
end

defmodule MyApp.Events do
  def broadcast(topic, message) do
    Phoenix.PubSub.broadcast(MyApp.PubSub, topic, message)
  end

  def subscribe(topic) do
    Phoenix.PubSub.subscribe(MyApp.PubSub, topic)
  end

  # Broadcast to all nodes
  def broadcast_to_cluster(event, data) do
    nodes = [Node.self() | Node.list()]

    Enum.each(nodes, fn node ->
      :rpc.cast(node, __MODULE__, :local_broadcast, [event, data])
    end)
  end

  def local_broadcast(event, data) do
    broadcast("cluster:events", {event, data})
  end
end

defmodule MyApp.HordeWorker do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  def start_distributed(id, args) do
    # Horde ensures only one instance runs across cluster
    child_spec = %{
      id: id,
      start: {__MODULE__, :start_link, [args]},
      restart: :transient
    }

    Horde.DynamicSupervisor.start_child(MyApp.HordeSupervisor, child_spec)
  end

  @impl true
  def init(args) do
    # Register with Horde Registry
    {:ok, args}
  end
end

defmodule MyApp.ClusterUtils do
  def list_nodes do
    Node.list()
  end

  def connect_node(node_name) do
    Node.connect(node_name)
  end

  def disconnect_node(node_name) do
    Node.disconnect(node_name)
  end

  def cluster_size do
    length([Node.self() | Node.list()])
  end

  def is_clustered? do
    cluster_size() > 1
  end

  # Get cluster-wide statistics
  def cluster_stats do
    nodes = [Node.self() | Node.list()]

    Enum.map(nodes, fn node ->
      stats = :rpc.call(node, :erlang, :statistics, [:wall_clock])
      memory = :rpc.call(node, :erlang, :memory, [])

      %{
        node: node,
        uptime_ms: elem(stats, 0),
        memory: memory,
        process_count: :rpc.call(node, :erlang, :system_info, [:process_count])
      }
    end)
  end
end


MyApp.ClusterUtils.list_nodes()

MyApp.ClusterUtils.cluster_size()

MyApp.DistributedTasks.run_on_all_nodes(fn ->
  IO.puts("Running on #{Node.self()}")
end)

MyApp.Events.broadcast_to_cluster(:deployment, %{version: "1.0.0"})

MyApp.ClusterUtils.cluster_stats()

How It Works: libcluster automatically discovers and connects nodes using various strategies (Gossip, Kubernetes, DNS, static). Horde provides distributed process registry and supervision. Global registry ensures single process instance across cluster. Phoenix.PubSub broadcasts messages to all nodes. RPC executes functions on remote nodes.

Use Cases: High availability systems, horizontal scaling, distributed caching, multi-region deployment, load balancing.

See Also: GenServer Guide, Advanced Tutorial - Distributed Systems, Intermediate Tutorial - OTP


Summary

This cookbook provides 48 comprehensive tested recipes across 17 categories:

  1. Data Structures (5 recipes): Map transformations, deep merging, grouping, keyword lists, nested updates
  2. Pattern Matching (3 recipes): Complex extraction, case matching with guards, pin operator patterns
  3. Functions and Pipes (3 recipes): Function composition, higher-order functions, capture syntax
  4. String Manipulation (2 recipes): Parsing with regex, validation and sanitization
  5. Concurrency (4 recipes): Parallel processing with Task, process registry patterns, rate limiting (token bucket), debounce/throttle
  6. OTP Patterns (5 recipes): State machines with GenServer, dynamic supervision trees, periodic tasks, background job processing, scheduled tasks with Quantum
  7. Error Handling (2 recipes): Result tuple patterns, supervision for fault tolerance
  8. File I/O (2 recipes): Safe file operations, CSV processing and parsing
  9. Phoenix Web Development (6 recipes): REST API endpoints, authentication with Plugs, WebSocket communication (Channels), session-based auth, API versioning strategies, file uploads
  10. LiveView Real-Time (2 recipes): Real-time forms with validation, PubSub live updates
  11. Ecto Database (3 recipes): Complex queries with joins, transactions and rollbacks, database migrations (schema and data)
  12. Testing (3 recipes): GenServer behavior testing, mocking external services, dependency injection patterns
  13. Performance (2 recipes): Profiling and benchmarking with Benchee, database query optimization
  14. Configuration (1 recipe): Environment-based configuration management
  15. Debugging (1 recipe): Debug techniques and tools (IO.inspect, IEx.pry, Logger, Observer, Recon)
  16. Advanced Phoenix (3 recipes): Internationalization (i18n) with Gettext, email sending with Swoosh, GraphQL API with Absinthe
  17. Distributed Systems (1 recipe): Clustering with libcluster, distributed processes with Horde

All recipes include:

  • Clear problem statement
  • Complete working solution
  • Explanation of how it works
  • Real-world use cases
  • Cross-references to related content

Next Steps:

Last updated