
Refactor blocking I/O in Lua using coroutines


Say I have the following Lua code, which reads and parses data from a file.

local cjson = require 'cjson'

function read_data (path)
  local file = assert(io.open(path, 'r'))
  local raw = file:read('*all')  -- method call: note the colon
  file:close()
  return cjson.decode(raw)
end

function use_data (path)
  local data = read_data(path)
  -- do something with `data`
end

I'm now trying to figure out what the non-blocking version of this basic example would look like.

In JavaScript, you'd make both of those functions asynchronous, and then await the results. But judging by what I've read so far about Lua, here I'd only have to refactor the read_data function to use coroutines, and I could leave the use_data code as-is. How would I achieve this?

EDIT: Solutions that replace the current calls with external libraries are welcome. Whatever it takes to make the example non-blocking, basically.


Solution

  • In JavaScript, you'd make both of those functions asynchronous, and then await the results. But judging by what I've read so far about Lua, here I'd only have to refactor the read_data function to use coroutines, and I could leave the use_data code as-is. How would I achieve this?

    Lua has support for coroutines. There are a few taxonomies we could use to classify different implementations of coroutines. The one that matters most when we discuss Lua coroutines classifies them according to the question “can the coroutine suspend its caller?”.

    By this criterion, JS “coroutines” (async functions) cannot suspend their caller. That's why every caller in the call stack must in turn await the result, propagating the suspension across the whole call stack. Bob Nystrom describes this as the problem of function “colors”.

    Lua is different from JS and many other languages in this regard. Lua coroutines can suspend the caller, so await is unnecessary. There are more details to this, but in short you're right that “I could leave the use_data code as-is”.
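    To make this concrete, here is a minimal sketch in plain Lua, with no event loop yet. `fake_read` is a hypothetical stand-in for real non-blocking IO, and `upper()` stands in for `cjson.decode`. The yield happens two calls deep inside `read_data`, yet it suspends `use_data` as well, without any change to `use_data`.

```lua
-- Sketch: a yield deep inside read_data suspends the whole coroutine,
-- including use_data, which stays ordinary sequential code.
-- `fake_read` is hypothetical; it stands in for real non-blocking IO.
local log = {}

local function fake_read(path)
  -- Hand control back to whoever resumed us, reporting which file we want.
  -- The value passed to the next coroutine.resume() becomes our result.
  return coroutine.yield(path)
end

local function read_data(path)
  local raw = fake_read(path)
  return raw:upper()  -- stand-in for cjson.decode(raw)
end

local function use_data(path)
  local data = read_data(path)  -- unchanged: no `await` anywhere
  log[#log + 1] = "use_data got " .. data
end

local co = coroutine.create(use_data)

-- First resume runs use_data until the yield inside fake_read.
local ok, requested = coroutine.resume(co, "config.json")
assert(ok, requested)
log[#log + 1] = "driver asked to read " .. requested

-- The driver "completes" the IO and resumes the suspended call stack.
assert(coroutine.resume(co, "contents"))
log[#log + 1] = "status: " .. coroutine.status(co)

for _, line in ipairs(log) do print(line) end
```

    The code that resumes the coroutine (the driver) is exactly the missing piece that the rest of this answer is about.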

    However, all this is just syntax sugar at the language level to help you organize your code, and a solution could be built around any syntax. What really matters to solve your problem is a shared “event loop” (there are better terms for this concept, but that's the one that got popular, and it's the one I'll use for now) to enable multitasking in your program.

    In JavaScript, the “event loop” is implicit in the execution context, and it spins every time your “code leaves” (e.g. the function returns, execution reaches the end of the source file, etc).

    This “event loop” needs to coordinate tasks and switch between them. In other words, it needs to schedule tasks, so scheduler could be another term to use here. In your case, the function that suspends also performs IO, so you need a scheduler with IO integration. For IO integration, you want abstractions that forward IO requests to the OS and integrate the consumption of events from the notification channel (e.g. poll, epoll, select, io_uring, IOCP) with the scheduler. All IO events arrive multiplexed on the same channel, so upon receiving them, you demultiplex them and dispatch each one to the interested party. For this reason, this abstraction may also be called an IO/event demultiplexer, or a dispatcher. Scheduler is actually not a common term for this type of IO-oriented solution.
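    The demultiplex/dispatch step can be sketched as a toy in pure Lua, with no real OS integration. Here `pending` is a hypothetical stand-in for the events that poll/epoll would report, and `wait`, `spawn` and `run` are illustrative names, not any library's API.

```lua
-- Toy event demultiplexer built on coroutines (a sketch, not a real engine).
-- `pending` simulates what poll/epoll would report; all names are hypothetical.
local waiters = {}  -- event name -> coroutine waiting for it
local pending = {}  -- queue of { name = ..., payload = ... } "from the OS"
local log = {}

local function wait(name)
  -- Register interest in `name`, then suspend the current task.
  waiters[name] = coroutine.running()
  return coroutine.yield()
end

local function spawn(fn)
  local co = coroutine.create(fn)
  assert(coroutine.resume(co))  -- run until the task's first wait()
end

local function run()
  -- Demultiplex: take events off the shared channel one at a time and
  -- resume whichever task registered interest in each of them.
  while #pending > 0 do
    local ev = table.remove(pending, 1)
    local co = waiters[ev.name]
    if co then
      waiters[ev.name] = nil  -- clear first, so the task may wait again
      assert(coroutine.resume(co, ev.payload))
    end
  end
end

spawn(function()
  local data = wait("socket-a readable")
  log[#log + 1] = "task A got " .. data
end)

spawn(function()
  local data = wait("socket-b readable")
  log[#log + 1] = "task B got " .. data
end)

-- Events arrive multiplexed, in an order unrelated to task creation.
pending[#pending + 1] = { name = "socket-b readable", payload = "bbb" }
pending[#pending + 1] = { name = "socket-a readable", payload = "aaa" }

run()
for _, line in ipairs(log) do print(line) end
```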

    Back to your problem: you're not only doing IO, but file IO. File IO is problematic to integrate in traditional reactors because reactors follow the readiness model, and regular files are always reported as ready (there are many more details to this, but we can simplify it all to something along the lines of “your thread may block, and there's nothing we can do”). To work around the problem, thread pools may be used. So now the scheduler/IO-demux/event-dispatcher also deals with consuming events from sibling threads. There's a better name for this type of abstraction, “execution engine”, so that's the name I'll stick with from now on (instead of “event loop”). I don't remember where I first saw this name, but it doesn't matter.

    Another way to solve the blocking file read would be to use proactors (e.g. io_uring and IOCP). The term execution engine also abstracts over this difference: we may refer to a framework as an execution engine whether it's implemented in terms of a reactor or a proactor.

    On a side note, proactors are a better fit for async IO in general (not only for file IO). However, that'd be another lengthy topic. I suggest you just explore the frameworks that tackle concurrent IO and see how passionate their developers are about these topics. If they're passionate at all, you're sure to learn something new reading their documentation, blogs, mailing lists, etc.

    And back to your problem: neither JS nor Lua has an execution engine by itself. In the browser there is an implicit execution engine (but it certainly doesn't offer unrestricted file IO through a POSIX-like API). A common execution engine used outside the browser is NodeJS (which brands itself as a runtime environment, a term that makes sense for NodeJS and is also a good choice).

    For Lua, there is no dominant execution engine as far as I'm aware. Many options exist, such as cqueues and luvit. Some are implicit (like the execution engines we usually find in JS), and others are explicit (i.e. you need to explicitly call API functions to “tick the event loop” and process the next round of ready events).
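    To illustrate the explicit style, here is a minimal sketch of an engine that does nothing until the program calls `tick()`. The names (`new_engine`, `spawn`, `tick`) are hypothetical, and the tasks only yield cooperatively; a real engine would also wait for IO events between rounds.

```lua
-- Sketch of an *explicit* engine: nothing runs until the program calls
-- tick(). Each tick resumes every runnable task once. Names are hypothetical.
local function new_engine()
  local engine = { ready = {} }

  function engine.spawn(fn)
    engine.ready[#engine.ready + 1] = coroutine.create(fn)
  end

  -- Run one round; return true while any task is still alive.
  function engine.tick()
    local current = engine.ready
    engine.ready = {}
    for _, co in ipairs(current) do
      assert(coroutine.resume(co))
      if coroutine.status(co) ~= "dead" then
        engine.ready[#engine.ready + 1] = co
      end
    end
    return #engine.ready > 0
  end

  return engine
end

local yield_now = coroutine.yield  -- give the other tasks a turn

local log = {}
local engine = new_engine()

engine.spawn(function()
  for i = 1, 2 do log[#log + 1] = "A" .. i; yield_now() end
end)
engine.spawn(function()
  for i = 1, 3 do log[#log + 1] = "B" .. i; yield_now() end
end)

-- The caller decides when to tick; an implicit engine hides this loop.
local ticks = 0
while engine.tick() do ticks = ticks + 1 end
print(table.concat(log, " "), ticks)
```

    With an implicit engine, the `while engine.tick() do ... end` loop would live inside the runtime instead of in your program.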

    EDIT: Solutions that replace the current calls with external libraries are welcome. Whatever it takes to make the example non-blocking, basically.

    You may see a comparison of reasonably up-to-date libraries on this page: https://docs.emilua.org/api/0.10/tutorial/alternatives.html. Of these, any library offering “fibers” (e.g. Emilua and Tarantool) or “coroutines” (e.g. luvit) is likely to offer a solution that does what you asked for (the function read_data() would block the current fiber, and the context would switch to the next ready fiber, or block the thread until some event is ready).

    Do notice that blocking a fiber is not the same as blocking the thread. Your program doesn't stall on IO, and concurrent IO is still possible on the same thread. There's an excellent talk from Ron Pressler on this topic that dispels common misconceptions: https://www.infoq.com/presentations/continuations-java/.
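    The difference can be sketched with a virtual clock (pure Lua, hypothetical names, no real timers). `sleep()` parks only the calling fiber, so a fiber sleeping 3 seconds and another sleeping 1 + 1 seconds both finish after 3 virtual seconds in total, not 5.

```lua
-- Toy timer engine with a virtual clock (hypothetical; no real OS timers).
-- sleep() parks only the calling fiber, so sleeps overlap instead of adding up.
local now = 0
local sleepers = {}  -- list of { wake = time, co = coroutine }

local function sleep(seconds)
  sleepers[#sleepers + 1] = { wake = now + seconds, co = coroutine.running() }
  coroutine.yield()
end

local function spawn(fn)
  assert(coroutine.resume(coroutine.create(fn)))  -- run until first sleep
end

local function run()
  while #sleepers > 0 do
    -- Pick the earliest deadline, advance the clock, wake that fiber.
    table.sort(sleepers, function(a, b) return a.wake < b.wake end)
    local s = table.remove(sleepers, 1)
    now = s.wake
    assert(coroutine.resume(s.co))
  end
end

local log = {}
spawn(function()
  sleep(3); log[#log + 1] = ("slow done at t=%d"):format(now)
end)
spawn(function()
  sleep(1); log[#log + 1] = ("fast done at t=%d"):format(now)
  sleep(1); log[#log + 1] = ("fast again at t=%d"):format(now)
end)

run()
print(table.concat(log, "\n"))
```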

    If you decide to use Emilua, your program would be written along the lines of:

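    local stream = require "stream"
    local file = require "file"
    local cjson = require "cjson"  -- same JSON library as in the question
    
    -- `path` must be an instance of filesystem.path
    function read_data(path)
      local f = file.stream.new()
      f:open(path, {"read_only"})
      local buf = byte_span.new(f.size)  -- buffer sized to the whole file
      stream.read_all(f, buf)            -- suspends this fiber, not the thread
      return cjson.decode(tostring(buf))
    end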
    

    stream.read_all() calls f:read_some() repeatedly until the buffer is completely filled. f:read_some() blocks the fiber until the result is ready, while other tasks in the system can keep running.
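    The read_all() loop itself is simple. Below is a plain-Lua sketch that works on strings instead of Emilua's byte_span; `fake_stream` is hypothetical and deliberately returns at most 4 bytes per read_some() call to mimic short reads.

```lua
-- Sketch of a read_all loop: keep calling read_some() until `n` bytes
-- have arrived. In a fiber-based engine, each read_some() would suspend
-- only the calling fiber while it waits. `fake_stream` is hypothetical.
local function read_all(stream, n)
  local chunks, got = {}, 0
  while got < n do
    local chunk = stream:read_some(n - got)
    if chunk == nil then
      error(("unexpected EOF after %d of %d bytes"):format(got, n))
    end
    chunks[#chunks + 1] = chunk
    got = got + #chunk
  end
  return table.concat(chunks)
end

-- A stream that hands out at most 4 bytes per call (a short read).
local function fake_stream(data)
  local s = { pos = 1, calls = 0 }
  function s:read_some(max)
    self.calls = self.calls + 1
    if self.pos > #data then return nil end  -- EOF
    local chunk = data:sub(self.pos, self.pos + math.min(max, 4) - 1)
    self.pos = self.pos + #chunk
    return chunk
  end
  return s
end

local payload = '{"answer":42}'
local s = fake_stream(payload)
local text = read_all(s, #payload)
print(text, s.calls)  -- prints the JSON text and 4
```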

    To increase the concurrency level (i.e. to create a new task), just spawn another fiber before you perform IO (or any blocking call in general). For instance, you could add this at the start:

    local sleep = require "time".sleep
    
    spawn(function()
      print("3...")
      sleep(1)
      print("2...")
      sleep(1)
      print("1...")
      sleep(1)
      print("0!")
    end):detach()
    

    You'll usually call spawn() to create a handler for each accepted connection. Any new model may look intimidating to newcomers, who now have to learn new things, but fibers are not a new model: they're just thread vocabulary after all, which is well established and stable. It's worth learning serious concurrency models such as fibers anyway (Go's CSP and Erlang's actors would be other options worth investing in). For instance, compare the task of creating a pair of connected sockets in NodeJS (which pretty much has an ad-hoc concurrency model that keeps slowly mutating with each new release to cope with new problems):

    var net = require('net');
    
    function socket_pair(callback) {
        var result = {}
    
        var server = net.createServer(function(sock) {
            server.close()
            if (result.err) {
                return
            }
            if (result.sock) {
                callback(null, [ sock, result.sock ])
            } else {
                result.sock = sock
            }
        })
    
        server.on('error', function(err) {
            if (result.err) {
                return
            }
            result.err = err
            callback(err)
        })
    
        server.listen(0, '127.0.0.1', function() {
            var sock = new net.Socket()
            sock.connect(server.address().port, '127.0.0.1', function() {
                if (result.err) {
                    return
                }
                if (result.sock) {
                    callback(null, [ sock, result.sock ])
                } else {
                    result.sock = sock
                }
            })
            sock.on('error', function(err) {
                if (result.err) {
                    return
                }
                server.close()
                result.err = err
                callback(err)
            })
        });
    }
    

    Now notice how the solution becomes way simpler if you use fibers:

    local ip = require 'ip'
    
    function socket_pair()
        local acceptor = ip.tcp.listen('127.0.0.1:')
    
        local f = spawn(function()
            return ip.tcp.dial('127.0.0.1:' .. acceptor.local_port)
        end)
    
        local sock = acceptor:accept()
        acceptor:close()
        return sock, f:join()
    end
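    To demystify join(), here is a toy spawn()/join() pair built on plain coroutines and a FIFO run queue. This is a hypothetical sketch, not how Emilua implements fibers: join() parks the calling fiber, and the child puts it back on the run queue when it finishes.

```lua
-- Toy spawn()/join() on a FIFO run queue (hypothetical names; not
-- Emilua's implementation). join() must be called from inside a fiber.
local unpack = table.unpack or unpack  -- Lua 5.2+ / Lua 5.1 and LuaJIT

local ready = {}  -- runnable coroutines, in FIFO order

local function pack(...) return { n = select("#", ...), ... } end

local Fiber = {}
Fiber.__index = Fiber

local function spawn(fn)
  local f = setmetatable({ done = false, joiners = {} }, Fiber)
  f.co = coroutine.create(function()
    f.results = pack(fn())
    f.done = true
    -- Wake every fiber that was parked in join().
    for _, co in ipairs(f.joiners) do ready[#ready + 1] = co end
  end)
  ready[#ready + 1] = f.co
  return f
end

function Fiber:join()
  if not self.done then
    self.joiners[#self.joiners + 1] = coroutine.running()
    coroutine.yield()  -- parked until the child finishes
  end
  return unpack(self.results, 1, self.results.n)
end

local function run()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    assert(coroutine.resume(co))
  end
end

local log = {}
spawn(function()
  local child = spawn(function()
    log[#log + 1] = "child running"
    return 40 + 2
  end)
  log[#log + 1] = "parent waiting"
  log[#log + 1] = "parent got " .. child:join()
end)

run()
print(table.concat(log, "; "))
```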