I know this question is asked a lot, but even after reading the other answers, I still don't know how to solve my problem.
I am using the mastodon-async crate to process events from a Mastodon user timeline. For specific events, I want to publish part of the event to an MQTT topic.
I am following this example
This is my code right now:
use std::thread;
use std::time::Duration;
use anyhow::{anyhow, Result};
use futures_util::TryStreamExt;
use log::{info, warn};
use mastodon_async::helpers::toml;
use mastodon_async::Mastodon;
use mastodon_async::prelude::{Event, Status};
use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS};

async fn get_mastodon_data() -> Result<Mastodon> {
    if let Ok(data) = toml::from_file("weaselbot-mastodon.toml") {
        Ok(Mastodon::from(data))
    } else {
        Err(anyhow!("Mastodon doesn't seems to have been registered, please execute the register command !"))
    }
}

fn process_weasel_toot(status: Status) -> Result<String> {
    if !status.media_attachments.is_empty() {
        for attachement in &status.media_attachments {
            match &attachement.remote_url {
                Some(url) => {
                    let url = url.clone();
                    return Ok(url);
                }
                None => warn!("can't find remote url !"),
            }
        }
    } else {
        warn!("status without any attachments");
    }
    Err(anyhow!("Status can't be processed for media"))
}

async fn evt(mut eventloop: EventLoop) {
    while let notification = eventloop.poll().await {
        match notification {
            _ => ()
        }
    }
}

pub async fn mastodon_run() -> Result<()> {
    let mastodon = get_mastodon_data().await?;
    let stream = mastodon.stream_user().await?;
    let mut mqttoptions = MqttOptions::new("mastodon", "127.0.0.1", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));
    let (client, eventloop) = AsyncClient::new(mqttoptions, 10);
    // process eventloop
    thread::spawn(move || evt(eventloop));
    info!("Waiting for Mastodon events...");
    stream.try_for_each(|event| async move {
        match event {
            Event::Update(status) => {
                info!("found update event !");
                match process_weasel_toot(status) {
                    Ok(url) => {
                        info!("Found url {}", url);
                        client.publish("mastodon", QoS::AtLeastOnce, false, url);
                    }
                    _ => (),
                };
            }
            _ => (),
        }
        Ok(())
    }).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()>
{
    mastodon_run().await
}

My dependencies:
[dependencies]
log = "0.4.17"
anyhow = "1.0.69"
toml = "0.5.11"
rumqttc = "0.20.0"
tokio = { version = "1.26.0", features = ["macros", "rt-multi-thread"] }
futures-util = "0.3.25"
mastodon-async = { version = "1.1.0", features = ["toml", "mt"] }
The compilation error:
error[E0507]: cannot move out of `client`, a captured variable in an `FnMut` closure
  --> src/main.rs:59:41
   |
54 |       let (client, eventloop) = AsyncClient::new(mqttoptions, 10);
   |            ------ captured outer variable
...
59 |       stream.try_for_each(|event| async move {
   |  _________________________-------____________^
   | |                         |
   | |                         captured by this `FnMut` closure
60 | |         match event {
61 | |             Event::Update(status) => {
62 | |                 info!("found update event !");
...  |
66 | |                         client.publish("mastodon", QoS::AtLeastOnce, false, url);
   | |                         ------
   | |                         |
   | |                         variable moved due to use in generator
   | |                         move occurs because `client` has type `rumqttc::AsyncClient`, which does not implement the `Copy` trait
...  |
73 | |         Ok(())
74 | |     }).await?;
   | |_____^ move out of `client` occurs here

For more information about this error, try `rustc --explain E0507`.
The mastodon-async crate exposes the events as a TryStream from futures-util. I tried using something other than try_for_each, such as a fold method that I read could solve my problem, but I couldn't get it to work.
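OK, I finally found what I needed to do: just remove the move keyword from the async block passed to try_for_each. Without move, the async block only borrows client instead of moving it out of the FnMut closure, which may be called once per event.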
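To illustrate why dropping move helps, here is a minimal std-only sketch of the same situation. Everything in it is a hypothetical stand-in rather than the real crates' API: FakeClient plays the role of rumqttc's AsyncClient (not Copy, publishes through &self), for_each_item plays the role of try_for_each (it calls an FnMut once per item and awaits the returned future), and block_on is a toy executor so the example runs without tokio.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the MQTT client: not `Copy`, `publish` takes `&self`.
struct FakeClient {
    sent: Mutex<Vec<String>>,
}

impl FakeClient {
    async fn publish(&self, payload: String) {
        self.sent.lock().unwrap().push(payload);
    }
}

// Hypothetical stand-in for `try_for_each`: the closure is `FnMut`, so it may
// be called many times and must not give away the variables it captured.
async fn for_each_item<F, Fut>(items: Vec<String>, mut f: F)
where
    F: FnMut(String) -> Fut,
    Fut: Future<Output = ()>,
{
    for item in items {
        f(item).await;
    }
}

// Toy executor: busy-polls the future. Only adequate for this sketch, where
// every future completes on its first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn publish_all(urls: Vec<String>) -> Vec<String> {
    let client = FakeClient { sent: Mutex::new(Vec::new()) };
    // No `move` on the async block: it borrows `client`, while `url` is still
    // moved into the block because it is used by value.
    // Writing `async move` here would try to move `client` out of the `FnMut`
    // closure on every call, which is what E0507 rejects.
    block_on(for_each_item(urls, |url| async {
        client.publish(url).await;
    }));
    // The borrow ended with the future, so `client` is usable again.
    client.sent.into_inner().unwrap()
}
```

Calling publish_all with two URLs returns both of them in order, which shows that the closure ran once per item while sharing a single client. Note that the stand-in publish is awaited in the sketch: a future returned by an async method does nothing until it is awaited.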