In the Task.async_stream options, the :timeout parameter is described:

The maximum amount of time (in milliseconds) each task is allowed to execute for. Defaults to 5000

In my testing I did the following:

iex(8)> Task.async_stream([10, 4, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
[ok: 10, ok: 4, ok: 5]

iex(10)> Task.async_stream([10], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1

Why does the first example not time out (it takes ~10 seconds to execute), while the second example times out as expected?

  • I'm getting a timeout for both the code snippets. Commented Aug 24, 2017 at 16:04
  • Same here - times out for me. Which version of Elixir do you use? Commented Aug 24, 2017 at 16:23
  • I'm using version 1.4.5. Commented Aug 24, 2017 at 17:21
  • I don't have an install of 1.4.5 to test but on 1.5.1 this definitely consistently times out as expected. Commented Aug 24, 2017 at 20:26

1 Answer

The implementation of Task.async_stream changed from 1.4.5 to 1.5.1.

Let's take a look at what happens.

Elixir 1.4.5

In this version the timeout is part of the after clause of a receive block.

receive do
  {{^monitor_ref, position}, value} ->
    # ...

  {:down, {^monitor_ref, position}, reason} ->
    # ...

  {:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
    # ...
after
  timeout ->
    # ...
end

This receive block waits for update messages from the spawned tasks, which are sent by a monitoring process. I truncated the code for simplicity.

What does that mean in practice? Task.async_stream will only time out if timeout milliseconds pass without it receiving any message from a spawned task. Every received message restarts the clock.
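To make that behaviour concrete, here is a minimal sketch of a receive/after loop. It is not the actual Task.Supervised code: the module IdleTimeoutSketch, the {:result, value} message shape, and the spawn_all helper are all hypothetical, and the delays are scaled down to milliseconds so it runs quickly.

```elixir
defmodule IdleTimeoutSketch do
  # Collects `count` messages shaped like {:result, value}.
  # The `after` clause only fires when no message arrives for `timeout` ms,
  # so the clock effectively restarts after every received message.
  def collect(count, timeout), do: collect(count, timeout, [])

  defp collect(0, _timeout, acc), do: {:ok, Enum.reverse(acc)}

  defp collect(count, timeout, acc) do
    receive do
      {:result, value} -> collect(count - 1, timeout, [value | acc])
    after
      timeout -> {:error, :timeout}
    end
  end
end

parent = self()

# Hypothetical helper: one process per delay, each reporting back when done.
spawn_all = fn delays ->
  for d <- delays do
    spawn(fn ->
      Process.sleep(d)
      send(parent, {:result, d})
    end)
  end
end

# The slowest worker needs 300 ms, longer than the 200 ms timeout, but a
# message arrives every 100 ms, so the `after` clause never fires.
spawn_all.([300, 200, 100])
spaced = IdleTimeoutSketch.collect(3, 200)

# A single worker that stays silent for 400 ms does trigger the timeout.
spawn_all.([400])
lonely = IdleTimeoutSketch.collect(1, 200)
```

Under these assumptions, spaced is {:ok, [100, 200, 300]} and lonely is {:error, :timeout}, which mirrors the difference between the two snippets in the question.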

Example

Let's try your example using [10, 3, 4]:

iex> Task.async_stream([10, 3, 4], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1

As we can see this results in a timeout, as expected.

Now what if we try to use [10, 5], will this work?

iex> Task.async_stream([10, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1

It seems the first message takes too long to arrive given the 5-second timeout. But as soon as we add an intermediary step, it works. How about 1?

iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
[ok: 10, ok: 5, ok: 1]

Elixir 1.5.1

In Elixir 1.5.1 the timeout logic works differently. It uses Process.send_after to send a timeout message for each spawned Task to the monitoring process.

# Schedule a timeout message to ourselves, unless the timeout was set to :infinity
timer_ref = case timeout do
  :infinity -> nil
  timeout -> Process.send_after(self(), {:timeout, {monitor_ref, ref}}, timeout)
end

This message is then handled in the same receive loop that spawned the Task and scheduled the :timeout message.
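The per-task approach can be sketched in the same spirit. Again, this is a simplified illustration and not the real implementation: the module PerTaskTimeoutSketch and its message shapes are hypothetical, and the delays are in milliseconds.

```elixir
defmodule PerTaskTimeoutSketch do
  # Spawns one worker per delay and schedules a separate timeout message
  # for each worker at spawn time, using Process.send_after/3.
  def run(delays, timeout) do
    parent = self()

    pending =
      for d <- delays, into: %{} do
        ref = make_ref()

        spawn(fn ->
          Process.sleep(d)
          send(parent, {:result, ref, d})
        end)

        timer = Process.send_after(parent, {:timeout, ref}, timeout)
        {ref, timer}
      end

    await(pending, [])
  end

  # All workers reported back in time; values are in completion order.
  defp await(pending, acc) when map_size(pending) == 0, do: {:ok, Enum.reverse(acc)}

  defp await(pending, acc) do
    receive do
      {:result, ref, value} ->
        case Map.pop(pending, ref) do
          # Unknown ref (stale message): ignore it and keep waiting.
          {nil, _} ->
            await(pending, acc)

          {timer, rest} ->
            Process.cancel_timer(timer)
            await(rest, [value | acc])
        end

      {:timeout, ref} ->
        # Only a timeout for a worker that is still pending counts.
        if Map.has_key?(pending, ref) do
          {:error, :timeout}
        else
          await(pending, acc)
        end
    end
  end
end

# The 300 ms worker exceeds its own 200 ms limit, no matter how many
# faster workers report back in the meantime.
slow = PerTaskTimeoutSketch.run([300, 100, 50], 200)

# Every worker finishes within its own limit.
fast = PerTaskTimeoutSketch.run([100, 50], 200)
```

Here slow is {:error, :timeout} and fast is {:ok, [50, 100]}. Because each worker has its own timer, intermediate results no longer postpone the timeout of a slow worker.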

Link to the full function.

Examples

As soon as a single process takes longer than the specified timeout, the whole stream exits, as it should.

iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:237: Task.Supervised.stream_reduce/7
    (elixir) lib/enum.ex:1847: Enum.reverse/1
    (elixir) lib/enum.ex:2596: Enum.to_list/1

TL;DR

Elixir 1.4.5 restarts the timeout each time it receives a result from a spawned process. Elixir 1.5.1 tracks it separately for each spawned process.
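If the tasks legitimately need more time than the default, the :timeout option quoted in the question can be passed explicitly. A minimal sketch with scaled-down durations (the 50 ms sleep and the 1000 ms limit are arbitrary values chosen for illustration):

```elixir
# Each task sleeps briefly, well within the explicit per-task limit.
results =
  [1, 2, 3]
  |> Task.async_stream(
    fn i ->
      Process.sleep(50)
      i * 2
    end,
    timeout: 1_000
  )
  |> Enum.to_list()
```

With these values results is [ok: 2, ok: 4, ok: 6].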
