Ecto
Need a robust database layer in Elixir? Ecto provides schemas, changesets, composable queries, associations, migrations, and transactions for reliable data persistence.
Prerequisites
- Database concepts (SQL)
- Phoenix or standalone Elixir app
- PostgreSQL installed
- Completed Intermediate Tutorial
Problem
Building data-driven applications requires schema definition, data validation, complex queries with joins and aggregations, relationship management, database migrations, and transaction handling. You need type-safe database operations, efficient query composition, and proper data validation.
Challenges:
- Defining schemas and relationships between tables
- Validating and casting user input safely
- Writing complex queries with joins and aggregations
- Managing database migrations and schema changes
- Handling transactions and concurrent updates
- Optimizing N+1 queries and performance
Solution
Use Ecto for type-safe database operations with schemas for structure, changesets for validation, composable queries, associations for relationships, and migrations for schema versioning.
How It Works
1. Basic Schema
defmodule MyApp.Accounts.User do
  @moduledoc """
  Ecto schema and changeset for rows in the `users` table.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Fields accepted from external input, and the subset that must be present.
  @permitted_fields ~w(name email age bio active)a
  @required_fields ~w(name email)a

  schema "users" do
    field :name, :string
    field :email, :string
    field :age, :integer
    field :bio, :string
    field :active, :boolean, default: true

    timestamps()
  end

  @doc """
  Builds a changeset for creating or updating a user.

  Casts the permitted fields from `attrs`, requires `:name` and `:email`,
  checks that the email contains `@`, that `:age` is between 1 and 149,
  and that `:bio` is at most 500 characters. The unique constraint on
  `:email` is checked by the database when the changeset is persisted.
  """
  def changeset(user, attrs) do
    user
    |> cast(attrs, @permitted_fields)
    |> validate_required(@required_fields)
    |> validate_format(:email, ~r/@/)
    |> validate_number(:age, greater_than: 0, less_than: 150)
    |> validate_length(:bio, max: 500)
    |> unique_constraint(:email)
  end
end
Migration:
defmodule MyApp.Repo.Migrations.CreateUsers do
  @moduledoc false

  use Ecto.Migration

  # Creates the `users` table and a unique index on `email`.
  # The index is what backs `unique_constraint(:email)` in the changeset.
  def change do
    create table(:users) do
      add :name, :string, null: false
      add :email, :string, null: false
      add :age, :integer
      add :bio, :text
      add :active, :boolean, default: true, null: false

      timestamps()
    end

    create unique_index(:users, [:email])
  end
end
2. CRUD Operations
defmodule MyApp.Accounts do
  @moduledoc """
  Context module exposing CRUD operations for `MyApp.Accounts.User`.
  """

  # No function below uses the query macros yet; `warn: false` keeps the
  # import available for later additions without an "unused import" warning.
  import Ecto.Query, warn: false

  alias MyApp.Accounts.User
  alias MyApp.Repo

  @doc """
  Returns the list of all users.
  """
  def list_users, do: Repo.all(User)

  @doc """
  Gets a single user by primary key.

  Raises `Ecto.NoResultsError` if the User does not exist.
  """
  def get_user!(id), do: Repo.get!(User, id)

  @doc """
  Gets a single user by primary key, returning `nil` if not found.
  """
  def get_user(id), do: Repo.get(User, id)

  @doc """
  Gets a user by email, returning `nil` if not found.
  """
  def get_user_by_email(email), do: Repo.get_by(User, email: email)

  @doc """
  Creates a user from `attrs`.

  Returns `{:ok, user}` or `{:error, changeset}`.
  """
  def create_user(attrs \\ %{}) do
    %User{}
    |> User.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Updates `user` with `attrs`.

  Returns `{:ok, user}` or `{:error, changeset}`.
  """
  def update_user(%User{} = user, attrs) do
    user
    |> User.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Deletes `user`.
  """
  def delete_user(%User{} = user), do: Repo.delete(user)

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking user changes.
  """
  def change_user(%User{} = user, attrs \\ %{}), do: User.changeset(user, attrs)
end
3. Advanced Queries
Basic queries:
import Ecto.Query
# Every row of the schema
Repo.all(User)
# Filtering
Repo.all(from u in User, where: u.age > 18)
# Ordering
Repo.all(from u in User, order_by: [asc: u.name])
# Limiting the number of rows
Repo.all(from u in User, limit: 10)
# Selecting specific fields as a tuple instead of whole structs
Repo.all(from u in User, select: {u.id, u.name})
Repo.aggregate(User, :count)
Composed queries:
# Lists active users whose age is at least `min_age`, most recently inserted first.
def list_active_users_by_age(min_age) do
  query =
    from u in User,
      where: u.active == true,
      where: u.age >= ^min_age,
      order_by: [desc: u.inserted_at]

  Repo.all(query)
end
# Searches users by the optional criteria `:name` (case-insensitive substring),
# `:min_age` (inclusive lower bound) and `:active` (boolean flag).
# A criterion that is absent is not applied.
def search_users(criteria) do
  User
  |> filter_by_name(criteria[:name])
  |> filter_by_min_age(criteria[:min_age])
  |> filter_by_active(criteria[:active])
  |> Repo.all()
end

defp filter_by_name(query, name) when name in [nil, false], do: query

defp filter_by_name(query, name) do
  from u in query, where: ilike(u.name, ^"%#{name}%")
end

defp filter_by_min_age(query, min_age) when min_age in [nil, false], do: query

defp filter_by_min_age(query, min_age) do
  from u in query, where: u.age >= ^min_age
end

# Only `nil` means "no filter": `active: false` is a legitimate request for
# inactive users and must not be treated as an absent criterion.
defp filter_by_active(query, nil), do: query

defp filter_by_active(query, active) do
  from u in query, where: u.active == ^active
end
Aggregations and grouping:
# Average age across all users
Repo.one(from u in User, select: avg(u.age))

# Number of users per value of the `active` flag
Repo.all(
  from u in User,
    group_by: u.active,
    select: {u.active, count(u.id)}
)

# Number of users per age bucket. The column is passed to the fragment through
# `?` so that Ecto qualifies it with the table binding; a bare `age` inside the
# SQL string becomes ambiguous as soon as the query joins another table that
# also has an `age` column.
Repo.all(
  from u in User,
    group_by: fragment("CASE WHEN ? < 30 THEN '< 30' ELSE '>= 30' END", u.age),
    select: {fragment("CASE WHEN ? < 30 THEN '< 30' ELSE '>= 30' END", u.age), count(u.id)}
)
4. Associations
One-to-many:
defmodule MyApp.Accounts.User do
  @moduledoc """
  User schema: the "one" side of the user/posts one-to-many relationship.
  """

  use Ecto.Schema

  schema "users" do
    field :name, :string

    # One user owns many posts.
    has_many :posts, MyApp.Blog.Post

    timestamps()
  end
end
defmodule MyApp.Blog.Post do
  @moduledoc """
  Post schema: the "many" side of the user/posts one-to-many relationship.
  """

  use Ecto.Schema

  schema "posts" do
    field :title, :string

    # `:text` is a migration column type, not an Ecto schema type; a schema
    # field backed by a `:text` column is declared as `:string`.
    field :body, :string

    # Adds a `user_id` field and a `user` association.
    belongs_to :user, MyApp.Accounts.User

    timestamps()
  end
end
defmodule MyApp.Repo.Migrations.CreatePosts do
  @moduledoc false

  use Ecto.Migration

  # Creates the `posts` table with a required foreign key to `users`,
  # plus an index on that foreign key.
  def change do
    create table(:posts) do
      add :title, :string, null: false
      add :body, :text
      # Deleting a user also deletes that user's posts.
      add :user_id, references(:users, on_delete: :delete_all), null: false

      timestamps()
    end

    create index(:posts, [:user_id])
  end
end
# Load a user together with its posts.
user =
  User
  |> Repo.get!(1)
  |> Repo.preload(:posts)

user.posts # List of posts

# Same, but the preloaded posts are ordered newest first via a custom query.
user =
  User
  |> Repo.get!(1)
  |> Repo.preload(posts: from(p in Post, order_by: [desc: p.inserted_at]))
user = Repo.get!(User, 1) |> Repo.preload([:posts, :comments])
Many-to-many:
defmodule MyApp.Blog.Post do
  @moduledoc """
  Post schema with a many-to-many link to tags.
  """

  use Ecto.Schema

  schema "posts" do
    field :title, :string

    # Linked through the schemaless `posts_tags` join table.
    many_to_many :tags, MyApp.Blog.Tag, join_through: "posts_tags"

    timestamps()
  end
end
defmodule MyApp.Blog.Tag do
  @moduledoc """
  Tag schema with a many-to-many link to posts.
  """

  use Ecto.Schema

  schema "tags" do
    field :name, :string

    # Linked through the schemaless `posts_tags` join table.
    many_to_many :posts, MyApp.Blog.Post, join_through: "posts_tags"

    timestamps()
  end
end
defmodule MyApp.Repo.Migrations.CreatePostsTags do
  @moduledoc false

  use Ecto.Migration

  # Creates the `tags` table and the `posts_tags` join table.
  def change do
    create table(:tags) do
      add :name, :string, null: false

      timestamps()
    end

    create unique_index(:tags, [:name])

    # Join table: rows disappear when either the post or the tag is deleted.
    create table(:posts_tags) do
      add :post_id, references(:posts, on_delete: :delete_all), null: false
      add :tag_id, references(:tags, on_delete: :delete_all), null: false
    end

    create index(:posts_tags, [:post_id])
    create index(:posts_tags, [:tag_id])
    # A given tag can be attached to a given post only once.
    create unique_index(:posts_tags, [:post_id, :tag_id])
  end
end
# Load the post with its current tags.
post =
  Post
  |> Repo.get!(1)
  |> Repo.preload(:tags)

# Fetch the tags that should be attached to the post.
tag_ids = [1, 2, 3]
tags = Repo.all(from t in Tag, where: t.id in ^tag_ids)

# `put_assoc/3` replaces the whole association, so the existing tags must be
# loaded before building the changeset.
changeset =
  post
  |> Repo.preload(:tags)
  |> Ecto.Changeset.change()
  |> Ecto.Changeset.put_assoc(:tags, tags)
Repo.update!(changeset)
5. Advanced Changesets
Multiple changesets:
defmodule User do
  @moduledoc """
  Purpose-specific changesets for a user: registration, profile update and
  password change.

  NOTE(review): the schema itself (including the `:password` and
  `:password_hash` fields) is not shown in this excerpt and is assumed to be
  declared alongside these functions.
  """

  # Brings cast/3, the validate_* functions, unique_constraint/2 and
  # put_change/3 into scope; without it none of the calls below resolve.
  import Ecto.Changeset

  @doc """
  Changeset for registering a new user.

  Requires `:name`, `:email` and a `:password` of at least 8 characters, and
  stores the hashed password when the changeset is valid.
  """
  def registration_changeset(user, attrs) do
    user
    |> cast(attrs, [:name, :email, :password])
    |> validate_required([:name, :email, :password])
    |> validate_format(:email, ~r/@/)
    |> validate_length(:password, min: 8)
    |> unique_constraint(:email)
    |> hash_password()
  end

  @doc """
  Changeset for updating profile data; only `:name` and `:bio` can change.
  """
  def update_changeset(user, attrs) do
    user
    |> cast(attrs, [:name, :bio])
    |> validate_required([:name])
    |> validate_length(:bio, max: 500)
  end

  @doc """
  Changeset for changing the password only.
  """
  def password_changeset(user, attrs) do
    user
    |> cast(attrs, [:password])
    |> validate_required([:password])
    |> validate_length(:password, min: 8)
    |> hash_password()
  end

  # Hashes the password only when the changeset is valid and actually carries a
  # new password; any other changeset passes through untouched.
  defp hash_password(%Ecto.Changeset{valid?: true, changes: %{password: password}} = changeset) do
    put_change(changeset, :password_hash, Bcrypt.hash_pwd_salt(password))
  end

  defp hash_password(changeset), do: changeset
end
Custom validations:
defmodule User do
  @moduledoc """
  Example of custom changeset validations built on `validate_change/3`.

  NOTE(review): the schema (including `:website`) is not shown in this excerpt
  and is assumed to be declared alongside these functions.
  """

  # Brings cast/3, validate_required/2 and validate_change/3 into scope.
  import Ecto.Changeset

  @doc """
  Casts user attributes and checks that `:website` is an http(s) URL and that
  `:age` is at least 18.
  """
  def changeset(user, attrs) do
    user
    |> cast(attrs, [:name, :email, :age, :website])
    |> validate_required([:name, :email])
    |> validate_url(:website)
    |> validate_adult(:age)
  end

  # Accepts only http/https URLs that carry a non-empty host. The empty-string
  # check guards against inputs such as "http://" being accepted because the
  # parsed host is not nil.
  defp validate_url(changeset, field) do
    validate_change(changeset, field, fn _field, value ->
      case URI.parse(value) do
        %URI{scheme: scheme, host: host}
        when scheme in ["http", "https"] and is_binary(host) and host != "" ->
          []

        _ ->
          [{field, "must be a valid URL"}]
      end
    end)
  end

  # Adds an error when the changed age is below 18.
  defp validate_adult(changeset, field) do
    validate_change(changeset, field, fn _field, age ->
      if age >= 18 do
        []
      else
        [{field, "must be 18 or older"}]
      end
    end)
  end
end
6. Transactions
Basic transaction:
# Moves `amount` from one account to the other inside a single transaction.
# Returns `{:ok, :ok}` on success or `{:error, reason}` after a rollback.
def transfer_money(from_account, to_account, amount) do
  Repo.transaction(fn -> move_funds(from_account, to_account, amount) end)
end

# Runs inside the transaction: the first failing step rolls everything back.
defp move_funds(source, target, amount) do
  with {:ok, _withdrawal} <- withdraw(source, amount),
       {:ok, _deposit} <- deposit(target, amount) do
    :ok
  else
    {:error, reason} -> Repo.rollback(reason)
  end
end
Multi transactions:
alias Ecto.Multi
# Inserts a user and its profile atomically.
# Returns `{:ok, %{user: user, profile: profile}}` or
# `{:error, failed_step, failed_value, changes_so_far}`.
def create_user_with_profile(user_attrs, profile_attrs) do
  Multi.new()
  |> Multi.insert(:user, User.changeset(%User{}, user_attrs))
  |> Multi.insert(:profile, fn %{user: user} ->
    # The foreign key is set on the struct instead of being merged into the
    # attrs: putting an atom key into string-keyed params (e.g. from a web
    # form) produces a mixed-key map, which `cast/3` rejects with
    # Ecto.CastError. Any caller-supplied user_id is dropped so it cannot
    # override the freshly inserted user's id.
    # NOTE(review): assumes the Profile schema has a `user_id` field, as the
    # previous Map.put-based version did.
    safe_attrs = Map.drop(profile_attrs, [:user_id, "user_id"])
    Profile.changeset(%Profile{user_id: user.id}, safe_attrs)
  end)
  |> Repo.transaction()
end
Complex multi:
# Creates an order, checks inventory, bulk-inserts the line items and updates
# the inventory, all in one transaction. Any failing step rolls back the rest.
def process_order(order_attrs, line_items) do
  # One timestamp for the whole batch instead of two clock reads per row.
  # NOTE(review): the value's type and precision must match the timestamps
  # type declared on OrderLineItem, which is not visible here.
  now = DateTime.utc_now()

  Multi.new()
  |> Multi.insert(:order, Order.changeset(%Order{}, order_attrs))
  |> Multi.run(:validate_inventory, fn _repo, _changes ->
    if sufficient_inventory?(line_items) do
      {:ok, :validated}
    else
      {:error, :insufficient_inventory}
    end
  end)
  |> Multi.insert_all(
    :line_items,
    OrderLineItem,
    fn %{order: order} ->
      Enum.map(line_items, fn item ->
        %{
          order_id: order.id,
          product_id: item.product_id,
          quantity: item.quantity,
          inserted_at: now,
          updated_at: now
        }
      end)
    end,
    # Without `returning`, insert_all yields `{count, nil}` and the next step
    # would receive `nil` instead of the inserted rows.
    returning: true
  )
  |> Multi.run(:update_inventory, fn _repo, %{line_items: {_count, items}} ->
    update_inventory(items)
  end)
  |> Repo.transaction()
end
7. Embedded Schemas
defmodule MyApp.Accounts.Address do
  @moduledoc """
  Embedded schema for a postal address; it has no table of its own.
  """

  use Ecto.Schema
  import Ecto.Changeset

  @fields ~w(street city state zip country)a
  @required ~w(street city country)a

  embedded_schema do
    field :street, :string
    field :city, :string
    field :state, :string
    field :zip, :string
    field :country, :string
  end

  @doc """
  Casts the address fields and requires `:street`, `:city` and `:country`.
  """
  def changeset(address, attrs) do
    address
    |> cast(attrs, @fields)
    |> validate_required(@required)
  end
end
defmodule MyApp.Accounts.User do
  @moduledoc """
  User schema that stores its address as an embedded document.
  """

  use Ecto.Schema
  import Ecto.Changeset

  schema "users" do
    field :name, :string

    # Fully qualified on purpose: a bare `Address` here would resolve to a
    # top-level `Address` module, not to `MyApp.Accounts.Address`.
    embeds_one :address, MyApp.Accounts.Address

    timestamps()
  end

  @doc """
  Casts `:name` and the required embedded `:address`.

  The embed is cast through the embedded schema's own `changeset/2`.
  """
  def changeset(user, attrs) do
    user
    |> cast(attrs, [:name])
    |> cast_embed(:address, required: true)
  end
end
# Attributes for a user with a nested address map for the embed.
address_attrs = %{street: "123 Main St", city: "New York", country: "USA"}
attrs = %{name: "Alice", address: address_attrs}

# Build the changeset first, then insert it; the match fails loudly on error.
changeset = User.changeset(%User{}, attrs)
{:ok, user} = Repo.insert(changeset)
Variations
Optimistic Locking
defmodule Post do
  @moduledoc """
  Post schema protected against lost updates with optimistic locking.
  """

  # `schema/2` and the changeset functions below are only available with these.
  use Ecto.Schema
  import Ecto.Changeset

  schema "posts" do
    field :title, :string
    # Version counter compared and incremented on every update.
    field :lock_version, :integer, default: 1

    timestamps()
  end

  @doc """
  Casts and requires `:title`, and arms the optimistic lock on `:lock_version`
  so that an update based on an outdated version is detected as stale.
  """
  def changeset(post, attrs) do
    post
    |> cast(attrs, [:title])
    |> validate_required([:title])
    |> optimistic_lock(:lock_version)
  end
end
# By default a stale update raises Ecto.StaleEntryError instead of returning an
# error tuple. Passing `:stale_error_field` makes Repo.update/2 return
# `{:error, changeset}` with an error on that field, which is what the
# `:lock_version` check below relies on.
case Repo.update(Post.changeset(post, %{title: "New Title"}), stale_error_field: :lock_version) do
  {:ok, updated_post} ->
    {:ok, updated_post}

  {:error, changeset} ->
    if Keyword.has_key?(changeset.errors, :lock_version) do
      {:error, :stale_object}
    else
      {:error, changeset}
    end
end
Soft Deletes
defmodule User do
  @moduledoc """
  User schema with a `deleted_at` marker used for soft deletes.
  """

  # `schema/2` is only available after `use Ecto.Schema`.
  use Ecto.Schema

  schema "users" do
    field :name, :string
    # nil means the user is live; a timestamp marks it as soft-deleted.
    field :deleted_at, :utc_datetime

    timestamps()
  end
end
defmodule MyApp.Accounts do
  @moduledoc """
  Soft-delete helpers: mark users as deleted and list the ones that are not.

  NOTE(review): `User` and `Repo` are referenced unqualified exactly as in the
  original excerpt; alias them to the application's modules as appropriate.
  """

  # `from/2` below is a macro from Ecto.Query and must be imported.
  import Ecto.Query, only: [from: 2]

  @doc """
  Marks `user` as deleted by stamping `deleted_at` with the current UTC time.

  The timestamp is truncated to whole seconds because a `:utc_datetime` field
  rejects values that carry microseconds, and `Ecto.Changeset.change/2` does
  not cast the value for us.
  """
  def soft_delete(user) do
    deleted_at = DateTime.truncate(DateTime.utc_now(), :second)

    user
    |> Ecto.Changeset.change(deleted_at: deleted_at)
    |> Repo.update()
  end

  @doc """
  Lists the users that have not been soft-deleted.
  """
  def list_active_users do
    from(u in User, where: is_nil(u.deleted_at))
    |> Repo.all()
  end
end
Polymorphic Associations
defmodule Comment do
  @moduledoc """
  Comment that can belong to any record, identified by the
  `commentable_id` / `commentable_type` pair.
  """

  use Ecto.Schema
  import Ecto.Changeset

  schema "comments" do
    # `:text` is a migration column type, not an Ecto schema type; a schema
    # field backed by a `:text` column is declared as `:string`.
    field :body, :string
    field :commentable_id, :integer
    field :commentable_type, :string

    timestamps()
  end

  @doc """
  Casts the comment fields and requires all three of them.

  The owner fields count as present whether they arrive in `attrs` or are
  already set on the struct.
  """
  def changeset(comment, attrs) do
    comment
    |> cast(attrs, [:body, :commentable_id, :commentable_type])
    |> validate_required([:body, :commentable_id, :commentable_type])
  end
end
# Migration body for the polymorphic comments table.
create table(:comments) do
  add :body, :text, null: false
  # No foreign key constraint here: the id can point at rows of different
  # tables, selected by `commentable_type`.
  add :commentable_id, :integer, null: false
  add :commentable_type, :string, null: false

  timestamps()
end

# Comments are looked up by the (id, type) pair.
create index(:comments, [:commentable_id, :commentable_type])
# Creates a comment attached to `commentable` (any struct with an `id`).
# The type is the last segment of the struct's module name, e.g. "Post".
def create_comment(commentable, attrs) do
  commentable_type = commentable.__struct__ |> Module.split() |> List.last()

  # The owner is set on the struct rather than merged into the attrs: putting
  # atom keys into string-keyed params produces a mixed-key map, which `cast/3`
  # rejects with Ecto.CastError. Caller-supplied owner keys are dropped so
  # external input cannot re-point the comment at another record.
  safe_attrs =
    Map.drop(attrs, [:commentable_id, :commentable_type, "commentable_id", "commentable_type"])

  %Comment{commentable_id: commentable.id, commentable_type: commentable_type}
  |> Comment.changeset(safe_attrs)
  |> Repo.insert()
end
Advanced Patterns
1. N+1 Query Prevention
Problem:
# Anti-pattern: one query for the users, then one more query per user.
users = Repo.all(User)

Enum.each(users, fn user ->
  user_posts = Repo.all(from p in Post, where: p.user_id == ^user.id)
  IO.inspect(user_posts)
end)
Solution:
# Preloading fetches the posts of all users at once instead of per user.
users = Repo.preload(Repo.all(User), :posts)

Enum.each(users, &IO.inspect(&1.posts))
2. Dynamic Queries
# Builds a query from a list of `{filter, value}` pairs and runs it.
# Unknown filters are ignored.
def search(filters) do
  filters
  |> Enum.reduce(from(u in User), &apply_filter/2)
  |> Repo.all()
end

# One clause per supported filter; each narrows the query built so far.
defp apply_filter({:name, name}, query) do
  from u in query, where: ilike(u.name, ^"%#{name}%")
end

defp apply_filter({:min_age, age}, query) do
  from u in query, where: u.age >= ^age
end

defp apply_filter({:status, status}, query) do
  from u in query, where: u.status == ^status
end

defp apply_filter(_unknown, query), do: query
search(name: "John", min_age: 18, status: :active)
3. Batch Operations
# Repo.insert_all/3 bypasses changesets and does not autogenerate timestamps,
# so `inserted_at` / `updated_at` are supplied explicitly; the `users` columns
# are NOT NULL. The default `timestamps()` type is `:naive_datetime`, which
# takes a NaiveDateTime without microseconds.
now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

users =
  Enum.map(
    [
      %{name: "Alice", email: "alice@example.com"},
      %{name: "Bob", email: "bob@example.com"}
    ],
    &Map.merge(&1, %{inserted_at: now, updated_at: now})
  )

# Bulk insert in a single statement; `count` is the number of inserted rows.
{count, _} = Repo.insert_all(User, users)

# Bulk update in a single statement.
# NOTE(review): assumes the schema declares a `status` field accepting this value.
from(u in User, where: u.age < 18)
|> Repo.update_all(set: [status: :minor])

# Bulk delete in a single statement.
from(u in User, where: is_nil(u.email))
|> Repo.delete_all()
Use Cases
Data Persistence:
- User accounts and profiles
- Blog posts and comments
- E-commerce products and orders
- Analytics and metrics
Complex Queries:
- Reporting and dashboards
- Search functionality
- Data aggregation
- Historical data analysis
Data Integrity:
- Transactional operations
- Concurrent updates
- Referential integrity
- Data validation
Best Practices
Use changesets for all data changes: Never insert/update raw data without validation
Preload associations: Avoid N+1 queries by using Repo.preload/2
Use transactions for related operations: Ensure data consistency with Repo.transaction/2 or Multi
Index foreign keys: Always add indexes to foreign key columns
Validate at database level: Use constraints in migrations for critical validations
Use specific changesets: Different operations (create, update, delete) need different validations
Common Pitfalls
- N+1 queries: Not preloading associations
- Missing indexes: Slow queries on large datasets
- Not using transactions: Inconsistent data states
- Overfetching: Loading entire records when only IDs needed
- Ignoring unique constraints: Race conditions on unique fields
- No pagination: Loading all records at once
Troubleshooting
Association Not Loaded
# Before preloading, the association field holds a "not loaded" placeholder
# rather than the list of posts.
user.posts
# Load the association explicitly.
user = Repo.preload(user, :posts)
user.posts
Unique Constraint Not Working
# 1. In a migration: the database-level unique index must exist.
create unique_index(:users, [:email])

# 2. In the schema's changeset pipeline: turn a violation of that index into a
#    changeset error instead of a raised exception.
changeset
|> unique_constraint(:email)
Slow Queries
# In the application config: log at :debug level.
# NOTE(review): presumably this is meant to surface the executed SQL in the logs — confirm.
config :logger, level: :debug
# In a migration: index the columns used for filtering and ordering.
create index(:posts, [:user_id])
create index(:posts, [:inserted_at])