Parallel execution: blocking receive, deferred synchronous


I previously asked a question about errors that occurred when making sync and async calls in parallel, and the answer raised even bigger questions:

  • Does the blocking receive construct replace the .z.ps/.z.pg calls?
  • Given that deferred synchronous exists (it is used in mserve.q), is there something like deferred asynchronous?

Here are my observations, based on the previous question. Case 3 from that question works fine:

q)neg[h]({neg[.z.w] x};42); h[]
42

But what if we want to ensure that our message has been sent?

q)neg[h]({neg[.z.w] x};42); neg[h][]; h[]
42

Seems OK, right? Reading further in the documentation, we find another kind of guarantee: h"", which ensures the message has been processed on the remote. In this case, though, we get an error:

q)neg[h]({neg[.z.w] x};42); neg[h][]; h""; h[]
'type
<hangs>

So my hypothesis is this: h[] (sent in the appropriate sequence) somehow changes the behaviour of the sender, and maybe of the receiver, to prepare them for this kind of communication.


Solution

  • To answer your first question, I don't think "replace" is the correct term. Rather, the incoming message is expected, because the local process initiated the exchange, so it's not routed to the .z.ps handler. This is unlike messages the process wasn't expecting, where .z.ps can be used to check that the message isn't malicious, or whatever the case may be.

    When you issue a blocking receive, the O_NONBLOCK flag is cleared on the socket and recvfrom() blocks until a message arrives; the O_NONBLOCK flag is then restored:

    read(0, "h[]\n", 4080)                  = 4
    fcntl(4, F_GETFL)                       = 0x802 (flags O_RDWR|O_NONBLOCK)
    fcntl(4, F_SETFL, O_RDONLY)             = 0
    recvfrom(4, "\1\0\0\0\25\0\0\0", 8, 0, NULL, NULL) = 8
    recvfrom(4, "\n\0\7\0\0\0unblock", 13, 0, NULL, NULL) = 13
    fcntl(4, F_GETFL)                       = 0x2 (flags O_RDWR)
    fcntl(4, F_SETFL, O_RDONLY|O_NONBLOCK)  = 0
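A minimal client-side sketch of the difference, assuming a plain q server is listening on port 1234 (started with `q -p 1234`, default handlers). The logging .z.ps below is a hypothetical handler added only to make visible which messages reach it: the reply consumed by the blocking receive h[] should not be printed, while a reply that arrives with no h[] waiting is handed to .z.ps once the client is idle.

```q
/ client process; assumes a server started with: q -p 1234
h:hopen 1234

/ hypothetical logging handler: print each unsolicited async message, then evaluate it like the default handler (value)
.z.ps:{-1"unsolicited async: ",-3!x; value x}

/ blocking receive: the server's async reply is returned by h[] and is not routed to .z.ps
neg[h]({neg[.z.w] x};42); r:h[]
show r

/ no h[] this time: the reply arrives as an ordinary async message and goes to .z.ps when the client is idle
neg[h]({neg[.z.w] x};43)
```

Only the second reply should produce an "unsolicited async" line, which is the sense in which h[] bypasses .z.ps rather than replacing it.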
    
    
    

    On your second question, I believe deferred synchronous was introduced in kdb+ v2.3 for the scenario where a client process shouldn't block the remote process while it waits for its response. Deferred synchronous allows the server to process other client requests while your client process blocks until the requested information is received. This is fine when the client can't do anything else until it receives the response.
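A minimal three-process sketch of the deferred synchronous pattern, loosely in the spirit of mserve.q but not taken from it. The process roles, the ports 1234/1235 and the names work, query and reply are all hypothetical. The gateway forwards each request asynchronously and returns immediately, so it stays free to serve other clients while the worker is busy; only the client blocks, on h[].

```q
////////////// PROC W (worker, \p 1235) //////////////
/ hypothetical worker: slow computation, then reply to the gateway, passing the original client's handle back
work:{[clientH;x;y] system"sleep 4"; neg[.z.w](`reply;clientH;x+y)}

////////////// PROC A (gateway, \p 1234) //////////////
w:hopen 1235
/ forward the request asynchronously together with the caller's handle; returns at once
query:{[x;y] neg[w](`work;.z.w;x;y)}
/ route the worker's result back to the client that is waiting on h[]
reply:{[clientH;r] neg[clientH] r}

////////////// PROC B (client) //////////////
h:hopen 1234
/ deferred synchronous: send async, then block only this process until the reply arrives
neg[h](`query;45;55); r:h[]
```

The client handle number travels to the worker and back only as data; it is meaningful solely inside the gateway, which is where reply uses it.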

    There are cases where neither process should wait for the other; is this what you're referring to? If so, a use case might be a tiered gateway system, where one or more gateways send and receive messages to and from each other, but none of them blocks or waits. This is done via async callbacks. In a complex system with multiple processes, each request needs to be tagged with an ID while it is in flight so that it can be tracked. Likewise, you would need to track which request came from which connection in order to return results to the correct client.
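A minimal sketch of tagging in-flight requests with IDs on the calling side, assuming a server on port 1234 defines the hypothetical remoteAdd shown in the first comment, which echoes the ID back with the result. nextId, pending, sendReq and onResult are illustrative names, not part of any kdb+ API. A gateway would additionally store the originating client's .z.w alongside each ID so it could forward the result.

```q
/ server side (port 1234), hypothetical: reply asynchronously, echoing the request id
/ remoteAdd:{[id;x;y] neg[.z.w](`onResult;id;x+y)}

/ caller side: bookkeeping for requests that are still in flight
nextId:0
pending:(`long$())!`timestamp$()        / request id -> time the request was sent

/ send a request without blocking; h is assumed to be an open handle, e.g. h:hopen 1234
sendReq:{[x;y]
  nextId::nextId+1;
  pending[nextId]:.z.p;
  neg[h](`remoteAdd;nextId;x;y);
  nextId}

/ async callback invoked by the server: match the result to its request, then retire the id
onResult:{[id;r]
  -1"request ",string[id]," -> ",(-3!r)," after ",string .z.p-pending id;
  pending::pending _ id;}
```

With the server side loaded, calling sendReq[45;55] and sendReq[1;2] back to back returns immediately; each callback later prints its own ID, so results can be matched even if they come back in a different order.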

    Here is a simpler example:

    ////////////// PROC A //////////////
    q)\p
    1234i
    q)remoteFunc:{system"sleep 4";neg[.z.w](`clientCallback;x+y)}
    
    ////////////// PROC B //////////////
    q)h:hopen 1234
    q)clientCallback:{0N!x;}; .z.ts:{-1"Processing continues..";}
    q)
    q)neg[h](`remoteFunc;45;55);system"t 1000"
    q)Processing continues..
    Processing continues..
    Processing continues..
    Processing continues..
    Processing continues..
    100
    
    // process A sent back its result when it was ready
    

    On your last question:

    1. neg[h][] flushes async messages at least as far as TCP/IP. This does not mean the remote has received them. The chaser h"" flushes any outgoing messages on h, sends its own request, and processes all other messages on h until it receives its response.

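    2. Chasing async messages is a way to ensure they've been processed on the remote before moving on to the next async message. In your example, the chase followed by a hanging call isn't valid: first, it will error, and second, it's not a task that requires a guarantee that the previous async message was fully processed before proceeding. The hang follows from point 1: the 42 sent back by the remote is one of the other messages h"" processes while waiting for its own response, so the following h[] has nothing left to receive and blocks.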
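A minimal client-side sketch of flush versus chase, assuming a plain q server on port 1234 with default handlers. The variables a and b are hypothetical and are set on the remote by the async messages.

```q
h:hopen 1234
/ async updates: queued on the client, no reply expected
neg[h]"a:1"; neg[h]"b:2"
/ flush: pushes the queued messages out to TCP, but says nothing about whether the remote has run them
neg[h][]
/ chase: a sync round trip; messages on one connection are processed in order, so its reply means both updates have run
h""
/ now it is safe to depend on the async updates
h"a+b"
```

Since h"" flushes on its own, the explicit neg[h][] is redundant here; it is kept only to show the two steps side by side.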

    Jason