I wrapped some functions of Rust's reqwest into a req.lib file and successfully call it from Python using cffi. However, reqwest::blocking::Client forces me to use multi-threading in Python. I found that reqwest can be used in async mode in Rust. Is there a way to make req.lib async? Even semi-async would be fine for me.
For example, currently the stub signature is:
#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> *mut c_char
Can I write something like:
#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> u64 // return request unique id
#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool // whether given request is done
#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *mut c_char // fetch response
That way, cffi calls would no longer block the main thread, and I could use a single thread to issue multiple requests. Any advice or best practice is welcome.
Asynchronous code is executed by a special runtime, and for Python and Rust these are different, incompatible libraries. You cannot simply share a future between languages: it must be run in the language where it was created.
As for your example, that means you need to run the Client on a Rust executor (e.g. tokio) and then get feedback from it. The simplest way is to create a global runtime:
use lazy_static::lazy_static;
use tokio::runtime::Runtime;
lazy_static! {
static ref RUNTIME: Runtime = Runtime::new().unwrap();
}
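For reference, the same kind of process-wide lazy global can be built with only the standard library's OnceLock; the Runtime type below is a stand-in, since tokio is not in std:

```rust
use std::sync::OnceLock;

// Stand-in for tokio::runtime::Runtime.
struct Runtime;

impl Runtime {
    fn new() -> Self {
        Runtime
    }
}

// Initialized on first access, then shared for the whole process
// lifetime -- the same role lazy_static! plays here.
fn runtime() -> &'static Runtime {
    static RUNTIME: OnceLock<Runtime> = OnceLock::new();
    RUNTIME.get_or_init(Runtime::new)
}

fn main() {
    // Both calls return a reference to the same global instance.
    let a: *const Runtime = runtime();
    let b: *const Runtime = runtime();
    assert_eq!(a, b);
}
```

Whichever mechanism you pick, the point is that the runtime must outlive every individual FFI call, so it cannot live on the stack of one function.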
Then, after spawning, you need feedback from the task, so you can use a couple of maps holding statuses and results:
use std::collections::HashMap;
use std::sync::RwLock;
use futures::prelude::*;
use tokio::sync::oneshot;
type FutureId = u64;
type UrlResult = reqwest::Result<String>;
type SyncMap<K, V> = RwLock<HashMap<K, V>>;
lazy_static! {
// Map for feedback channels. Once result is computed, it is stored at `RESULTS`
static ref STATUSES: SyncMap<FutureId, oneshot::Receiver<UrlResult>> = SyncMap::default();
// Cache storage for results
static ref RESULTS: SyncMap<FutureId, UrlResult> = SyncMap::default();
}
use std::sync::atomic::{AtomicU64, Ordering};

fn gen_unique_id() -> u64 {
    // A process-wide counter is enough for unique request ids
    static NEXT_ID: AtomicU64 = AtomicU64::new(0);
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}
#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> FutureId {
    // convert the C string; an owned String is needed because the async block must be 'static
    let url: String = unsafe { std::ffi::CStr::from_ptr(url) }
        .to_string_lossy()
        .into_owned();
let (tx, rx) = oneshot::channel();
RUNTIME.spawn(async move {
let body = reqwest::get(url).and_then(|b| b.text()).await;
tx.send(body).unwrap(); // <- this one should be handled somehow
});
let id = gen_unique_id();
STATUSES.write().unwrap().insert(id, rx);
id
}
Here a oneshot::channel is created for each urlopen request; it delivers the execution result once the future completes. So it is possible to check whether a given request is finished:
#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool {
    // first check the cache
    if RESULTS.read().unwrap().contains_key(&req_id) {
        true
    } else {
        let mut res = RESULTS.write().unwrap();
        let mut statuses = STATUSES.write().unwrap();
        // if nothing is in the cache, check the feedback channel
        if let Some(rx) = statuses.get_mut(&req_id) {
            match rx.try_recv() {
                Ok(val) => {
                    // cache the result and drop the consumed receiver
                    res.insert(req_id, val);
                    statuses.remove(&req_id);
                    true
                }
                // nothing has been sent yet: the request is still in flight
                Err(oneshot::error::TryRecvError::Empty) => false,
                // the sender was dropped without sending; treat the
                // request as finished (with no result available)
                Err(oneshot::error::TryRecvError::Closed) => true,
            }
        } else {
            // unknown request id
            true
        }
    }
}
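The non-blocking check relies on try_recv. Its semantics can be demonstrated with the standard library's std::sync::mpsc channel as a stand-in for tokio::sync::oneshot (a minimal sketch, not the real tokio types):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<&str>();

    // Nothing has been sent yet, so try_recv() returns an error
    // without blocking -- this corresponds to is_finished == false.
    assert!(rx.try_recv().is_err());

    tx.send("response body").unwrap();

    // Once the sender has delivered a value, try_recv() succeeds --
    // this corresponds to is_finished == true.
    assert_eq!(rx.try_recv(), Ok("response body"));
}
```

The key property is that try_recv never blocks the calling thread, which is exactly what lets Python poll from a single thread.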
Fetching the result is then fairly trivial:
#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *const c_char {
    let res = RESULTS.read().unwrap();
    res.get(&req_id)
        // there `ok()` should probably be handled in some better way
        .and_then(|val| val.as_ref().ok())
        // the body must be NUL-terminated to be a valid C string, so it
        // is copied into a CString; into_raw() leaks it, meaning the
        // caller is responsible for freeing the pointer
        .and_then(|val| std::ffi::CString::new(val.as_str()).ok())
        .map(|val| val.into_raw() as *const c_char)
        .unwrap_or(std::ptr::null())
}
Playground link.
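The whole id/poll/fetch pattern can be modelled with only the standard library: std::thread stands in for the tokio runtime and a stub function stands in for reqwest. Names and the fake fetch are illustrative, not the real implementation:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::sync::{Mutex, OnceLock};
use std::thread;

type FutureId = u64;

// Global map of pending receivers, guarded by a Mutex.
fn statuses() -> &'static Mutex<HashMap<FutureId, Receiver<String>>> {
    static M: OnceLock<Mutex<HashMap<FutureId, Receiver<String>>>> = OnceLock::new();
    M.get_or_init(|| Mutex::new(HashMap::new()))
}

fn gen_unique_id() -> FutureId {
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    COUNTER.fetch_add(1, Ordering::Relaxed)
}

// Stand-in for the real async HTTP request.
fn fake_fetch(url: &str) -> String {
    format!("body of {url}")
}

fn urlopen(url: &str) -> FutureId {
    let (tx, rx) = mpsc::channel();
    let url = url.to_owned();
    // A spawned thread plays the role of RUNTIME.spawn(async { .. }).
    thread::spawn(move || {
        let _ = tx.send(fake_fetch(&url));
    });
    let id = gen_unique_id();
    statuses().lock().unwrap().insert(id, rx);
    id
}

fn fetch_result(req_id: FutureId) -> Option<String> {
    let mut map = statuses().lock().unwrap();
    // try_recv() does not block; None means "not finished yet"
    let done = map.get(&req_id)?.try_recv().ok();
    if done.is_some() {
        // drop the channel once the result has been consumed
        map.remove(&req_id);
    }
    done
}

fn main() {
    let id = urlopen("https://example.com");
    // Poll without blocking until the "request" completes.
    let body = loop {
        if let Some(b) = fetch_result(id) {
            break b;
        }
        thread::yield_now();
    };
    assert_eq!(body, "body of https://example.com");
}
```

The polling loop at the end is essentially what the Python side would do through cffi, except spread across its own event loop iterations instead of a busy loop.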
Keep in mind, the solution above has its advantages, but significant disadvantages as well:

- RESULTS grows indefinitely and is never cleared;
- thread_local! could be used for the globals instead of locks;
- STATUSES in is_finished is acquired for write access, though it might be better to try read access first.
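The unbounded growth of RESULTS could be addressed by taking the entry out of the map on fetch instead of merely reading it. A sketch with a plain std HashMap (the real map sits behind an RwLock, and take_result is a hypothetical name):

```rust
use std::collections::HashMap;

// Removing the entry on fetch keeps the cache from growing forever;
// the trade-off is that each result can be fetched only once.
fn take_result(results: &mut HashMap<u64, String>, req_id: u64) -> Option<String> {
    results.remove(&req_id)
}

fn main() {
    let mut results = HashMap::new();
    results.insert(1, String::from("body"));

    assert_eq!(take_result(&mut results, 1), Some(String::from("body")));
    // The second fetch finds nothing: the entry was cleaned up.
    assert_eq!(take_result(&mut results, 1), None);
    assert!(results.is_empty());
}
```

Alternatively, an explicit free_result(req_id) export would let the Python side decide when a result is no longer needed.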