class: center, middle ![Indy Elixir](https://www.indyelixir.org/images/indyelixir.svg) # All the World’s a GenStage ### Steve Grossi @ Lessonly --- class: center, middle # Eager, Lazy, Concurrent, Distributed --- # Eager ```elixir File.read!("tmp/web.log") |> String.split("\n") |> Enum.filter fn (line) -> Regex.match?(@response_time_regex, line) end |> average_response_time() ``` - Simple to understand - Slow for large files - Not memory-efficient - Does not use concurrency --- # Lazy ```elixir File.stream!("tmp/web.log") |> Stream.filter fn (line) -> Regex.match?(@response_time_regex, line) end |> average_response_time() ``` - Efficient: item by item - Still single-process --- # Concurrent ```elixir File.stream!("tmp/web.log") |> Flow.from_enumerable |> Flow.filter fn (line) -> Regex.match?(@response_time_regex, line) end |> average_response_time() ``` - Multi-process concurrency - More complicated - Chronology can be trouble --- # Distributed ```elixir # Coming soon! ``` - not just multi-core, but multi-machine - The Erlang VM (BEAM) should make this pretty easy --- # GenStage - The mechanism behind `Flow` - A behavior for concurrent data processing pipelines - Sources, filters, sinks --- # GenStage - `producer`s gather/generate data - `consumer`s do things with data - `producer_consumer`s take data in, spit data out --- # Example Producer ```elixir defmodule MyApp.Counter do alias Experimental.GenStage use GenStage def start_link(start_at \\ 0) do GenStage.start_link(__MODULE__, start_at) end def init(state) do {:producer, state} end def handle_demand(demand, state) do # 0..4 => [0, 1, 2, 3, 4] # 5..9 => [5, 6, 7, 8, 9] => 5 events = Enum.to_list(state..state+demand-1) new_state = state + demand {:noreply, events, new_state} end end ``` --- # Example Consumer ```elixir defmodule MyApp.Printer do alias Experimental.GenStage use GenStage def start_link do # This happens not to take an initial argument GenStage.start_link(__MODULE__, :whatever) end def init(counter) do {:consumer, counter} end def handle_events(events, _from, state) do for event <- events do IO.inspect {self(), event} end {:noreply, [], state} end end ``` --- # Process Optimization ```elixir # lib/my_app.ex # thanks to `$ mix new . --sup` children = [ worker(MyApp.Counter, [0]), worker(MyApp.Printer, []) ] # lib/my_app/counter.ex def start_link(state) do GenStage.start_link(__MODULE__, state, name: __MODULE__) end # lib/my_app/printer.ex def init(counter) do {:consumer, counter, subscribe_to: [Counter]} end ``` --- # Concurrency! ```elixir # lib/my_app.ex children = [ worker(MyApp.Counter, [0]), worker(MyApp.Printer, [], id: 1), worker(MyApp.Printer, [], id: 2), worker(MyApp.Printer, [], id: 3), worker(MyApp.Printer, [], id: 4) ] ``` --- # Auto-Detect Available Cores ```elixir # lib/my_app.ex children = [ worker(MyApp.Counter, [0]), ] consumers = for id <- 1..System.schedulers_online do worker(MyApp.Printer, [], id: id) end Supervisor.start_link(children ++ consumers, opts) ``` --- # Dispatchers ```elixir # lib/counter.ex def init(state) do {:producer, state, dispatcher: GenStage.BroadcastDispatcher} end ``` results in: ```shell {#PID<0.184.0>, 7664} {#PID<0.183.0>, 7461} {#PID<0.185.0>, 7811} # Same! {#PID<0.186.0>, 7797} {#PID<0.183.0>, 7462} {#PID<0.185.0>, 7465} # Also same! {#PID<0.186.0>, 7798} {#PID<0.185.0>, 7813} {#PID<0.185.0>, 7811} # Same! {#PID<0.186.0>, 7800} {#PID<0.183.0>, 7465} # Also same! ``` --- # Producer-Consumers ```elixir defmodule MyApp.Multiplier do alias Experimental.GenStage use GenStage # worker(MyApp.Multiplier, [2]) def start_link(factor) do GenStage.start_link(__MODULE__, factor, name: __MODULE__) end def init(factor) do {:producer_consumer, factor, subscribe_to: [MyApp.Counter]} end def handle_events(events, _from, factor) do events = Enum.map(events, &(&1 * factor)) {:noreply, events, factor} end end ``` --- # Producer-Consumers ```elixir # lib/my_app/counter.ex def init(state) do {:producer, state} end # lib/my_app/multiplier.ex def init(factor) do {:producer_consumer, factor, subscribe_to: [MyApp.Counter]} end # lib/my_app/printer.ex def init(counter) do {:consumer, counter, subscribe_to: [MyApp.Multiplier]} end ``` --- # Let’s Build a Log Parser - reads a static log dump - extracts lines with response times - aggregates average response time --- # The Producer ```elixir defmodule Lumberjack.Reader do alias Experimental.GenStage use GenStage def start_link(log_path) do state = {0, log_path} GenStage.start_link(__MODULE__, state, name: __MODULE__) end def init({counter, log_path}) do log = File.stream!(log_path) {:producer, {counter, log}} end def handle_demand(demand, {counter, log}) do lines = log |> Stream.drop(counter) |> Enum.take(demand) {:noreply, lines, {counter + demand, log}} end end ``` --- # The Producer-Consumer ```elixir defmodule Lumberjack.Filter do alias Experimental.GenStage use GenStage def start_link do GenStage.start_link(__MODULE__, :whatever, name: __MODULE__) end def init(state) do {:producer_consumer, state, subscribe_to: [Lumberjack.Reader]} end def handle_events(events, _from, state) do filtered_events = events |> Enum.map(&extract_metric(&1)) |> Enum.reject(&is_nil(&1)) {:noreply, filtered_events, state} end # continued... end ``` --- # The Producer-Consumer ```elixir defmodule Lumberjack.Filter do # ... defp extract_metric(line) do detect_match(Regex.run(~r/Completed 200 OK in (\d+)ms/, line)) end defp detect_match([_line, match]), do: String.to_integer(match) defp detect_match(_), do: nil end ``` --- # The Consumer ```elixir defmodule Lumberjack.Aggregator do alias Experimental.GenStage use GenStage def start_link do GenStage.start_link(__MODULE__, []) end def init(state) do {:consumer, state, subscribe_to: [Lumberjack.Filter]} end def handle_events(metrics, _from, state) do new_state = state ++ metrics IO.puts("Average: #{average(new_state)}") {:noreply, [], new_state} end defp average(list) when is_list(list) do Enum.sum(list) / length(list) end end ``` --- # Additional Links - [lumberjack on Github](https://github.com/stevegrossi/lumberjack) (the example repo from this talk) ## Elsewhere - [José Valim’s ElixirConf 2016 keynote](https://www.youtube.com/watch?v=srtMWzyqdp8) on GenStage, among other things - [José builds a DB-backed background-job processor in Elixir using GenStage](https://www.youtube.com/watch?v=aZuY5-2lwW4) at Elixir London - [GenStage on Github](https://github.com/elixir-lang/gen_stage) - [The GenStage docs](https://hexdocs.pm/gen_stage/Experimental.GenStage.html) (quite good!)