Tags: rust, dns, rust-tokio

How do you wrap synchronous network I/O trivially with Tokio?


Unfortunately, there is an evident gap in my understanding of concurrent development in Rust. This question stems from weeks of repeated struggles to solve a seemingly "trivial" problem.


Problem Domain

I am developing a Rust library named twistrs, a domain name permutation and enumeration library. The aim of the library is to take a root domain (e.g. google.com), generate permutations of that domain (e.g. guugle.com), and enrich each permutation (e.g. it resolves to 123.123.123.123).
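To make the permutation step concrete, here is a minimal sketch of one possible strategy that turns google.com into guugle.com. It is not twistrs's implementation (the library has its own strategies behind Domain::all()); vowel_swaps is a hypothetical function written only for illustration.

```rust
// Hypothetical permutation strategy, for illustration only: replace every
// occurrence of one vowel in the name with a different vowel.
const VOWELS: [char; 5] = ['a', 'e', 'i', 'o', 'u'];

fn vowel_swaps(domain: &str) -> Vec<String> {
    // Split "google.com" into ("google", ".com"); only the name is permuted.
    let (name, suffix) = match domain.find('.') {
        Some(dot) => domain.split_at(dot),
        None => (domain, ""),
    };

    let mut permutations = Vec::new();
    for from in VOWELS {
        if !name.contains(from) {
            continue; // this vowel does not occur in the name
        }
        for to in VOWELS {
            if to != from {
                let swapped = name.replace(from, &to.to_string());
                permutations.push(format!("{}{}", swapped, suffix));
            }
        }
    }
    permutations
}

fn main() {
    // Includes "guugle.com" (every 'o' replaced by 'u').
    for candidate in vowel_swaps("google.com") {
        println!("{}", candidate);
    }
}
```

Each candidate would then be handed to the enrichment step (e.g. a DNS lookup), which is where the concurrency problem below comes in.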

One of its objectives is to perform substantially faster than its Python counterpart, most notably for network calls such as (but not limited to) DNS lookups.

Current Design Proposal

The idea behind the library (apart from being a learning ground) is to develop a very simple security library that can be used to meet various requirements. You (as a client) can choose to interact directly with the permutation or enrichment modules, or use the library-provided async/concurrent implementation.

[Image: Twistrs proposed architecture]

Note that there is no shared state internally. This is probably very inefficient, but that is largely irrelevant for the time being, since it prevents a lot of issues.

Current Problem

Internally, the DNS lookup is done synchronously and is blocking by nature. I'm having trouble turning this into concurrent code. The closest I could get was to use tokio mpsc channels and spawn a single tokio task:

use twistrs::enrich::DomainMetadata;
use twistrs::permutate::Domain;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let domain = Domain::new("google.com").unwrap();

    // Generate every permutation of the root domain up front.
    let _permutations = domain.all().unwrap().collect::<Vec<String>>();

    let (mut tx, mut rx) = mpsc::channel(1000);

    // A single task performs every lookup, one after another.
    tokio::spawn(async move {
        for (i, v) in _permutations.into_iter().enumerate() {
            let domain_metadata = DomainMetadata::new(v);

            // Blocking DNS lookup: nothing else runs on this worker thread meanwhile.
            let dns_resolution = domain_metadata.dns_resolvable();

            if tx.send((i, dns_resolution)).await.is_err() {
                println!("receiver dropped");
                return;
            }
        }
    });

    while let Some(i) = rx.recv().await {
        println!("got: {:?}", i);
    }
}

However, an astute reader will immediately notice that this still blocks: the single task performs the DNS lookups one after another, so they effectively run sequentially either way.
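To see the cost, here is a dependency-free sketch in which a hypothetical fake_lookup stands in for dns_resolvable() by sleeping 50 ms. Running the lookups in one loop, as the single spawned task does, makes the total time the sum of all lookups.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-in for a blocking lookup such as `dns_resolvable()`:
// it sleeps for 50 ms to mimic network latency, then answers.
fn fake_lookup(domain: &str) -> bool {
    thread::sleep(Duration::from_millis(50));
    domain.ends_with(".com")
}

// Performs every lookup one after another on the current thread, which is
// effectively what the single spawned task in the question does.
fn resolve_sequentially(domains: &[&str]) -> (Vec<bool>, Duration) {
    let start = Instant::now();
    let results: Vec<bool> = domains.iter().map(|d| fake_lookup(d)).collect();
    (results, start.elapsed())
}

fn main() {
    let domains = ["guugle.com", "gooogle.com", "google.net", "goog1e.com"];
    let (results, elapsed) = resolve_sequentially(&domains);
    // No lookups overlap, so the total is at least 4 x 50 ms.
    println!("{:?} in {:?}", results, elapsed);
}
```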

Trying to spawn a Tokio task within the for loop is not possible, because tx is moved into the task spawned on the first iteration (and tx does not implement Copy):

for (i, v) in _permutations.into_iter().enumerate() {
    tokio::spawn(async move {
        let domain_metadata = DomainMetadata::new(v.clone());

        let dns_resolution = domain_metadata.dns_resolvable();

        // does not compile (E0382): `tx` was moved into the task spawned on the first iteration
        if let Err(_) = tx.send((i, dns_resolution)).await {
            println!("receiver dropped");
            return;
        }
    });
}

Removing the .await, of course, results in nothing happening, since the send future has to be polled to do anything. How would I effectively wrap all those synchronous calls in async tasks that can run independently and eventually converge into a collection?
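The move error goes away if every spawned worker gets its own clone of the sender, and the receiving loop only terminates once the original sender is dropped as well. The sketch below swaps in std::thread and std::sync::mpsc for tokio so it runs without dependencies; fake_lookup is a hypothetical stand-in for dns_resolvable(). In tokio the equivalent would be a `let tx = tx.clone();` before each `tokio::spawn`, with the blocking call itself wrapped in `tokio::task::spawn_blocking` so it does not stall the executor's worker threads.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the blocking `dns_resolvable()` call.
fn fake_lookup(domain: &str) -> bool {
    thread::sleep(Duration::from_millis(10));
    domain.ends_with(".com")
}

// One worker thread per domain. Each worker owns its own clone of the
// sender, so the original `tx` is never moved out of the loop.
fn resolve_with_channel(domains: Vec<String>) -> Vec<(usize, bool)> {
    let (tx, rx) = mpsc::channel();
    for (i, domain) in domains.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // A send error only means the receiver is gone; nothing to do.
            let _ = tx.send((i, fake_lookup(&domain)));
        });
    }
    // Drop the original sender; otherwise `rx.iter()` would never end.
    drop(tx);

    // Results arrive in completion order; sort by index to restore input order.
    let mut results: Vec<(usize, bool)> = rx.iter().collect();
    results.sort_by_key(|&(i, _)| i);
    results
}

fn main() {
    let domains = vec!["guugle.com".to_string(), "google.net".to_string()];
    for (i, resolvable) in resolve_with_channel(domains) {
        println!("got: {} -> {}", i, resolvable);
    }
}
```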

A similar Rust project I came across was batch_resolve, which does a tremendous job at this (!). However, I found the implementation to be exceptionally complicated for what I'm looking to achieve (maybe I'm wrong). Any help or insight to achieve this is greatly appreciated.
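For comparison, the "converge into a collection" step on its own can be quite small if each blocking lookup runs on its own thread and the results come back through join handles rather than a channel. This is a sketch under the same assumptions as above (std threads only, hypothetical fake_lookup), not how batch_resolve works.

```rust
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the blocking `dns_resolvable()` call.
fn fake_lookup(domain: &str) -> bool {
    thread::sleep(Duration::from_millis(10));
    domain.ends_with(".com")
}

// Start every lookup first (collect forces all spawns), then join the
// handles in order: lookups overlap, and results keep the input order.
fn resolve_all(domains: Vec<String>) -> Vec<(String, bool)> {
    let handles: Vec<thread::JoinHandle<(String, bool)>> = domains
        .into_iter()
        .map(|domain| {
            thread::spawn(move || {
                let resolvable = fake_lookup(&domain);
                (domain, resolvable)
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().expect("lookup thread panicked"))
        .collect()
}

fn main() {
    let domains = vec!["guugle.com".to_string(), "google.net".to_string()];
    for (domain, resolvable) in resolve_all(domains) {
        println!("{} resolvable: {}", domain, resolvable);
    }
}
```

With tokio, the same shape would use `tokio::task::spawn_blocking(move || ...)` per lookup and then `.await` each JoinHandle, which yields a `Result<T, JoinError>`. One OS thread per permutation is fine for a handful of domains but not for thousands, so a bounded pool is preferable at scale.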

If you want a quick way to reproduce this, you can simply clone the project and replace examples/twistrs-cli/main.rs with the first code snippet in this post.


Solution

  • Edit: I misinterpreted your question and didn't realize that the DNS resolution itself wasn't asynchronous. The following approach won't actually work with synchronous code and will just result in the executor stalling because of the blocking code, but I'll leave it up in case you switch to an asynchronous resolution method. I'd recommend using tokio's asynchronous lookup_host() if that fits your needs.


    Async executors are designed to handle large numbers of concurrent tasks, so you could try spawning a new task for every request and using a Semaphore to put an upper bound on the number of tasks running at once. The code for that might look like this:

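    use std::sync::Arc;
    use tokio::sync::{mpsc, Semaphore};

    let (tx, mut rx) = mpsc::channel(1000);
    let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once

    // Spawn the loop itself so the receiving loop below runs concurrently;
    // otherwise a full channel plus exhausted permits can deadlock.
    tokio::spawn(async move {
        for (i, v) in _permutations.into_iter().enumerate() {
            // acquire_owned() consumes an Arc, so clone it on every iteration
            // (tokio 1.x returns a Result here, hence the unwrap)
            let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
            let domain_metadata = DomainMetadata::new(v);
            let tx = tx.clone(); // every task will have its own copy of the sender

            // note: the blocking lookup still runs here, not inside the spawned task
            let dns_resolution = domain_metadata.dns_resolvable();
            tokio::spawn(async move {
                if tx.send((i, dns_resolution)).await.is_err() {
                    println!("receiver dropped");
                    return;
                }
                drop(permit); // explicitly release the permit, to make sure it was moved into this task
            }); // note: task spawn results and handle dropped here
        }
        // the original tx is dropped when this task ends, so rx.recv() eventually returns None
    });

    while let Some(i) = rx.recv().await {
        println!("got: {:?}", i);
    }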
    

    If the overhead of the extra tasks proves too significant, you can instead try combining them into a single future using facilities like FuturesUnordered from the futures crate. This lets you take an arbitrarily large collection of futures and drive them all concurrently within a single task, polling each one only when it has been woken.
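Since the lookups in the question are blocking rather than async, another way to get both an upper bound on concurrency and low per-request overhead is a fixed pool of worker threads pulling jobs from one shared queue. This is a swapped-in technique (plain std threads and channels, not FuturesUnordered or tokio's Semaphore), sketched with a hypothetical fake_lookup in place of dns_resolvable().

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the blocking `dns_resolvable()` call.
fn fake_lookup(domain: &str) -> bool {
    domain.ends_with(".com")
}

// Resolves `domains` with at most `workers` threads. Each worker pulls the
// next job from a shared queue, so at most `workers` lookups run at once.
fn resolve_pooled(domains: Vec<String>, workers: usize) -> Vec<(usize, bool)> {
    let (job_tx, job_rx) = mpsc::channel::<(usize, String)>();
    // std's Receiver cannot be shared directly, so put it behind a Mutex.
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (res_tx, res_rx) = mpsc::channel();

    for _ in 0..workers.max(1) {
        let job_rx = Arc::clone(&job_rx);
        let res_tx = res_tx.clone();
        thread::spawn(move || loop {
            // The lock guard is a temporary, so it is released before the lookup.
            let job = job_rx.lock().unwrap().recv();
            match job {
                Ok((i, domain)) => {
                    let _ = res_tx.send((i, fake_lookup(&domain)));
                }
                Err(_) => break, // queue closed and drained
            }
        });
    }
    drop(res_tx); // only the workers' clones remain

    for job in domains.into_iter().enumerate() {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // closing the queue lets idle workers exit

    // Collect in completion order, then restore input order by index.
    let mut results: Vec<(usize, bool)> = res_rx.iter().collect();
    results.sort_by_key(|&(i, _)| i);
    results
}

fn main() {
    let domains: Vec<String> = ["guugle.com", "google.net", "goog1e.com"]
        .iter()
        .map(|d| d.to_string())
        .collect();
    // At most two lookups run at any moment.
    for (i, resolvable) in resolve_pooled(domains, 2) {
        println!("got: {} -> {}", i, resolvable);
    }
}
```

The pool size plays the role of the Semaphore's permit count, but only that many threads are ever created, however many permutations there are.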