
I'm trying out Task.Supervisor.async_stream_nolink with a small test, but it isn't working.

Here is my implementation:

defmodule MySupervisor do
    use Supervisor

    def start_link do
        Supervisor.start_link(__MODULE__, [])
    end

    def init([]) do
        children= [
            worker(Basic, [])
        ]

        supervise(children, strategy: :one_for_one)
    end
end

defmodule Basic do
    use GenServer

    def start_link do
        GenServer.start_link(__MODULE__, [], name: __MODULE__)
    end
end


strings = ["long string", "longer string", "there are many of these"]


stream = Task.Supervisor.async_stream_nolink(MySupervisor, strings, fn text -> text |> String.codepoints |> Enum.count end)


Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end) |> IO.puts

But I get this error:

** (EXIT from #PID<0.70.0>) exited in: GenServer.call(MySupervisor, {:start_child, [#PID<0.70.0>, :monitor, {:nonode@nohost, #PID<0.81.0>}, {:erlang, :apply, [#Function<0.118488666 in file:test.exs>, ["long string"]]}]}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

16:24:34.317 [error] GenServer #PID<0.79.0> terminating
** (stop) exited in: GenServer.call(MySupervisor, {:start_child, [#PID<0.70.0>, :monitor, {:nonode@nohost, #PID<0.81.0>}, {:erlang, :apply, [#Function<0.118488666 in file:test.exs>, ["long string"]]}]}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
Last message: {:EXIT, #PID<0.70.0>, {:noproc, {GenServer, :call, [MySupervisor, {:start_child, [#PID<0.70.0>, :monitor, {:nonode@nohost, #PID<0.81.0>}, {:erlang, :apply, [#Function<0.118488666 in file:test.exs>, ["long string"]]}]}, :infinity]}}}
State: {:state, {#PID<0.79.0>, MySupervisor}, :one_for_one, [{:child, #PID<0.80.0>, Basic, {Basic, :start_link, []}, :permanent, 5000, :worker, [Basic]}], :undefined, 3, 5, [], 0, MySupervisor, []}

Can anyone help? I could not find an example of using Task.Supervisor.async_stream_nolink.

EDIT

To use my own supervisor directly, without starting it as part of an application, I tried this:

{:ok, supervisor} = MySupervisor.start_link
stream = Task.Supervisor.async_stream_nolink(supervisor, strings, fn text -> text |> String.codepoints |> Enum.count end)
Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end) |> IO.puts

But I get this error:

** (EXIT from #PID<0.70.0>) an exception was raised:
    ** (MatchError) no match of right hand side value: {:error, {:EXIT, {:undef, [{Basic, :start_link, [#PID<0.70.0>, :monitor, {:nonode@nohost, #PID<0.81.0>}, {:erlang, :apply, [#Function<0.20003547 in file:test.exs>, ["long string"]]}], []}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 381]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 406]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 615]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 647]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}}
        (elixir) lib/task/supervisor.ex:265: anonymous fn/4 in Task.Supervisor.build_stream/5
        (elixir) lib/task/supervised.ex:332: Task.Supervised.stream_monitor/6

17:07:53.497 [error] Process #PID<0.81.0> raised an exception
** (MatchError) no match of right hand side value: {:error, {:EXIT, {:undef, [{Basic, :start_link, [#PID<0.70.0>, :monitor, {:nonode@nohost, #PID<0.81.0>}, {:erlang, :apply, [#Function<0.20003547 in file:test.exs>, ["long string"]]}], []}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 381]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 406]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 615]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 647]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}}
    (elixir) lib/task/supervisor.ex:265: anonymous fn/4 in Task.Supervisor.build_stream/5
    (elixir) lib/task/supervised.ex:332: Task.Supervised.stream_monitor/6

17:07:53.503 [error] GenServer MySupervisor terminating
** (MatchError) no match of right hand side value: {:error, {:EXIT, {:undef, [{Basic, :start_link, [#PID<0.70.0>, :monitor, {:nonode@nohost, #PID<0.81.0>}, {:erlang, :apply, [#Function<0.20003547 in file:test.exs>, ["long string"]]}], []}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 381]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 406]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 615]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 647]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}}
    (elixir) lib/task/supervisor.ex:265: anonymous fn/4 in Task.Supervisor.build_stream/5
    (elixir) lib/task/supervised.ex:332: Task.Supervised.stream_monitor/6
Last message: {:EXIT, #PID<0.70.0>, {{:badmatch, {:error, {:EXIT, {:undef, [{Basic, :start_link, [#PID<0.70.0>, :monitor, {:nonode@nohost, #PID<0.81.0>}, {:erlang, :apply, [#Function<0.20003547 in file:test.exs>, ["long string"]]}], []}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 381]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 406]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 615]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 647]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}}}, [{Task.Supervisor, :"-build_stream/5-fun-0-", 4, [file: 'lib/task/supervisor.ex', line: 265]}, {Task.Supervised, :stream_monitor, 6, [file: 'lib/task/supervised.ex', line: 332]}]}}
State: {:state, {:local, MySupervisor}, :simple_one_for_one, [{:child, :undefined, Basic, {Basic, :start_link, []}, :permanent, 5000, :worker, [Basic]}], :undefined, 3, 5, [], 0, MySupervisor, []}

17:07:53.505 [error] GenServer #PID<0.79.0> terminating
** (MatchError) no match of right hand side value: {:error, {:EXIT, {:undef, [{Basic, :start_link, [#PID<0.70.0>, :monitor, {:nonode@nohost, #PID<0.81.0>}, {:erlang, :apply, [#Function<0.20003547 in file:test.exs>, ["long string"]]}], []}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 381]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 406]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 615]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 647]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}}
    (elixir) lib/task/supervisor.ex:265: anonymous fn/4 in Task.Supervisor.build_stream/5
    (elixir) lib/task/supervised.ex:332: Task.Supervised.stream_monitor/6
Last message: {:EXIT, #PID<0.70.0>, {{:badmatch, {:error, {:EXIT, {:undef, [{Basic, :start_link, [#PID<0.70.0>, :monitor, {:nonode@nohost, #PID<0.81.0>}, {:erlang, :apply, [#Function<0.20003547 in file:test.exs>, ["long string"]]}], []}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 381]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 406]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 615]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 647]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}}}, [{Task.Supervisor, :"-build_stream/5-fun-0-", 4, [file: 'lib/task/supervisor.ex', line: 265]}, {Task.Supervised, :stream_monitor, 6, [file: 'lib/task/supervised.ex', line: 332]}]}}
State: {:state, {#PID<0.79.0>, Supervisor.Default}, :simple_one_for_one, [{:child, :undefined, Task.Supervised, {Task.Supervised, :start_link, []}, :temporary, 5000, :worker, [Task.Supervised]}], :undefined, 3, 5, [], 0, Supervisor.Default, {:ok, {{:simple_one_for_one, 3, 5}, [{Task.Supervised, {Task.Supervised, :start_link, []}, :temporary, 5000, :worker, [Task.Supervised]}]}}}

1 Answer

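The functions in Task.Supervisor are meant to be run only on a supervisor started with Task.Supervisor.start_link/1. They ask that supervisor to start each task as a child, so a hand-written supervisor such as MySupervisor ends up calling Basic.start_link with the task's arguments, which is the :undef in your second error.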

You can either start an unnamed instance and pass the pid to async_stream_nolink/4:

{:ok, supervisor} = Task.Supervisor.start_link
stream = Task.Supervisor.async_stream_nolink(supervisor, strings, fn text -> text |> String.codepoints |> Enum.count end)

or start a named instance and pass its name:

Task.Supervisor.start_link name: MySupervisor
stream = Task.Supervisor.async_stream_nolink(MySupervisor, strings, fn text -> text |> String.codepoints |> Enum.count end)

Reducing the stream as in the question prints 47 in both cases.
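For reference, here is the first variant as one self-contained script. It is only a sketch that combines the snippet above with the `strings` list and the `Enum.reduce/3` call from the question; nothing in it is new API.

```elixir
# Start an unnamed Task.Supervisor; it is linked to the calling process.
{:ok, supervisor} = Task.Supervisor.start_link()

strings = ["long string", "longer string", "there are many of these"]

# Each element is handled by its own task, started under the supervisor.
stream =
  Task.Supervisor.async_stream_nolink(supervisor, strings, fn text ->
    text |> String.codepoints() |> Enum.count()
  end)

# Successful results arrive as {:ok, value} tuples: 11 + 13 + 23 = 47.
total = Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end)
IO.puts(total)
```

Note that the stream is lazy: no task is started until `Enum.reduce/3` begins consuming it.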

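If you want this supervisor to run alongside your application, you can add it under your application's existing top-level supervisor like this:

children = [
  # The second argument is the argument list for start_link/1,
  # so the keyword options need to be wrapped in their own list.
  supervisor(Task.Supervisor, [[name: MySupervisor]]),
  ...
]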

and then do

Task.Supervisor.async_stream_nolink(MySupervisor, ...)
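
On Elixir 1.5 and later the same thing can be written with a child spec tuple instead of the Supervisor.Spec helpers. The script below is a minimal sketch under that assumption: it starts a throwaway top-level supervisor by hand rather than from an application callback, and the input list and `String.length/1` are made up for illustration.

```elixir
# Assumes Elixir 1.5+, where {Module, arg} child specs are supported.
children = [
  {Task.Supervisor, name: MySupervisor}
]

# Stand-in for an application's top-level supervisor.
{:ok, _top} = Supervisor.start_link(children, strategy: :one_for_one)

# The Task.Supervisor is now registered under the name MySupervisor.
stream =
  Task.Supervisor.async_stream_nolink(MySupervisor, ["a", "bc"], &String.length/1)

lengths = Enum.map(stream, fn {:ok, n} -> n end)
IO.inspect(lengths)
```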

5 Comments

Oops! I edited your answer instead of my question!
I think you may have misread my answer -- even when you pass a pid, you need to pass a pid of a Task Supervisor (Task.Supervisor.start_link, not MySupervisor.start_link).
So how would I customize it? For example, changing the strategy or grabbing the process that has failed.
There are some options Task.Supervisor.start_link/1 accepts that are documented here: hexdocs.pm/elixir/Task.Supervisor.html#start_link/1.
Hi Dogbert, can you please check this question at elixir forum: elixirforum.com/t/…
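
On the question above about grabbing the process that has failed: because the tasks are not linked to the caller, a crashed task should show up in the stream as an `{:exit, reason}` element rather than taking the caller down. The sketch below assumes a recent Elixir version and uses made-up inputs. The tuple does not carry the task's pid, so this pairs results with inputs by position, relying on the stream's default input ordering.

```elixir
{:ok, supervisor} = Task.Supervisor.start_link()

# Hypothetical inputs; String.to_integer/1 raises for "oops",
# which crashes only the task handling that element.
inputs = ["12", "oops", "30"]

stream =
  Task.Supervisor.async_stream_nolink(supervisor, inputs, &String.to_integer/1)

# Results keep the order of the inputs by default, so they can be zipped.
results = Enum.to_list(stream)
pairs = Enum.zip(inputs, results)

# The generator patterns act as filters: non-matching pairs are skipped.
succeeded = for {_input, {:ok, value}} <- pairs, do: value
failed = for {input, {:exit, _reason}} <- pairs, do: input

IO.inspect(succeeded, label: "succeeded")
IO.inspect(failed, label: "failed")
```

The crashed task will typically also log an error report of its own, separate from the `{:exit, reason}` element the caller receives.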
