rrrene* About

Advanced Techniques for Architecting Flow in Elixir

In the last post we explored how we can use either |> or with to model how data flows through our program.

There is, however, a third concept to model flow in your applications: to hand down a “token” during the execution of your program. This token contains all the information necessary for your program to fulfil its use-case.

In Elixir, this token is usually a struct. Let’s look at two examples.

Plug.Conn

The most famous example for this in the Elixir ecosystem can be found in Plug.

A “plug” is basically a function that takes a Plug.Conn struct as a first argument and returns a (modified) Plug.Conn struct. Each web request is processed by a Plug pipeline, a series of plugs that get invoked one after another. The Plug.Conn struct contains all information received in the web request and all information to be sent in the server’s response.

defmodule MyPlugPipeline do
  use Plug.Builder

  # You can plug modules, which implement the Plug behaviour
  plug Plug.Logger

  # You can plug local functions, which implement the Plug behaviour
  plug :hello, my_param: 42

  def hello(conn, opts) do
    if opts[:my_param] == 42 do
      send_resp(conn, 200, "The answer to all questions!")
    else
      send_resp(conn, 200, "Options are optional!")
    end
  end

  # You can even plug functions from other modules,
  # as long as they are imported into the current module
  import SomeOtherModule, only: [my_other_plug: 2]

  plug :my_other_plug
end

In each Plug, we can modify the Plug.Conn struct, e.g. set the reponse’s content, add additional response HTTP headers or halt the connection, which causes all the remaining plugs in the pipeline to be skipped.

def hello(conn, opts) do
  case prepare_response() do
    {timing_in_ms, body} ->
      conn
      |> put_resp_content_type("text/plain")
      |> put_resp_header("Server-Timing", "total;dur=#{timing_in_ms}")
      |> send_resp(200, body)

    _ ->
      halt(conn)
  end
end

Another example for a “token”, which is handed down in a business process, are changesets in Ecto.

Ecto.Changeset

Ecto.Changesets are structs used to apply filters, validations and other constraints during the manipulation of structs.

import Ecto.Changeset

user = %User{}

user
|> cast(params, [:name, :email, :age])
|> validate_required([:name, :email])
|> validate_format(:email, ~r/@/)
|> validate_inclusion(:age, 18..100)
|> unique_constraint(:email)

Just like Plug.Conn before, the Ecto.Changeset struct in this example flows through a pipeline of transformations and provides a binding interface for all functions involved in its use-case, i.e. filtering, casting, validating and constraining the manipulation of data.

Contrary to Plug.Conn, the scope of a changeset is not necessarily tied to any kind of request life cycle.

Let’s build our own!

Let’s use these insights to adapt this concept to an application of our own. We will stick with our example of converting images with a Mix task:

First, we introduce a struct to help us with handling given command-line arguments (green tasks in the image above).

We will call this struct Options:

defmodule Converter.Options do
  defstruct argv: nil,
            glob: nil,
            target_dir: nil,
            format: nil
end

We will use this to convert the given command-line arguments into a structured form and validate them. Later, we will prepare the conversion process using Options.

defmodule Mix.Tasks.ConvertImages do
  use Mix.Task

  alias Converter.Options

  @default_glob "./image_uploads/*"
  @default_target_dir "./tmp"
  @default_format "jpg"

  def run(argv) do
    validation =
      %Options{argv: argv}
      |> parse_options()
      |> validate_options()

    case validation do
      {:ok, options} ->
        filenames = prepare_conversion(options)

        results = convert_images(filenames, options.target_dir, options.format)

        report_results(options.target_dir, results)

      {:error, error} ->
        report_error(error)
    end
  end

  # Each stage of the conversion process takes the `Options` as argument ...
  defp parse_options(%Options{argv: argv} = options) do
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || @default_glob
    target_dir = opts[:target_dir] || @default_target_dir
    format = opts[:format] || @default_format

    %Options{options | glob: glob, target_dir: target_dir, format: format}
  end

  # ... we pattern match on the fields that are relevant to the current step!
  defp validate_options(%Options{glob: glob, format: format} = options) do
    filenames = Path.wildcard(glob)

    cond do
      Enum.empty?(filenames) ->
        {:error, "No images found."}

      !Enum.member?(~w[jpg png], format) ->
        {:error, "Unrecognized format: #{format}"}

      true ->
        {:ok, options}
    end
  end

  defp prepare_conversion(%Options{glob: glob, target_dir: target_dir}) do
    File.mkdir_p!(target_dir)

    Path.wildcard(glob)
  end

  defp convert_images(filenames, target_dir, format) do
    results =
      Enum.map(filenames, fn filename ->
        Converter.convert_image(filename, target_dir, format)
      end)

    results
  end

  defp report_results(target_dir, results) do
    IO.puts("Wrote #{Enum.count(results)} images to #{target_dir}.")
  end

  defp report_error(error) do
    IO.puts("[error] #{error}")
  end
end

Our new Options struct takes a role similar to the one Ecto.Changeset plays: It helps us to fulfil a specific task (parsing and validating options for the conversion process).

To achieve this, our run/1 function had to be restructured:

validation =
  %Options{argv: argv}
  |> parse_options()
  |> validate_options()

case validation do
  {:ok, options} ->
    filenames = prepare_conversion(options)

    results = convert_images(filenames, options.target_dir, options.format)

    report_results(options.target_dir, results)

  {:error, error} ->
    report_error(error)
end

While that does not look overly complex, the same function from our first article looked like this:

argv
|> parse_options()
|> validate_options()
|> prepare_conversion()
|> convert_images()
|> report_results()

Let’s try to gain back some of that clarity …

One Token to Rule Them All!

We can gain back clarity by using a single token for the fulfilment of our use-case from start to finish. This is what Plug does with Plug.Conn: each request and its response are represented as a single token, which accompanies the whole business process of answering a web request, from getting the original request all the way to sending out the response.

What would this look like for our example?

We are getting a “request” to convert images in a given directory to a given format and answer this “request” by returning the converted images and printing their names on the terminal (or presenting an error message if the “request” was malformed).

We will call our new struct Token and let it flow through our program (green tasks):

defmodule Converter.Token do
  defstruct argv: nil,
            glob: nil,
            target_dir: nil,
            format: nil,
            errors: nil,
            halted: nil,
            results: nil
end

Now we can pass a Token in at the “top of the pipe” in our run/1 function.

defmodule Mix.Tasks.ConvertImages do
  use Mix.Task

  alias Converter.Token

  @default_glob "./image_uploads/*"
  @default_target_dir "./tmp"
  @default_format "jpg"

  def run(argv) do
    %Token{argv: argv}
    |> parse_options()
    |> validate_options()
    |> prepare_conversion()
    |> convert_images()
    |> report_results()
  end

  # Each stage of the conversion process takes the `Token` as argument ...
  defp parse_options(%Token{argv: argv} = token) do
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || @default_glob
    target_dir = opts[:target_dir] || @default_target_dir
    format = opts[:format] || @default_format

    %Token{token | glob: glob, target_dir: target_dir, format: format}
  end

  # ... we pattern match on the fields that are relevant to the current step  ...
  defp validate_options(%Token{filenames: filenames, format: format} = token) do
    errors = [
      if(Enum.empty?(filenames), do: "No images found."),
      if(!Enum.member?(~w[jpg png], format), do: "Unrecognized format: #{format}")
    ]

    %Token{token | errors: errors, halted: Enum.any?(errors)}
  end

  # ... we skip steps by matching on `halted: true`  ...
  defp prepare_conversion(%Token{halted: true} = token), do: token

  # ... we put in new information gathered at the current stage  ...
  defp prepare_conversion(%Token{target_dir: target_dir} = token) do
    File.mkdir_p!(target_dir)
    filenames = Path.wildcard(glob)

    %Token{token | filenames: filenames}
  end

  # ... we can skip steps by matching on `halted: true` ...
  defp convert_images(%Token{halted: true} = token), do: token

  # ... also, we don't have to pattern match on the `Token` necessarily ...
  defp convert_images(token) do
    results =
      Enum.map(token.filenames, fn filename ->
        Converter.convert_image(filename, token.target_dir, token.format)
      end)

    %Token{token | results: results}
  end

  # ... and at the end we can report errors by matching on `halted: true`  ...
  defp report_results(%Token{halted: true, errors: errors} = token) do
    Enum.each(errors, fn error ->
      IO.puts("- #{error}")
    end)

    token
  end

  # ... or report the results of the success program execution.
  defp report_results(token) do
    IO.puts("Wrote #{Enum.count(token.results)} images to #{token.target_dir}.")

    token
  end
end

Our new Token interacts very much like Plug.Conn does: It is handed down from function to function during the execution of our business process.

Let’s summarize the properties of this approach:

  1. Each step of the program’s execution is a function, which takes the Token struct as a first argument and we pattern match on the fields that are relevant to the current step.
  2. We can easily skip steps by matching on halted: true.
  3. We can put in new information gathered at the current stage by modifying the Token before returning it.
  4. At the end, we can report errors by matching on halted: true or report the results of the successful program execution.

Comparison of the three approaches

Let’s look at the properties of each approach once again:

# Approach 1: using `|>`
def run(argv) do
  argv
  |> parse_options()
  |> validate_options()
  |> prepare_conversion()
  |> convert_images()
  |> report_results()
end

The |> approach forces the code to adopt Elixir’s idiomatic style of putting the to-be-transformed data as the first argument in any function. This provides a kind of natural “interface” or “contract”.

# Approach 2: using `with`
def run(argv) do
  with {glob, target_dir, format} <- parse_options(argv),
        :ok <- validate_options(glob, format),
        filenames <- prepare_conversion(glob, target_dir),
        results <- convert_images(filenames, target_dir, format) do
    report_results(results, target_dir)
  end
end

The with approach shines when collaborating in a fast-moving environment: you might not want to be that dependent on another programmer’s return values early on and with provides more flexibility in dealing with the called function’s result.

# Approach 3: using a `Token`
def run(argv) do
  %Token{argv: argv}
  |> parse_options()
  |> validate_options()
  |> prepare_conversion()
  |> convert_images()
  |> report_results()
end

Finally, the Token approach can combine the benefits of both worlds, if applied in the right place: If you have a well defined, narrow use-case, where you want to provide a unified interface and data structure (like Ecto.Changeset), you might find this approach benefical. It is also a great idea for the most top-level flow of your program, where your use-case is the very purpose of your program and you might want to establish an explicit data contract between different interacting parts of your app (like Plug.Conn does for web apps).

In these situations, we can can combine the benefits of the first two approaches, since it provides a binding contract between all parts of your app, but also allows teams to work independently.

We can also do “flowy things” like skipping steps by matching on the Token:

# skip this step since execution is halted
defp prepare_conversion(%Token{halted: true} = token) do
  token
end

# fulfil this step since execution is not halted
defp prepare_conversion(%Token{target_dir: target_dir} = token) do
  File.mkdir_p!(target_dir)

  token
end

We will take a more detailed look at these “flowy things” as well as the Pros and Cons of the Token approach in future articles.

Your turn: Liked this post? Retweet this post! 👍