I am fairly new to the Erlang environment.
I am writing an email testing application that filters incoming email, using randomly generated routing keys on a topic exchange to mark emails entering my system.
Once they are delivered (and processed) on a queue, I want to label them again with the same randomly generated routing_key, so that I can route them to another exchange where they are ready for the final consumer.
This second publishing step is causing me real trouble.
I am getting data back from a TCP socket (processed by a third-party program, SpamAssassin) and pattern matching on it in handle_info.
I rely on a gen_server to consume the messages first, through the regular amqp_client library (amqp_client/include/amqp_client.hrl).
I use handle_info in my gen_server behaviour and then pattern match on the parameters.
Delivered AMQP messages are detected through function heads (records) in the handle_info callback.
The TCP socket works nicely for talking to SpamAssassin; it returns a 3-tuple with binary string data like this:
{tcp,#Port<0.55>,<<"SPAMD/1.1 0 EX_OK\r\nContent-length: 564\r\nSpam: True ; 7.9 / 5.0\r\n\r\nReceived: from localhost by XXXX.ikexpress.com\n\twith SpamAssassin (version 3.4.2);\n\tThu, 15 Aug 2019 21:44:12 +0200\nX-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on\n\tXXXXX.ikexpress.com\nX-Spam-Flag: YES\nX-Spam-Level: *******\nX-Spam-Status: Yes, score=7.9 required=5.0 tests=EMPTY_MESSAGE,MISSING_DATE,\n\tMISSING_FROM,MISSING_HEADERS,MISSING_MID,MISSING_SUBJECT,\n\tNO_HEADERS_MESSAGE,NO_RECEIVED,NO_RELAYS autolearn=no\n\tautolearn_force=no version=3.4.2\nMIME-Version: 1.0\nContent-Type: multipart/mixed; boundary=\"----------=_5D55B60C.D2FC2670\"\n\n">>}
The second handle_info clause matches the answer from the gen_tcp socket correctly, but I still have to package that answer and send it to a topic exchange (the topic_scored_email exchange).
***My gen_server***
handle_info({#'basic.deliver'{routing_key=Key, consumer_tag=Tag}, Content}, State) ->
    #amqp_msg{props = Properties, payload = Payload} = Content,
    #'P_basic'{message_id = MessageId, headers = Headers} = Properties,
    send_to_spamassassin:calcule_score(Payload),
    {noreply, State};
handle_info(Msg, State) ->
    case Msg of
        {_,_,Data} ->
            scored_email:main(Data);
        {_,_} ->
            %% nothing to do for 2-tuples; a clause body cannot be empty
            ok
    end,
    {noreply, State}.
***send_to_spamassassin function***
calcule_score(Message) ->
    case gen_tcp:connect("localhost", 783, [{mode, binary}]) of
        {ok, Sock} ->
            …
            gen_tcp:send(Sock, Message2);
        {error,_} ->
            io:fwrite("Connection error! Quitting...~n")
    end.
***scored_email***
main(Argv) ->
    {ok, Connection} = amqp_connection:start(#amqp_params_network{virtual_host = <<"/">>}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_scored_email">>,
                                                   type = <<"topic">>}),
    {RoutingKey, Message} = case Argv of
        …
        %DOING PATTERN MATCHING THAT WORKS HERE
        …
    end,
    amqp_channel:cast(Channel,
                      #'basic.publish'{exchange = <<"topic_scored_email">>,
                                       routing_key = RoutingKey},
                      #amqp_msg{payload = Message}).
The first issue is the type of the data (a binary string), but I guess there is a workaround using a BIF such as binary_to_tuple or something like that.
What I struggle to understand is how I could pass the right RoutingKey: since Erlang is functional, there are no side effects and no assignment.
That change in data format (AMQP --> raw TCP --> AMQP again) seems impossible (to me) to achieve with the OTP abstractions.
However, I would like to reassemble every processed message with the right routing key matched 5 lines above.
How could I modify my code to do that? I come from imperative languages and have reached my limit here…
Yours
> The first issue is the type of the data (a binary string), but I guess there is a workaround using a BIF such as binary_to_tuple or something like that.
In all languages, you have to figure out how to parse the data you read from a socket.
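As a rough illustration of that parsing step, here is a minimal sketch that splits a reply shaped like the one in the question into its status line, headers, and body. The module and function names are made up for the example, and it assumes the whole reply arrives in a single binary containing the blank line (`\r\n\r\n`) that separates the headers from the body; a real client would have to cope with replies that arrive in several TCP chunks.

```erlang
-module(spamd_parse_sketch).
-export([parse/1]).

%% Hypothetical helper: split a complete SPAMD reply into
%% {StatusLine, Headers, Body}. Crashes with badmatch if the
%% header/body separator is missing (acceptable for a sketch).
parse(Data) when is_binary(Data) ->
    %% Without the global option, only the first "\r\n\r\n" is used.
    [Head, Body] = binary:split(Data, <<"\r\n\r\n">>),
    [StatusLine | HeaderLines] = binary:split(Head, <<"\r\n">>, [global]),
    %% Each header line looks like <<"Name: Value">>.
    Headers = [list_to_tuple(binary:split(Line, <<": ">>)) || Line <- HeaderLines],
    {StatusLine, Headers, Body}.
```

With the result in hand, `lists:keyfind(<<"Spam">>, 1, Headers)` would give you the `Spam` header as a tuple that you can pattern match on.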
> What I struggle to understand is how I could pass the right RoutingKey: since Erlang is functional, there are no side effects and no assignment.
That is the party line, but in reality the parameter variables of a recursive function can be used to store values. In your case, you can store the routing key in the State variable, which is then available in all gen_server callback functions. State can be a 30-element tuple if you want, so there is no limit to how much info you can store in the State variable.
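Here is a minimal sketch of the State idea, under some loud assumptions: the `{deliver, Key, Sock, Payload}` message is a hypothetical stand-in for your `#'basic.deliver'{}` clause, the module name is made up, and I assume the code that opens the socket hands `Sock` back to the gen_server so it can be used as a key (your `calcule_score/1` does not currently return it). State is a map from socket to routing key, so the reply on a given socket can be matched with the key of the email that was sent on it.

```erlang
-module(routing_state_sketch).
-behaviour(gen_server).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% State: a map from socket to the routing key of the email sent on it.
init([]) ->
    {ok, #{}}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Hypothetical stand-in for the AMQP delivery clause:
%% remember the routing key under the socket used for this email.
handle_info({deliver, Key, Sock, _Payload}, State) ->
    {noreply, State#{Sock => Key}};
%% Reply from the scorer: recover the key stored for this socket
%% and drop the entry from State.
handle_info({tcp, Sock, Data}, State) ->
    case maps:take(Sock, State) of
        {Key, Rest} ->
            %% A real server would publish here instead of printing.
            io:format("publish ~p with routing key ~p~n", [Data, Key]),
            {noreply, Rest};
        error ->
            {noreply, State}
    end;
handle_info(_Other, State) ->
    {noreply, State}.
```

The point is only that each callback returns the new State, and the next callback receives it; that is how a value "survives" between the AMQP delivery and the TCP reply without any assignment.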
Another option is to use an ets/dets table, i.e. Erlang's built-in term storage, to store messages with routing keys until you are ready to send everything to some other process.
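A minimal sketch of the ets variant could look like this. The table name, the `sock1` key, and the routing key value are placeholders I made up for illustration; in your application the key would be whatever identifies an email in flight.

```erlang
-module(ets_sketch).
-export([demo/0]).

%% Store a routing key under an identifier, read it back, then clean up.
demo() ->
    %% A private set table owned by the calling process.
    Tab = ets:new(pending_emails, [set, private]),
    true = ets:insert(Tab, {sock1, <<"key.abc">>}),
    %% Later, when the scored reply comes back:
    [{sock1, Key}] = ets:lookup(Tab, sock1),
    true = ets:delete(Tab, sock1),
    [] = ets:lookup(Tab, sock1),
    true = ets:delete(Tab),
    Key.
```

Note that an ets table is destroyed when its owner process exits, so if the entries must survive a crash of the gen_server you would need dets or another owner for the table.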
> {RoutingKey, Message} = ...
> However, I would like to reassemble every processed message with the right routing key matched 5 lines above.
If you are within the same function, what prevents you from using the routing key and message that you have in the variables RoutingKey and Message? I'm unclear how there is an issue if all the code is within one function. I would think you could do something like this:
    {RoutingKey, Message} = ...,
    ProcessedMsg = process_this(Message),
    {RoutingKey, ProcessedMsg}
I suggest that you post a simple example of your problem, without all the complex matching and amqp_channel stuff, to distill the problem down to its core, e.g.
    handle_info(Msg, State) ->
        RoutingKey = 3,
        ProcessedMsg = "hello",
        %% Here, I want to write: ....