
So, what are tasks?
Tasks are processes meant to execute one particular action throughout their lifetime, often with little or no communication with other processes. The most common use case for tasks is to convert sequential code into concurrent code by computing a value asynchronously.
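As a minimal sketch of that use case (the module and function names below are made up for illustration and are not part of the example project), a slow computation can be started with Task.async/1 while the caller carries on with other work, and then collected with Task.await/1:

```elixir
defmodule AsyncSketch do
  # Hypothetical stand-in for any slow, sequential computation.
  def slow_square(n) do
    Process.sleep(100)
    n * n
  end

  def run do
    # Start the computation in a separate process.
    task = Task.async(fn -> slow_square(7) end)

    # The caller is free to do other work in the meantime.
    other = Enum.sum(1..10)

    # Block until the task replies (the default timeout is 5 seconds).
    {Task.await(task), other}
  end
end
```

Calling `AsyncSketch.run()` returns both values once the task has finished.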
In Elixir, there are several ways of running multiple tasks concurrently so that the total time taken to process all the tasks is roughly as long as the longest task in the list.
These methods are:
- Manual ways
- Task
- Task.Supervisor
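To make the timing claim concrete, here is a small sketch that simulates jobs with Process.sleep/1 and measures the total time with :timer.tc/1. The module name and the job durations are assumptions chosen for illustration, not part of the example project.

```elixir
defmodule TimingSketch do
  # Simulates a job that takes `ms` milliseconds and returns `ms`.
  def job(ms) do
    Process.sleep(ms)
    ms
  end

  # Runs all jobs concurrently and returns {elapsed_ms, results}.
  def run_concurrently(durations) do
    {micros, results} =
      :timer.tc(fn ->
        durations
        |> Enum.map(fn ms -> Task.async(fn -> job(ms) end) end)
        |> Enum.map(&Task.await/1)
      end)

    {div(micros, 1000), results}
  end
end
```

With durations of 100, 200 and 300 ms, the elapsed time should be close to 300 ms (the longest job) rather than the 600 ms a sequential run would need.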
We will be using an example project hosted on GitHub.
Creating the task to use
To explain the above concepts, we will use a task that fetches the temperature of different locations. To learn how to make HTTP requests in Elixir, check out my previous article, elixir-working-with-json-api.
```elixir
defmodule Tasker.Worker do
  require Logger

  @moduledoc """
  This module contacts the weather API and returns the temperature
  of the given city.
  """

  @doc """
  Waits for a `{coordinator_pid, city}` message, fetches the city's
  temperature and sends the result back to the sender.
  """
  def loop do
    receive do
      {coordinator_pid, city} ->
        case temperature_of(city) do
          {:ok, _city, temp} ->
            send(coordinator_pid, {:ok, "#{city} #{temp}"})

          {:error, _city, _reason} ->
            # Still tagged :ok so the receiver counts this city as done.
            send(coordinator_pid, {:ok, "#{city} temp not found"})
        end
    end
  end

  @doc """
  Gets a single city's temperature.
  """
  def temperature_of(city) do
    create_url(city)
    |> HTTPoison.get()
    |> parse_response(city)
  end

  defp create_url(city) do
    "http://api.openweathermap.org/data/2.5/weather?q=#{city}&appid=#{api_key()}"
  end

  defp api_key do
    "c5bcd52c435ecaf5f659ccac8d3c1311"
  end

  def parse_response({:error, _}, city) do
    {:error, city, "[:ERROR:] check your internet connection and retry"}
  end

  def parse_response({:ok, %HTTPoison.Response{body: body, status_code: 200}}, city) do
    body |> JSON.decode!() |> compute_temperature(city)
  end

  def parse_response({:ok, %HTTPoison.Response{body: body, status_code: _}}, city) do
    body |> JSON.decode!() |> decode_error(city)
  end

  # The subtraction converts kelvin to degrees Celsius.
  defp compute_temperature(json, city) do
    try do
      temp = (json["main"]["temp"] - 273.15) |> Float.round(1)
      {:ok, city, temp}
    rescue
      _ -> {:error, city, "[:ERROR:] temperature can't be decoded"}
    end
  end

  defp decode_error(reason, city) do
    {:error, city, "[:ERROR:] #{reason["message"]}"}
  end
end
```
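The temperature arithmetic in the worker can be looked at in isolation. The sketch below is an alternative formulation, not the project's code: it assumes the response has already been decoded into a map with a numeric `"main" => "temp"` value in kelvin, and it uses pattern matching instead of `try/rescue`. The module and function names are hypothetical.

```elixir
defmodule KelvinSketch do
  # Converts kelvin to degrees Celsius, rounded to one decimal place.
  def to_celsius(kelvin) when is_number(kelvin) do
    Float.round(kelvin - 273.15, 1)
  end

  # Extracts the temperature from an already-decoded response map.
  def from_response(%{"main" => %{"temp" => kelvin}}) when is_number(kelvin) do
    {:ok, to_celsius(kelvin)}
  end

  # Anything else (for example an error payload) has no temperature.
  def from_response(_other), do: {:error, :no_temperature}
end
```

For instance, a decoded body of `%{"main" => %{"temp" => 293.15}}` yields `{:ok, 20.0}`, while a map without a `"main"` key falls through to the error clause.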
Manual ways
You can achieve concurrency in Elixir in many ways. One of them is to use a coordinator process: the worker processes carrying out the task send their results to the coordinator, which then sends the aggregate result to the main process.
For example:

```elixir
defmodule Tasker.Coordinator do
  require Logger

  @moduledoc """
  This module is used to collect the results of worker processes.
  """

  def loop(state \\ [], city_count, owner) do
    receive do
      {:ok, response} ->
        new_state = [response | state]

        # Once every city has reported, tell ourselves to finish.
        if city_count == Enum.count(new_state) do
          send(self(), :exit)
        end

        loop(new_state, city_count, owner)

      :exit ->
        result = state |> Enum.sort() |> Enum.join(", ")
        send(owner, {:ok, result})

      _ ->
        loop(state, city_count, owner)
    end
  end
end
```
The function using the coordinator process looks like this.
```elixir
# Assumes `Coordinator` and `Worker` are aliased in the enclosing module,
# e.g. `alias Tasker.{Coordinator, Worker}`.
def master_cordinator(cities) when is_list(cities) do
  cordinator = spawn(Coordinator, :loop, [[], Enum.count(cities), self()])

  cities
  |> Enum.each(fn city ->
    pid = spawn(Worker, :loop, [])
    send(pid, {cordinator, city})
  end)

  # receive final results
  receive do
    {:ok, data} -> data
  after
    5000 -> IO.puts("time out")
  end
end
```
Using a coordinator process is very tedious, but I am sure you can find a good use for it. We can still fetch city temperatures concurrently using yet another method: we spawn multiple worker processes that send their data straight back to the calling process, and then run an equal number of receives to collect the results from those workers.
The code:
```elixir
def manual_method(cities) when is_list(cities) do
  owner = self()

  results =
    cities
    |> Enum.map(fn city ->
      pid = spawn(Worker, :loop, [])
      send(pid, {owner, city})
    end)
    # One receive per spawned worker; replies arrive in completion order.
    |> Enum.map(fn _sent ->
      receive do
        {:ok, temp} -> "#{temp}"
        {:error, result} -> "#{result}"
      end
    end)

  results |> present_results()
end
```
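One limitation of plain receives is that replies are picked up in whatever order they arrive. A possible variation, sketched below with a stub worker instead of the real HTTP call, tags each request with a unique reference from make_ref/0 so every receive waits for one specific reply. The module, the stub and the 1-second timeout are all assumptions for illustration.

```elixir
defmodule ManualSketch do
  # Stub worker: replies to `owner` with a message tagged by `ref`.
  defp start_worker(owner, ref, city) do
    spawn(fn ->
      # Placeholder for the real HTTP call; sleeps a short random time.
      Process.sleep(:rand.uniform(50))
      send(owner, {ref, {:ok, city, String.length(city)}})
    end)
  end

  def run(cities) when is_list(cities) do
    owner = self()

    cities
    |> Enum.map(fn city ->
      ref = make_ref()
      start_worker(owner, ref, city)
      ref
    end)
    |> Enum.map(fn ref ->
      # Selective receive: wait for the reply carrying this exact ref.
      receive do
        {^ref, result} -> result
      after
        1_000 -> {:error, :timeout}
      end
    end)
  end
end
```

Because each receive pins its own reference, the results come back in the same order as the input list, regardless of which worker finishes first.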
Task
This module is easy to use and requires very few lines of code. We use Task.async/1 to start the required task in a separate process, and Task.await/2 to wait for it and read the message it sends back.
In my case, this is how I have implemented it in the example project.
```elixir
def async(cities) when is_list(cities) do
  results =
    cities
    |> Enum.map(fn city ->
      Task.async(fn -> Worker.temperature_of(city) end)
    end)
    |> Enum.map(&Task.await(&1, :infinity))

  results
  |> Enum.map(fn value ->
    case value do
      {:ok, city, temp} -> "#{city} #{temp}"
      {:error, city, _reason} -> "#{city} temp not found"
    end
  end)
  |> present_results()
end
```
If you start a task directly under a supervisor, you won't be able to get its returned result as before. If you are okay with this, you can go ahead and use it, but if you want to see results from supervised tasks you need to use Task.Supervisor.
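A small sketch of what that looks like in practice: when a Task is listed as a child of a plain Supervisor, there is no task struct to pass to Task.await/2, so the only way to observe the outcome is a side effect such as an explicit message. The module name and the message shape here are assumptions for illustration.

```elixir
defmodule FireAndForgetSketch do
  def run do
    parent = self()

    # A Task started as a supervisor child; nothing is returned to await on.
    children = [
      {Task, fn -> send(parent, {:task_result, 1 + 1}) end}
    ]

    {:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

    # The outcome is only visible through the explicit message.
    result =
      receive do
        {:task_result, value} -> value
      after
        1_000 -> :timeout
      end

    Supervisor.stop(sup)
    result
  end
end
```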
Task.Supervisor
The Task.Supervisor module allows developers to dynamically create multiple supervised tasks. This module is quite powerful, since it offers the features of Task plus extras such as running tasks on distributed nodes.
So, how do we create it?
First, let's create a supervisor module to supervise the Task.Supervisor.
```elixir
defmodule Tasker.Supervisor do
  use Supervisor

  def start_link(:ok) do
    Supervisor.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    # Child-spec tuple and Supervisor.init/2 are used here in place of the
    # deprecated Supervisor.Spec helpers supervisor/2 and supervise/2.
    children = [
      {Task.Supervisor, name: Tasker.TaskSupervisor}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```
Since we have created our supervisor, let's make sure it is started when the application boots.
```elixir
defmodule Tasker.Application do
  use Application

  def start(_type, _args) do
    Tasker.Supervisor.start_link(:ok)
  end
end
```
Then register the application module in mix.exs:
```elixir
def application do
  [
    extra_applications: ....your apps,
    mod: {Tasker.Application, []}
  ]
end
```
With the supervisor running, supervised tasks are started through it like this:
```elixir
def supervised_task(cities) when is_list(cities) do
  results =
    cities
    |> Enum.map(fn city ->
      Task.Supervisor.async(Tasker.TaskSupervisor, Worker, :temperature_of, [city])
    end)
    |> Enum.map(&Task.await(&1, :infinity))

  results
  |> Enum.map(fn value ->
    case value do
      {:ok, city, temp} -> "#{city} #{temp}"
      {:error, city, _reason} -> "#{city} temp not found"
    end
  end)
  |> present_results()
end
```
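To try Task.Supervisor without the rest of the application, a self-contained sketch can start an unnamed supervisor on the spot. This is only for experimenting: in the example project the supervisor lives in the supervision tree. The module name and the `String.upcase/1` stand-in job are assumptions.

```elixir
defmodule SupervisedSketch do
  def run(items) do
    # Started directly here for brevity; normally this belongs in the
    # application's supervision tree.
    {:ok, sup} = Task.Supervisor.start_link()

    items
    |> Enum.map(fn item ->
      # Stand-in job; the real code would call Worker.temperature_of/1.
      Task.Supervisor.async(sup, fn -> String.upcase(item) end)
    end)
    |> Enum.map(&Task.await/1)
  end
end
```

Unlike a task started as a plain supervisor child, each call to Task.Supervisor.async/2 returns a task that can be awaited.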
Distributed Tasks
Now, what if you have multiple computers and many tasks that you would like to distribute among them? No worries: you can connect the computers as nodes in the same network and then use Task.Supervisor to run the tasks on them remotely. To send a task to a particular node for processing, you pass the node to Task.Supervisor together with the supervisor name, like this:
```elixir
Task.Supervisor.async({Tasker.TaskSupervisor, node_in_network}, Worker_module, :function, [parameter])
```
```elixir
@doc """
This function will spawn processes on multiple nodes in a cluster.
"""
def task_distributed(cities) when is_list(cities) do
  master_node = :"master@127.0.0.1"
  slave_nodes = [:"s1@127.0.0.1", :"s2@127.0.0.1", :"s3@127.0.0.1"]

  # start the master node
  master_node |> Node.start()

  # connect to the other nodes; make sure they are up
  # (start each one in a different terminal)
  slave_nodes |> Enum.each(&Node.connect/1)

  # now that everything is started, build the list of nodes to give jobs to
  nodes = [node() | Node.list()]

  # give each node a task in turn until all tasks are handed out
  node_job_dispatcher(nodes, cities, [])
  |> Enum.map(&Task.await(&1, :infinity))
  |> Enum.map(fn value ->
    case value do
      {:ok, city, temp} -> "#{city} #{temp}"
      {:error, city, _reason} -> "#{city} temp not found"
    end
  end)
  |> present_results()
end

defp node_job_dispatcher(_nodes, [], comm), do: comm

defp node_job_dispatcher([node_x | n_tail], [city | c_tail], comm) do
  task =
    Task.Supervisor.async(
      {Tasker.TaskSupervisor, node_x},
      Worker,
      :temperature_of,
      [city]
    )

  # Move the node just used to the back of the queue (round robin).
  node_job_dispatcher(n_tail ++ [node_x], c_tail, [task | comm])
end

defp present_results(results) do
  results |> Enum.sort() |> Enum.join(", ")
end
```
Start each of the other nodes in its own terminal, e.g.:
```
terminal:~$ iex --name s1@127.0.0.1 -S mix
```
Then run the function from the main terminal, i.e.:
```
terminal:~$ iex -S mix
iex> cities_list = ["Nairobi", "another city"]
iex> Tasker.task_distributed(cities_list)
```
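The round-robin assignment performed by `node_job_dispatcher` can also be expressed as a pure function, which makes it easy to check without starting any nodes. The sketch below is an alternative formulation using Stream.cycle/1 and Enum.zip/2, not the project's code; the module name and the node atoms in the usage note are hypothetical.

```elixir
defmodule RoundRobinSketch do
  # Pairs each job with a node, cycling through the nodes in order.
  # Requires at least one node.
  def assign([_ | _] = nodes, jobs) when is_list(jobs) do
    Enum.zip(jobs, Stream.cycle(nodes))
  end
end
```

For example, `RoundRobinSketch.assign([:n1, :n2], ["a", "b", "c"])` pairs "a" with `:n1`, "b" with `:n2` and "c" with `:n1` again. Each `{job, node}` pair could then be handed to Task.Supervisor.async/4 as in the distributed example.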
Conclusion
This article is the result of reading various books and the Elixir documentation, so some concepts and examples may look familiar. My goal was to compile my own knowledge and the beautiful work of others into a reasonably complete guide to working with Elixir tasks.
If you would like me to improve something or you have a question, please leave a comment.
Understanding Elixir Tasks. Reviewed by John Invictus on 12:12.
