How can I implement a queue system in a message broker in Elixir using GenServer?


What methods could I use to create queues in Elixir using GenServer? The main idea is to spawn a long-running process for each queue.

def handle_call({:create_queue, queue_name}, _from, state) do
  create_queue(queue_name)
  {:reply, "You have successfully created a queue named #{queue_name}", state}
end

def create_queue(queue_name, list_of_subscribers \\ []) do
  spawn(fn -> create_queue_thread(list_of_subscribers, queue_name) end)
end

I don't understand how to access a queue when I only know its name. I thought of returning its PID, but I have no clue where to store it properly or how to use it afterwards. I also thought of broadcasting every message to all queue processes and pattern matching the queue name against a tuple that contains the message.

I had an implementation of the message broker with gen_tcp, and I used a central process that held a map from queue name to queue pid so that I could use the send function. But my teacher told me that "This is very bad" and that I need to redo my task using GenServer instead of gen_tcp and the basic Elixir concurrency methods.

def queue_cashier(queuing_process) do
  receive do
    {sender, queue_name} ->
      # Map.get/2 looks up the pid without removing the entry from the map.
      case Map.get(queuing_process, queue_name) do
        nil ->
          # No queue with this name yet: spawn one and remember its pid.
          pid = spawn(Server, :queue_handler, [%{}])
          send(sender, {pid})
          queue_cashier(Map.put(queuing_process, queue_name, pid))

        pid ->
          # The queue already exists: reply with its pid.
          send(sender, {pid})
          queue_cashier(queuing_process)
      end
  end
end

Solution

  • But my teacher told me that "This is very bad" and that I need to redo my task using GenServer instead of gen_tcp and the basic Elixir concurrency methods.

    Hah hah. :)

    Well, the basic concurrency methods are spawn(), send(), and receive(). The mailbox for a process is actually a FIFO implementation, because a process handles messages in the order they are received, so just copy the source code for a process mailbox!
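
    To make the mailbox ordering concrete, here is a minimal sketch. The module name MailboxDemo and the {:msg, n} message shape are made up for illustration. A process sends itself three messages and then reads them back in arrival order.

```elixir
# Hypothetical module name, for illustration only.
defmodule MailboxDemo do
  # Sends three messages to the current process, then reads them back.
  def run do
    me = self()
    for n <- [1, 2, 3], do: send(me, {:msg, n})
    collect(3, [])
  end

  # Receives `count` messages and returns their payloads in arrival order.
  defp collect(0, acc), do: Enum.reverse(acc)

  defp collect(count, acc) do
    receive do
      {:msg, n} -> collect(count - 1, [n | acc])
    end
  end
end

IO.inspect(MailboxDemo.run())
# prints [1, 2, 3]
```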

    I'm not quite sure what the details of your assignment are, but it sounds like the first thing you need to do is implement a queue. A GenServer itself is going to be your queue. A GenServer can be used to store data, like a list, and the list sort of becomes like a global variable that can be accessed from anywhere through GenServer.call()/GenServer.cast(), which the GenServer answers in handle_call()/handle_cast(), as long as the calling process has the pid of the GenServer.

    The problem with storing a list in the GenServer's state is that a list is essentially a LIFO implementation. It's efficient to add things to the head of a list, which means the last thing added is at the head, and it's also very efficient to extract the head, so with a list it's efficient to get the last thing added out first. A naive implementation of a queue, which is a FIFO data structure, would be to just store things in a list in the GenServer's state. Then, when you need a value, inside handle_call(:get_next ...) you can reverse the list and take the head of the reversed list, which is the oldest item. That gives you FIFO behavior, at the cost of walking the whole list on every read.
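
    A minimal sketch of that naive list-backed queue might look like the following. The module name ListQueue, the push/2 and get_next/1 client functions, and the :empty reply are assumptions made for this example; they are not part of your assignment.

```elixir
# Hypothetical module and function names, for illustration only.
defmodule ListQueue do
  use GenServer

  # Client API.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, [], opts)
  def push(pid, item), do: GenServer.cast(pid, {:push, item})
  def get_next(pid), do: GenServer.call(pid, :get_next)

  # Server callbacks. The state is a plain list, newest item first.
  @impl true
  def init(items), do: {:ok, items}

  @impl true
  def handle_cast({:push, item}, items) do
    # Prepending is cheap, so the newest item sits at the head.
    {:noreply, [item | items]}
  end

  @impl true
  def handle_call(:get_next, _from, []), do: {:reply, :empty, []}

  def handle_call(:get_next, _from, items) do
    # Reverse so the oldest item is at the head, take it, then restore
    # the newest-first order for the remaining items.
    [oldest | rest] = Enum.reverse(items)
    {:reply, {:ok, oldest}, Enum.reverse(rest)}
  end
end

{:ok, queue} = ListQueue.start_link()
ListQueue.push(queue, :a)
ListQueue.push(queue, :b)
IO.inspect(ListQueue.get_next(queue))
# prints {:ok, :a}
```

    If the repeated reversing becomes a concern, Erlang's :queue module (for example :queue.in/2 and :queue.out/1) could be stored in the state instead of a plain list.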

    If the queue needs to store processes, then save the pids in the GenServer's list. If you need to retrieve all the pids in the GenServer's list to broadcast a message to every pid, then you can implement a handle_call(:get_all, ...) which can reverse the GenServer's list and return it. That way you can step through the list from head to tail, and send a message to all the pids in the FIFO order.
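
    As a rough sketch of the broadcast idea, assuming a hypothetical Subscribers module with subscribe/2, get_all/1, and broadcast/2 functions (none of these names come from your code), the GenServer could keep the pids newest-first and reverse them only when asked for the full list:

```elixir
# Hypothetical module and function names, for illustration only.
defmodule Subscribers do
  use GenServer

  # Client API.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, [], opts)
  def subscribe(server, pid), do: GenServer.cast(server, {:subscribe, pid})
  def get_all(server), do: GenServer.call(server, :get_all)

  # Sends `message` to every subscriber, oldest subscriber first.
  def broadcast(server, message) do
    server |> get_all() |> Enum.each(&send(&1, message))
  end

  # Server callbacks. The state is a list of pids, newest first.
  @impl true
  def init(pids), do: {:ok, pids}

  @impl true
  def handle_cast({:subscribe, pid}, pids), do: {:noreply, [pid | pids]}

  @impl true
  def handle_call(:get_all, _from, pids) do
    # Reply in subscription (FIFO) order without changing the stored list.
    {:reply, Enum.reverse(pids), pids}
  end
end

{:ok, subs} = Subscribers.start_link()
Subscribers.subscribe(subs, self())
Subscribers.broadcast(subs, {:news, "hello"})

receive do
  {:news, text} -> IO.puts("got: " <> text)
end
```

    Note that the sending happens in the calling process here, not inside the GenServer; that is a design choice for the sketch, not a requirement.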

    I don't understand how to access a queue when I only know its name. I thought of returning its PID, but I have no clue where to store it properly or how to use it afterwards.

    When you start a GenServer, it returns a pid. You can then pass the pid to a function that you define to interact with the GenServer to perform the operations you want to perform. You may define four or five functions or even other processes that interact with the GenServer, and you need to pass them the pid of the GenServer. Or, you can register the name of the GenServer, and hard code the name of the GenServer in your code.
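
    One way to register a name per queue, so that a queue can be reached knowing only its name, is Elixir's built-in Registry with a :via tuple. The sketch below assumes a hypothetical NamedQueue module and a registry called QueueRegistry; both names are invented for this example.

```elixir
# Hypothetical module, registry, and function names, for illustration only.
defmodule NamedQueue do
  use GenServer

  # Name of the Registry process that maps queue names to pids (assumed).
  @registry QueueRegistry

  # Builds the :via tuple that GenServer uses to look the process up by name.
  defp via(queue_name), do: {:via, Registry, {@registry, queue_name}}

  def start_link(queue_name) do
    GenServer.start_link(__MODULE__, [], name: via(queue_name))
  end

  def push(queue_name, item), do: GenServer.cast(via(queue_name), {:push, item})
  def items(queue_name), do: GenServer.call(via(queue_name), :items)

  @impl true
  def init(items), do: {:ok, items}

  @impl true
  def handle_cast({:push, item}, items), do: {:noreply, [item | items]}

  @impl true
  def handle_call(:items, _from, items), do: {:reply, Enum.reverse(items), items}
end

# The registry must be running before any queue registers with it.
{:ok, _} = Registry.start_link(keys: :unique, name: QueueRegistry)
{:ok, _pid} = NamedQueue.start_link("orders")
NamedQueue.push("orders", :first)
IO.inspect(NamedQueue.items("orders"))
# prints [:first]
```

    With this approach no process has to keep its own map of queue names to pids; callers only pass the queue name around.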