mongodb rust changestream

How to write a pipeline for collection.watch() in Rust?


I am building a WebSocket handler on my backend that uses a ChangeStream to track data changes in my MongoDB collection. The code below uses a simple pipeline that tries to find records with a certain value in the author.id field:

async fn websocket(socket: WebSocket, state: State<AppState>, user_id: ObjectId) {
let (mut sender, _receiver) = socket.split();

let pipeline = vec![doc! {
    "$match": {
        "author.id": user_id
    }
}];

let change_stream = state
    .db
    .posts_collection
    .watch(pipeline, None)
    .await
    .map_err(|err| {
        eprintln!("Error creating change stream: {:?}", err);
        "Failed to create change stream".to_string()
    });

It does not track any changes, but if I set the pipeline to None it tracks every change on the collection. How do I create a ChangeStream that tracks changes only on my matching records?


Solution

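  • The $match stage runs against change event documents, not against the stored records, so a top-level "author.id" path never matches. To track operations on the collection records (insert, update, delete) for a specific field value, you have to point the pipeline at the event fields that carry the record and add options: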

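    use mongodb::{
        bson::doc,
        options::{ChangeStreamOptions, FullDocumentBeforeChangeType, FullDocumentType},
    };

    let pipeline = vec![doc! {
        "$match": {
            "$or": [
                // insert (with UpdateLookup, update events also carry fullDocument)
                {"fullDocument.author.id": user_id},
                // delete (needs the pre-image, see the options below)
                {"fullDocumentBeforeChange.author.id": user_id},
                // update that sets author.id
                {"updateDescription.updatedFields.author.id": user_id},
            ]
        }
    }];

    let options = ChangeStreamOptions::builder()
        // Include the current version of the record in update events.
        .full_document(Some(FullDocumentType::UpdateLookup))
        // full_document_before_change returns the pre-image of the record (its data
        // before the change; in my case that means the record before it was deleted).
        // To use it, make sure pre-images are enabled when the collection is created.
        .full_document_before_change(Some(
            FullDocumentBeforeChangeType::WhenAvailable,
        ))
        .build();

    // Pass the options to watch() in place of None; handle the Result as in the question.
    let change_stream = state.db.posts_collection.watch(pipeline, options).await;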
    

    Example of enabling the pre-image and post-image options on a collection (works only with MongoDB 6.0 or later):

        use mongodb::options::{ChangeStreamPreAndPostImages, CreateCollectionOptions};

        // Enable pre- and post-images so change events can include them.
        let enable = ChangeStreamPreAndPostImages::builder()
            .enabled(true)
            .build();
        let opts = CreateCollectionOptions::builder()
            .change_stream_pre_and_post_images(enable)
            .build();
        let create_posts_collection = database
            .create_collection(&posts_collection_name, opts)
            .await;
        // The `?` below needs an enclosing function whose error type is
        // (StatusCode, Json<OperationStatusResponse>).
        create_posts_collection.map_err(|err| {
            eprintln!(
                "Error creating posts collection with change stream options: {}",
                err
            );
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(OperationStatusResponse {
                    success: false,
                    error_message: Some(format!(
                        "Failed to create MongoDB collection: {}",
                        err
                    )),
                }),
            )
        })?;
        let posts_collection = database.collection::<Post>(&posts_collection_name);
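
As a rough illustration of why the pipeline in the question matched nothing while the $or version does, the sketch below models a change event as a nested document and resolves dotted paths against it. It uses only the standard library. The Value type, the helper functions, the ANSWER_PATHS constant, and the use of string ids in place of ObjectId are all assumptions made for this example, and real $match semantics (arrays, operators) are not modeled.

```rust
use std::collections::BTreeMap;

/// Hypothetical, heavily simplified stand-in for a BSON value.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Str(String),
    Doc(BTreeMap<String, Value>),
}

/// Shorthand constructors for the example.
fn s(v: &str) -> Value {
    Value::Str(v.to_string())
}

fn doc(fields: &[(&str, Value)]) -> Value {
    Value::Doc(fields.iter().map(|(k, v)| (k.to_string(), v.clone())).collect())
}

/// Walk a dotted path such as "fullDocument.author.id" through nested documents.
fn get_path<'a>(root: &'a Value, path: &str) -> Option<&'a Value> {
    path.split('.').try_fold(root, |cur, key| match cur {
        Value::Doc(map) => map.get(key),
        _ => None,
    })
}

/// True if any of the paths resolves to the wanted id (a stand-in for `$or`).
fn matches_any(event: &Value, paths: &[&str], wanted: &str) -> bool {
    let wanted = s(wanted);
    paths.iter().any(|p| get_path(event, p) == Some(&wanted))
}

/// The two record-carrying paths plus the update path used in the answer.
const ANSWER_PATHS: [&str; 3] = [
    "fullDocument.author.id",
    "fullDocumentBeforeChange.author.id",
    "updateDescription.updatedFields.author.id",
];

/// Shape of an insert event: the record sits under `fullDocument`.
fn insert_event(author_id: &str) -> Value {
    let record = doc(&[("author", doc(&[("id", s(author_id))]))]);
    doc(&[("operationType", s("insert")), ("fullDocument", record)])
}

/// Shape of a delete event when pre-images are enabled on the collection.
fn delete_event(author_id: &str) -> Value {
    let record = doc(&[("author", doc(&[("id", s(author_id))]))]);
    doc(&[("operationType", s("delete")), ("fullDocumentBeforeChange", record)])
}

/// Shape of a delete event without pre-images: only the document key is left.
fn delete_event_without_pre_image() -> Value {
    doc(&[
        ("operationType", s("delete")),
        ("documentKey", doc(&[("_id", s("post-1"))])),
    ])
}
```

In this model, matches_any(&insert_event("u1"), &["author.id"], "u1") is false because the event has no top-level author field, while the same call with &ANSWER_PATHS is true. A delete event matches only when it carries fullDocumentBeforeChange, which is why the pre-image option has to be enabled on the collection.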