Search code examples
erlangcometyawsdata-stream

Data streaming using streamcontent_from_pid in Yaws/Erlang


I desire to stream data with yaws to my comet application, I have read and worked around to understand it but the example from yaws seems to be a little complicated for me (I am new to Erlang). I just cannot get my head around...

here is the example from yaws (I modified a little bit):

out(A) ->
    %% Create a random number
    {_A1, A2, A3} = now(),
    random:seed(erlang:phash(node(), 1),
                erlang:phash(A2, A3),
                A3),
    Sz = random:uniform(1),

    Pid = spawn(fun() ->
                        %% Read random junk
                        S="Hello World",
                        P = open_port({spawn, S}, [binary,stream, eof]),
                        rec_loop(A#arg.clisock, P)
                end),

    [{header, {content_length, Sz}},
     {streamcontent_from_pid, "text/html; charset=utf-8", Pid}].


rec_loop(Sock, P) ->
    receive
        {discard, YawsPid} ->
            yaws_api:stream_process_end(Sock, YawsPid);
        {ok, YawsPid} ->
            rec_loop(Sock, YawsPid, P)
    end,
    port_close(P),
    exit(normal).

rec_loop(Sock, YawsPid, P) ->
    receive
        {P, {data, BinData}} ->
            yaws_api:stream_process_deliver(Sock, BinData),
            rec_loop(Sock, YawsPid, P);
        {P, eof} ->
            yaws_api:stream_process_end(Sock, YawsPid)
    end.

What I need is to transform the above script to which can be combined with the following.

mysql:start_link(p1, "127.0.0.1", "root", "azzkikr", "mydb"),
                {data, Results}  = mysql:fetch(p1, "SELECT*FROM messages WHERE id > " ++ LASTID),
                {mysql_result, FieldNames, FieldValues, NoneA, NoneB} = Results,
                parse_data(FieldValues, [], [], [], [], [])

Where parse_data(FieldValues, [], [], [], [], []) returns a JSON string of the entry.. Combined this script should constantly check for a new entry into database and if there is, it should fetch as comet should.

Thank you, May you all go to paradise!


Solution

  • As this answer explains, sometimes you need to have a process running that's independent of any incoming HTTP requests. For your case, you can use a form of publish/subscribe:

    • Publisher: when your Erlang node starts up, start some sort of database client process, or a pool of such processes, executing your query and running independently of Yaws.
    • Subscriber: when Yaws receives an HTTP request and dispatches it to your code, your code subscribes to the publisher. When the publisher sends data to the subscriber, the subscriber streams them back to the HTTP client.

    Detailing a full solution here is impractical, but the general steps are:

    • When your database client processes start, they register themselves into a pg2 group or something similar. Use something like poolboy instead of rolling your own process pools, as they're notoriously tricky to get right. Each database client can be an instance of a gen_server running a query, receiving database results, and also handling subscription request calls.
    • When your Yaws code receives a request, it looks up a database client publisher process and subscribes to it. Subscriptions require calling a function in the database client module, which in turn uses gen_server:call/2,3 to communicate with the actual gen_server publisher process. The subscriber uses Yaws streaming capabilities (or SSE or WebSocket) to complete the connection with the HTTP client and sends it any required response headers.
    • The publisher stores the process ID of the subscriber, and also establishes a monitor on the subscriber so it can clean up the subscription should the subscriber die or exit unexpectedly.
    • The publisher uses the monitor's reference as a unique ID in its messages it sends to that subscriber, so the subscription function returns that reference to the subscriber. The subscriber uses the reference to match incoming messages from the publisher.
    • When the publisher gets new query results from the database, it sends the data to each of its subscribers. This can be done with normal Erlang messages.
    • The subscriber uses Yaws streaming functions (or SSE or WebSocket features) to send query results to the HTTP client.
    • When an HTTP client disconnects, the subscriber calls another publisher function to unsubscribe.