Understanding Elixir Tasks

So, what are tasks?
Tasks are processes meant to execute one particular action throughout their lifetime, often with little or no communication with other processes. The most common use case for tasks is to convert sequential code into concurrent code by computing a value asynchronously.
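Here is a minimal sketch of that idea. The summation is only a placeholder for real work: Task.async/1 starts the computation in a new process, and Task.await/2 collects its result.

```elixir
# Start the computation in a separate process; Task.async/1 returns a %Task{} struct.
task = Task.async(fn -> Enum.sum(1..1_000_000) end)

# The caller is free to do other work here while the task runs.

# Task.await/2 blocks until the task replies (default timeout: 5000 ms).
result = Task.await(task)
IO.inspect(result)
```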

In Elixir, there are several ways of running multiple tasks concurrently, so that processing all of them takes only about as long as the longest task in the list.
These methods are:

  • Manual ways
  • Task
  • Task.Supervisor
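This rough sketch illustrates the timing claim with three hypothetical jobs that just sleep for 100, 200 and 300 ms. Run concurrently with Task, the total should be close to 300 ms rather than the 600 ms a sequential loop would need. Exact numbers vary by machine.

```elixir
# Hypothetical workloads: each "task" sleeps for the given number of milliseconds.
durations = [100, 200, 300]

{micros, results} =
  :timer.tc(fn ->
    durations
    |> Enum.map(fn ms ->
      Task.async(fn ->
        Process.sleep(ms)
        ms
      end)
    end)
    |> Enum.map(&Task.await/1)
  end)

# :timer.tc/1 reports microseconds; convert to milliseconds.
elapsed_ms = div(micros, 1000)
IO.puts("results=#{inspect(results)} elapsed=#{elapsed_ms} ms")
```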

GitHub example we will be using.

Creating the task to use



To explain the concepts above, we will use a task that fetches the temperatures of different locations. To learn how to make HTTP requests in Elixir, check out my previous article, elixir-working-with-json-api.
defmodule Tasker.Worker do

  require Logger
  @moduledoc """
  This module contacts the weather API and returns the temperature of a city,
  as long as it is given a valid city name.
  """

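  @doc """
  Receives a `{coordinator_pid, city}` message, fetches the city's temperature
  and sends the result to `coordinator_pid`. It handles a single message and
  then returns, so each worker process serves exactly one city.
  """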
  def loop do
    receive do
      {coordinator_pid, city} ->
        data = temperature_of(city)

        case data do
          {:ok, _city, temp} ->
            send(coordinator_pid, {:ok, "#{city} #{temp}"})

          {:error, _city, _reason} ->
            send(coordinator_pid, {:ok, "#{city} temp not found"})
        end
    end
  end

  @doc """
  Gets the temperature of a single city.
  """
  def temperature_of(city) do
    create_url(city)
    |> HTTPoison.get()
    |> parse_response(city)
  end

  defp create_url(city) do
    # URL-encode the city so names with spaces (e.g. "New York") form a valid URL
    "http://api.openweathermap.org/data/2.5/weather?q=#{URI.encode_www_form(city)}&appid=#{api_key()}"
  end

  defp api_key do
    "c5bcd52c435ecaf5f659ccac8d3c1311"
  end

  def parse_response({:error, _}, city) do
    {:error, city, "[:ERROR:] check your internet connection and retry"}
  end

  def parse_response({:ok, %HTTPoison.Response{body: body, status_code: 200}}, city) do
    body
    |> JSON.decode!()
    |> compute_temperature(city)
  end

  def parse_response({:ok, %HTTPoison.Response{body: body, status_code: _}}, city) do
    body
    |> JSON.decode!()
    |> decode_error(city)
  end

  defp compute_temperature(json, city) do
    try do
      temp =
        (json["main"]["temp"] - 273.15)
        |> Float.round(1)

      {:ok, city, temp}
    rescue
      _ ->
        {:error, city, "[:ERROR:] temperature can't be decoded"}
    end
  end

  defp decode_error(reason, city) do
    {:error, city, "[:ERROR:] #{reason["message"]}"}
  end
end
This module fetches the temperature of any valid city. Since this article is not about getting data from the web with Elixir, I suggest reading my previous blog post, which describes how to fetch JSON and decode it.
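Worker.loop/0 handles one `{coordinator_pid, city}` message and replies with `{:ok, text}`. You can try that message protocol without an API key or network access. This sketch uses a hypothetical FakeWorker that returns a made-up temperature instead of calling the API:

```elixir
defmodule FakeWorker do
  # Hypothetical stand-in for Tasker.Worker: same message protocol,
  # but it replies with a fixed, made-up temperature instead of calling the API.
  def loop do
    receive do
      {coordinator_pid, city} ->
        send(coordinator_pid, {:ok, "#{city} 20.0"})
    end
  end
end

# Spawn the worker and ask it to reply to this process.
pid = spawn(FakeWorker, :loop, [])
send(pid, {self(), "Nairobi"})

reply =
  receive do
    {:ok, text} -> text
  after
    1000 -> :timeout
  end

IO.puts(reply)
```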

Manual ways



You can achieve concurrency in Elixir in many ways. One of them is to use a coordinator process. The processes carrying out the tasks send their results to the coordinator, which collects them and then sends the aggregated result to the main process.

Let's translate that design into code for better understanding.
defmodule Tasker.Coordinator do
  require Logger

  @moduledoc """
  This module collects the results of the worker processes and forwards them to the owner process.
  """

  def loop(state \\ [], city_count, owner) do
    receive do
      {:ok, response} ->
        new_state = [response | state]

        if(city_count == Enum.count(new_state)) do
          send(self(), :exit)
        end

        loop(new_state, city_count, owner)

      :exit ->
        result =
          state
          |> Enum.sort()
          |> Enum.join(", ")

        send(owner, {:ok, result})

      _ ->
        loop(state, city_count, owner)
    end
  end
end
The coordinator process receives each result, adds it to a list, and then waits for the next one. When the number of results equals the number of tasks, it sends an :exit message to itself. On receiving that message, the coordinator sorts and joins the results and sends them to the process whose PID it was given (the owner process).

The function using the coordinator process looks like this.
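  def master_cordinator(cities) when is_list(cities) do
    # Assumes `alias Tasker.{Coordinator, Worker}` at the top of the module.
    coordinator = spawn(Coordinator, :loop, [[], Enum.count(cities), self()])

    # One worker per city; each worker replies to the coordinator, not to us.
    Enum.each(cities, fn city ->
      pid = spawn(Worker, :loop, [])
      send(pid, {coordinator, city})
    end)

    # Receive the final, aggregated result from the coordinator.
    receive do
      {:ok, data} ->
        data
    after
      5000 ->
        IO.puts("time out")
    end
  end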
In the function above, we spawn the coordinator process and pass its PID to several worker processes that fetch the temperatures of various cities. After a worker gets data from the weather API, it sends that data to the coordinator, which keeps looping until every expected worker has reported back.
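This self-contained sketch runs the coordinator pattern end to end without the weather API. SketchCoordinator and SketchWorker are hypothetical stand-ins for Tasker.Coordinator and Tasker.Worker. This coordinator is a variation on the author's version, not the code from the example project: it uses a guard to finish once it has every reply instead of sending :exit to itself.

```elixir
defmodule SketchCoordinator do
  # Once all `expected` replies are in, send them (sorted, joined) to `owner`.
  def loop(results, expected, owner) when length(results) == expected do
    send(owner, {:ok, results |> Enum.sort() |> Enum.join(", ")})
  end

  # Otherwise wait for the next worker reply and keep looping.
  def loop(results, expected, owner) do
    receive do
      {:ok, response} -> loop([response | results], expected, owner)
    end
  end
end

defmodule SketchWorker do
  # Hypothetical worker: replies with a made-up temperature, no HTTP call.
  def loop do
    receive do
      {coordinator_pid, city} -> send(coordinator_pid, {:ok, "#{city} 20.0"})
    end
  end
end

cities = ["Nairobi", "Mombasa", "Kisumu"]
coordinator = spawn(SketchCoordinator, :loop, [[], length(cities), self()])

Enum.each(cities, fn city ->
  pid = spawn(SketchWorker, :loop, [])
  send(pid, {coordinator, city})
end)

summary =
  receive do
    {:ok, data} -> data
  after
    5000 -> :timeout
  end

IO.puts(summary)
```

Because the coordinator sorts before joining, the output does not depend on which worker replies first.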

Using a coordinator process is quite tedious, but I am sure you can find a good use for it. Another way to fetch city temperatures concurrently is to spawn one process per city and have each send its result straight back to the calling function. The function then runs one receive per spawned process to collect those results.

Here is the code:
  def manual_method(cities) when is_list(cities) do
    owner = self()

    results =
      cities
      |> Enum.map(fn city ->
        # Spawn a worker and ask it to reply directly to this process.
        pid = spawn(Worker, :loop, [])
        send(pid, {owner, city})
        pid
      end)
      # One receive per spawned worker; replies may arrive in any order.
      |> Enum.map(fn _pid ->
        receive do
          {:ok, temp} ->
            "#{temp}"

          {:error, result} ->
            "#{result}"
        end
      end)

    results |> present_results()
  end
You can come up with your own manual way of processing multiple tasks concurrently, because process communication makes this very simple in Elixir. The trouble starts when an error or some other unexpected behavior occurs while carrying out those tasks. A worker that crashes never sends its reply, so its result is lost and the receiving process may wait forever. To avoid this, Elixir provides two convenient modules specialized in working with tasks.
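This sketch shows the problem. With plain spawn, a worker that crashes never replies, so a bare receive would block forever. Here, spawn_monitor/1 makes the caller get a :DOWN message instead, and an after clause acts as a safety net. The crashing worker is hypothetical:

```elixir
# Hypothetical worker that crashes before it can send a result.
{pid, ref} = spawn_monitor(fn -> raise "weather API unavailable" end)

outcome =
  receive do
    # The normal reply; never arrives here because the worker crashed.
    {:ok, temp} -> {:ok, temp}
    # The monitor tells us the worker died and why.
    {:DOWN, ^ref, :process, ^pid, reason} -> {:crashed, reason}
  after
    5000 -> :timeout
  end

IO.inspect(outcome)
```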

Task



The Task module is easy to use and requires very few lines of code. We use Task.async/1 to run the required work in a new process, and Task.await/2 to wait for and read the message containing the task's result.

In my case, this is how I have implemented it in the example project.
def async(cities) when is_list(cities) do
    results =
      cities
      |> Enum.map(fn city ->
        Task.async(fn -> Worker.temperature_of(city) end)
      end)
      |> Enum.map(&Task.await(&1, :infinity))

    results
    |> Enum.map(fn value ->
      case value do
        {:ok, city, temp} ->
          "#{city} #{temp}"

        {:error, city, _reason} ->
          "#{city} temp not found"
      end
    end)
    |> present_results()
  end
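The Task module also offers Task.async_stream/3, which the example project does not use. This sketch shows how it can replace the async/await pair when you want to cap how many requests run at once (max_concurrency). fake_temperature_of is a hypothetical stand-in for Worker.temperature_of/1:

```elixir
# Hypothetical stand-in for Worker.temperature_of/1 (no network call).
fake_temperature_of = fn city -> {:ok, city, 20.0} end

results =
  ["Nairobi", "Mombasa", "Kisumu"]
  # At most two tasks run at the same time; each gets up to 10 s.
  |> Task.async_stream(fake_temperature_of, max_concurrency: 2, timeout: 10_000)
  # Each successful result arrives wrapped as {:ok, return_value}.
  |> Enum.map(fn {:ok, {:ok, city, temp}} -> "#{city} #{temp}" end)

IO.inspect(results)
```

By default, async_stream/3 emits results in the same order as the input.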

If one of the tasks crashes, it is not restarted. Worse, a task started with Task.async/1 is linked to the caller, so the crash also takes the caller down. To have a failed task restarted, start it under a supervisor and change its restart strategy from the default :temporary to another strategy such as :transient. The Task documentation explains how to do this.
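The following sketch applies the pattern from the Task documentation. A module can `use Task, restart: :transient`, and then a supervisor restarts it only when it exits abnormally. FlakyFetch, its Agent-based attempt counter and the :sketch_owner name are hypothetical, and the job deliberately crashes on its first attempt:

```elixir
defmodule FlakyFetch do
  # `use Task` defines child_spec/1; restart: :transient means the supervisor
  # restarts the task only if it exits abnormally (for example, by crashing).
  use Task, restart: :transient

  def start_link(counter) do
    Task.start_link(__MODULE__, :run, [counter])
  end

  def run(counter) do
    # Count attempts in an Agent so the state survives restarts.
    attempt = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)

    # Hypothetical flaky job: crash on the first attempt, succeed afterwards.
    if attempt == 1, do: raise("simulated failure on attempt 1")

    send(:sketch_owner, {:done, attempt})
  end
end

{:ok, counter} = Agent.start_link(fn -> 0 end)
Process.register(self(), :sketch_owner)
{:ok, _sup} = Supervisor.start_link([{FlakyFetch, counter}], strategy: :one_for_one)

outcome =
  receive do
    {:done, attempt} -> attempt
  after
    2000 -> :timeout
  end

IO.inspect(outcome)
```

With the default :temporary strategy, the supervisor would not restart the crashed task, and no :done message would arrive.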

If you start tasks under a regular supervisor like this, you can no longer read their return values the way Task.await/2 lets you. If that is fine for your use case, go ahead and use it. If you need the results of supervised tasks, use Task.Supervisor.

Task.Supervisor



The Task.Supervisor module lets developers dynamically start multiple supervised tasks. It is quite powerful: it offers async/await-style functions similar to those in Task, plus extras such as running tasks on other nodes of a distributed system.

So, how do we set it up?
First, let's create a supervisor module that supervises Task.Supervisor.
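defmodule Tasker.Supervisor do
  use Supervisor

  def start_link(:ok) do
    Supervisor.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    # supervisor/2 and supervise/2 (Supervisor.Spec) are deprecated;
    # child specs are now plain tuples passed to Supervisor.init/2.
    children = [
      {Task.Supervisor, name: Tasker.TaskSupervisor}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end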
Note: we name our task supervisor Tasker.TaskSupervisor even though no module with that name exists. A registered name only has to be an atom, and module names are atoms too.
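You can check this in iex. Tasker.TaskSupervisor is an ordinary atom even though no module with that name is defined. This sketch assumes no such module exists in your project:

```elixir
# A module-style name is just an atom prefixed with "Elixir.".
name = Tasker.TaskSupervisor

IO.inspect(is_atom(name))
IO.inspect(name == :"Elixir.Tasker.TaskSupervisor")
# No module by this name is compiled, yet the atom is perfectly usable as a name.
IO.inspect(Code.ensure_loaded?(name))
```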
Now that we have created our supervisor, let's make sure it is started when the application boots.
defmodule Tasker.Application do
  use Application

  def start(_type, _args) do
    Tasker.Supervisor.start_link(:ok)
  end
end
Now let's add the application module to mix.exs like this:
  def application do
    [
      # add any other applications you need to this list
      extra_applications: [:logger],
      mod: {Tasker.Application, []}
    ]
  end
Now that everything is wired up, let's use Task.Supervisor to fetch the temperatures of various cities.
  def supervised_task(cities) when is_list(cities) do
    results =
      cities
      |> Enum.map(fn city ->
        Task.Supervisor.async(Tasker.TaskSupervisor, Worker, :temperature_of, [city])
      end)
      |> Enum.map(&Task.await(&1, :infinity))

    results
    |> Enum.map(fn value ->
      case value do
        {:ok, city, temp} ->
          "#{city} #{temp}"

        {:error, city, _reason} ->
          "#{city} temp not found"
      end
    end)
    |> present_results()
  end
In the code above, the most important part is Task.Supervisor.async(Tasker.TaskSupervisor, Worker, :temperature_of, [city]). It takes the name of the task supervisor, the worker module, the function to call, and the list of arguments. After starting the task, you just wait for its result with Task.await/2. That's all it takes to run supervised tasks.
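Task.Supervisor.async/4 still links the task to the caller, so a crash in one task takes the caller down too. If you want the caller to survive failures, one option is Task.Supervisor.async_nolink/2 combined with Task.yield/2. This sketch uses a hypothetical SketchTaskSupervisor and a fake fetch function that fails for one city:

```elixir
# Start a Task.Supervisor under a hypothetical name for this sketch.
{:ok, _sup} = Task.Supervisor.start_link(name: SketchTaskSupervisor)

# Hypothetical job: "Atlantis" crashes, every other city succeeds.
fetch = fn
  "Atlantis" -> raise "city not found"
  city -> {:ok, city, 20.0}
end

results =
  ["Nairobi", "Atlantis"]
  |> Enum.map(fn city ->
    # async_nolink monitors the task instead of linking it to the caller.
    {city, Task.Supervisor.async_nolink(SketchTaskSupervisor, fn -> fetch.(city) end)}
  end)
  |> Enum.map(fn {city, task} ->
    # Task.yield/2 returns {:ok, value}, {:exit, reason}, or nil on timeout.
    case Task.yield(task, 5000) || Task.shutdown(task) do
      {:ok, {:ok, ^city, temp}} -> "#{city} #{temp}"
      _ -> "#{city} temp not found"
    end
  end)

IO.inspect(results)
```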

Distributed Tasks



Now, what if you have several computers and a large number of tasks you would like to distribute among them? No worries. You can connect the computers as nodes on the same network and then use Task.Supervisor to run the jobs on them remotely. To send a task to a particular node, pass a {supervisor_name, node} tuple as the first argument to Task.Supervisor.async/4, like this:
   Task.Supervisor.async({Tasker.TaskSupervisor, node_in_network}, WorkerModule, :function, [arguments])
For better understanding, here is the code from the GitHub example:
  @doc """
  This function will spawn process in multiple nodes in a cluster
  """
  def task_distributed(cities) when is_list(cities) do
    master_node = :"master@127.0.0.1"
    slave_nodes = [:"s1@127.0.0.1", :"s2@127.0.0.1", :"s3@127.0.0.1"]

    # start the master node
    master_node |> Node.start()

    # Connect to the other nodes; they must already be running,
    # each in its own terminal.
    slave_nodes |> Enum.each(&Node.connect/1)

    # Collect every node that will receive jobs, starting with this one.
    nodes = [node() | Node.list()]

    # Give each node a task in round-robin order until all tasks are handed out.
    node_job_dispatcher(nodes, cities, [])
    |> Enum.map(&Task.await(&1, :infinity))
    |> Enum.map(fn value ->
      case value do
        {:ok, city, temp} ->
          "#{city} #{temp}"

        {:error, city, _reason} ->
          "#{city} temp not found"
      end
    end)
    |> present_results()
  end

  defp node_job_dispatcher(_nodes, [], comm), do: comm

  defp node_job_dispatcher([node_x | n_tail], [city | c_tail], comm) do
    task =
      Task.Supervisor.async(
        {Tasker.TaskSupervisor, node_x},
        Worker,
        :temperature_of,
        [city]
      )

    node_job_dispatcher(n_tail ++ [node_x], c_tail, [task | comm])
  end

  defp present_results(results) do
    results
    |> Enum.sort()
    |> Enum.join(", ")
  end
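NOTE: Before testing this code, start each of the three slave nodes in its own terminal, e.g.
terminal:~$ iex --name s1@127.0.0.1 -S mix
terminal:~$ iex --name s2@127.0.0.1 -S mix
terminal:~$ iex --name s3@127.0.0.1 -S mix
All nodes must share the same Erlang cookie. Nodes started by the same user on one machine read it from ~/.erlang.cookie by default.
After that, run the code above as usual:
terminal:~$ iex -S mix
iex> cities_list = ["Nairobi", "another city"]
iex> Tasker.task_distributed(cities_list)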
In the code above, we first turn the Elixir node running the code into the master by giving it a name with Node.start/1. We then connect to the slave nodes with Node.connect/1. Once all the nodes are connected, we use Task.Supervisor to send each task to a particular node and then wait for the results.
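The round-robin logic in node_job_dispatcher/3 can also be written by zipping the cities with an endless cycle of nodes. This sketch runs on a single, non-distributed node, so the node list is just [node()]. SketchNodeSupervisor is a hypothetical name, and String.upcase/1 stands in for Worker.temperature_of/1. In a real cluster you would use [node() | Node.list()] and have a task supervisor running on every node.

```elixir
# Hypothetical supervisor name for this sketch.
{:ok, _sup} = Task.Supervisor.start_link(name: SketchNodeSupervisor)

# In a cluster this would be [node() | Node.list()].
nodes = [node()]
cities = ["Nairobi", "Mombasa", "Kisumu"]

results =
  cities
  # Pair each city with the next node, wrapping around the node list.
  |> Enum.zip(Stream.cycle(nodes))
  |> Enum.map(fn {city, target} ->
    # {name, node} addresses the supervisor on that node.
    Task.Supervisor.async({SketchNodeSupervisor, target}, String, :upcase, [city])
  end)
  |> Enum.map(&Task.await(&1, 5000))

IO.inspect(results)
```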

Conclusion



This article is the result of reading various books and the Elixir documentation, so some concepts and examples may look familiar. My goal was to compile my own knowledge and the excellent work of others into a reasonably complete guide to working with Elixir tasks.

If you think something could be improved or you have a question, please leave a comment.

Understanding Elixir Tasks, reviewed by John Invictus on 12:12
