Tags: windows, rust, rust-tokio, rust-rocket, sea-orm

Rust async loop function blocks execution of other futures


My app consists of a service and an HTTP server.

  • The service works with the OS API, waiting for events etc., and runs a loop for this purpose, along with async DB writes. I start the service from an async function.
  • The server (written with Rocket) uses async request handlers as well, because I'm currently using SeaORM, which is async.

Problem: When I hit my server with a request, the async tasks within the handler never start until an event fires in my service loop. Once an event fires in the service, the handler finishes fine, but the next request gets stuck the same way.

I tried to:

  • Use tokio::spawn and tokio::task::spawn; both behaved exactly the same way (execution was blocked).
  • As far as I know, I can't spawn a regular thread, because I wouldn't be able to .await in it anyway.
  • I also tried marking main with #[rocket::main(worker_threads = 4)], which should create more async worker threads? But it's still the same.

How can I overcome this? One option I can think of is to switch to another ORM like Diesel, which is not async; since I don't currently use async anywhere besides the ORM, that would work, but I don't think it is a good solution. Another thought is to add ticks to my loop so it won't be stuck until a service event fires, but this also looks weird, and handler latency would still depend on the tick interval.

Minimal reproducible example:

#[macro_use]
extern crate rocket;

use std::sync::mpsc::{channel, Sender};

use once_cell::sync::OnceCell;
use rocket::serde::{json::Json, Deserialize, Serialize};
use rocket::State;
use sea_orm::{entity::prelude::*, Database, Set};
use sea_orm::{DbBackend, Schema};
use tokio::join;
use windows::{
    w,
    Win32::Foundation::HWND,
    Win32::UI::{
        Accessibility::{SetWinEventHook, HWINEVENTHOOK},
        WindowsAndMessaging::{MessageBoxW, EVENT_SYSTEM_FOREGROUND, MB_OK},
    },
};

thread_local! {
    static TX: OnceCell<Sender<RawWindowEvent>>= OnceCell::new()
}

#[rocket::main]
async fn main() {
    let db = Database::connect("sqlite://data.db?mode=rwc")
        .await
        .unwrap();

    let builder = db.get_database_backend();

    let stmt = builder.build(
        Schema::new(DbBackend::Sqlite)
            .create_table_from_entity(Entity)
            .if_not_exists(),
    );

    db.execute(stmt).await.unwrap();

    let server = rocket::build()
        .manage(db.clone())
        .mount("/", routes![get_events])
        .launch();

    let service = tokio::spawn(service(db.clone()));

    join!(server, service);
}

#[get("/event")]
async fn get_events(db: &State<DatabaseConnection>) -> Json<Vec<Model>> {
    let db = db as &DatabaseConnection;

    let events = Entity::find().all(db).await.unwrap();

    Json(events)
}

extern "system" fn win_event_hook_callback(
    child_id: HWINEVENTHOOK,
    hook_handle: u32,
    event_id: HWND,
    window_handle: i32,
    object_id: i32,
    thread_id: u32,
    timestamp: u32,
) {
    let event = RawWindowEvent {
        child_id,
        hook_handle,
        event_id,
        window_handle,
        object_id,
        thread_id,
        timestamp,
    };

    TX.with(|f| {
        let tx: &Sender<RawWindowEvent> = f.get().unwrap();

        tx.send(event).unwrap();
    });
}

async fn service(db: DatabaseConnection) {
    let (tx, cx) = channel::<RawWindowEvent>();

    std::thread::spawn(move || {
        TX.with(|f| f.set(tx)).unwrap();

        let hook = unsafe {
            SetWinEventHook(
                EVENT_SYSTEM_FOREGROUND,
                EVENT_SYSTEM_FOREGROUND,
                None,
                Some(win_event_hook_callback),
                0,
                0,
                0,
            )
        };

        let _ = unsafe { MessageBoxW(None, w!("Text"), w!("Text"), MB_OK) };
    });

    loop {
        let event = cx.recv();

        if event.is_err() {
            break;
        }

        let event = event.unwrap();

        // There goes some event processing with another windows api calls or simple calculations...

        let record = ActiveModel {
            timestamp: Set(event.timestamp),
            ..Default::default()
        };

        Entity::insert(record).exec(&db).await.unwrap();
    }
}

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[serde(crate = "rocket::serde")]
#[sea_orm(table_name = "event")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub timestamp: u32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Debug, Clone, Copy)]
pub struct RawWindowEvent {
    pub child_id: HWINEVENTHOOK,
    pub hook_handle: u32,
    pub event_id: HWND,
    pub window_handle: i32,
    pub object_id: i32,
    pub thread_id: u32,
    pub timestamp: u32,
}

Dependencies in Cargo.toml:

[dependencies]
dotenv = "0.15.0"
tokio = { version = "1.28.2", features = ["full"] }
sea-orm = { version = "^0.11", features = [ "sqlx-sqlite", "runtime-tokio-native-tls", "macros" ] }
rocket = {version = "0.5.0-rc.3", features = ["json"]}
once_cell = "1.17.1"

[dependencies.windows]
version = "0.48.0"
features = [
    "Win32_Foundation",
    "Win32_UI_Accessibility",
    "Win32_UI_WindowsAndMessaging",
    "Win32_System_Threading",
    "Win32_System_ProcessStatus"
]

Solution

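  • You're using a synchronous channel (std::sync::mpsc), and its blocking recv() call inside the async service function blocks the runtime. Use the channel defined in tokio instead: tokio::sync::mpsc, whose recv() is awaited rather than blocking the thread.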