Search code examples
elixirproducer-consumerconsumerex-unitgenstage

How to test the Elixir GenStage Consumer?


I found some resources on how to test the producer, however there is nothing I could find which shows how to test the Consumer.

In producer, I create a dummy consumer and everything works fine, however in consumer I am struggling with testing.

defmodule DataProducer do
      use GenStage

      def start_link([]) do
        GenStage.start_link(__MODULE__, 0, name: __MODULE__)
      end

      # {:queue.new, demand, size}
      def init(counter) do
        {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
      end

      def handle_demand(demand, state) do 
        events = Enum.to_list(state..state + demand + 1)
        # Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
        {:noreply, events, (state + demand)}
      end
    end

Producer Test:

 defmodule DataProducerTest do
      use ExUnit.Case

      test "check the results" do
        {:ok, stage} = DataProducer.start_link([])
        {:ok, _cons} = TestConsumer.start_link(stage)
        assert_receive {:received, events}
        GenStage.stop(stage)
      end

    end

    defmodule TestConsumer do
      def start_link(producer) do
        GenStage.start_link(__MODULE__, {producer, self()})
      end
      def init({producer, owner}) do
        {:consumer, owner, subscribe_to: [producer]}
      end
      def handle_events(events, _from, owner) do
        send(owner, {:received, events})
        {:noreply, [], owner}
      end
    end

And consumer:

defmodule DataConsumer do
  use GenStage
  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end
  def init(state) do
    {:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
  end
  def handle_events(events, _from, state) do
    for event <- events do
      # :timer.sleep(250)
      Logger.info inspect( {self(), event, state} )
    end
    {:noreply, [], state}
  end
end

Thank you in advanced.


Solution

  • In the test for the consumer:

     test "should behave like consumer" do
        {:ok, producer} = DummyProducer.start_link(1)
        {:ok, consumer} = Consumer.start_link(producer)
        Process.register self, :test
        assert_receive {:called_back, 10}
      end
    

    Now DummyProducer

    defmodule DummyProducer do
      use GenStage
    
      def start_link(demand) do
        GenStage.start_link(__MODULE__, demand)
      end
    
      def init(demand) do
        {:producer, demand}
      end
    
      def handle_demand(demand, counter) when demand > 0 do
        events = Enum.to_list(counter..counter+demand-1)
        Process.send_after(self(), {:stop, demand}, 1)
        {:noreply, events, demand + counter}
      end
    
      def handle_info({:stop, demand}, state) do
        send :test, {:called_back, demand}
        {:stop, :normal, demand}
      end
    end
    

    I think,

    The point of Testing the consumer is checking if consumer can send the demand and stick with max demand allocated in the subscription.