Tags: java, azure-cosmosdb, azure-cosmosdb-sqlapi, azure-java-sdk, azure-cosmosdb-changefeed

Sample for publish/subscribe with Azure Cosmos DB in Java


I need a pub/sub event messaging system built on Azure Cosmos DB. I am using the Azure Cosmos DB Java SDK v4.

I tried a ChangeFeedProcessor based on this sample: https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples/blob/main/src/main/java/com/azure/cosmos/examples/changefeed/SampleChangeFeedProcessor.java but it does not work as expected.

My problems:

  • The feed collection/container grows continuously. How can I delete an event after all active nodes have received it?
  • The delay for the events seems relatively large, around a minute.
  • Only one node receives the events. That is useful for load balancing, but it is not my use case.

Solution

  • With version 4.12.0 of the Java SDK, the following code snippet works for me. Note that it uses beta code from the driver, which may change in the future.

    private static final String                CHANNEL = "events";
    
    private CosmosContainer                    collection;
    
    private volatile boolean                   stopped; // volatile so the polling thread sees updates from other threads
    
    void start( String clientID ) {
        CosmosContainerProperties props = new CosmosContainerProperties( CHANNEL, "/type" );
        // Delete every event after 60 seconds; all nodes should have received it by then.
        props.setDefaultTimeToLiveInSeconds( 60 );
        collection = getOrCreateContainer( props );
        Thread thread = new Thread( () -> {
            String[] continuation = new String[1];
            try {
                while( !stopped ) {
                    // sample code: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
                    CosmosChangeFeedRequestOptions options = continuation[0] == null ? //
                    CosmosChangeFeedRequestOptions.createForProcessingFromNow( FeedRange.forFullRange() ) : // initial value
                    CosmosChangeFeedRequestOptions.createForProcessingFromContinuation( continuation[0] ); // continue value
                    Iterator<EventPOJO> it = collection //
                                    .queryChangeFeed( options, EventPOJO.class ) //
                                    .handle( ( response ) -> continuation[0] = response.getContinuationToken() ) //
                                    .iterator();
                    while( it.hasNext() ) {
                        EventPOJO event = it.next();
                        if( !clientID.equals( event.client ) ) {
                            // skip this client's own events (compare String content, not references)
                            onMessage( event );
                        }
                    }
                    // poll interval
                    Thread.sleep( 1000 );
                }
            } catch( Throwable th ) {
                if( !stopped ) {
                    PersistenceLogger.LOGGER.error( th );
                }
            }
        }, CHANNEL );
        thread.setDaemon( true );
        thread.start();
    }
    
    <T> void send( T event, String clientID ) {
        EventPOJO evt = new EventPOJO();
        evt.id = ...
        evt.client = clientID;
        evt.type = event.getClass().getName();
        evt.message = ...
    
        collection.createItem( evt );
    }
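
To see why this approach delivers every event to every node, here is a minimal in-memory sketch of the same polling pattern. It does not use the Cosmos DB SDK at all: `InMemoryFeed`, `Node`, and the integer continuation are hypothetical stand-ins for the container, a subscriber, and the continuation token, and the `EventPOJO` field types are an assumption based on how the fields are used above. The point is only that each node keeps its own private continuation, so nodes do not compete for events the way lease-sharing consumers do.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of the polling pattern above; NOT the Cosmos DB API. */
public class BroadcastPollingSketch {

    /** Hypothetical event shape; field names follow the snippet above, types are assumed. */
    public static class EventPOJO {
        public String id;
        public String client;
        public String type;
        public String message;

        public EventPOJO(String id, String client, String type, String message) {
            this.id = id;
            this.client = client;
            this.type = type;
            this.message = message;
        }
    }

    /** Append-only log; an index into it plays the role of the continuation token. */
    public static class InMemoryFeed {
        private final List<EventPOJO> log = new ArrayList<>();

        public synchronized void append(EventPOJO event) {
            log.add(event);
        }

        /** Position "now", used as the initial continuation of a new node. */
        public synchronized int end() {
            return log.size();
        }

        /** Returns a copy of all events appended at or after the given position. */
        public synchronized List<EventPOJO> readFrom(int continuation) {
            return new ArrayList<>(log.subList(continuation, log.size()));
        }
    }

    /** One subscriber with its own private continuation. */
    public static class Node {
        public final String clientID;
        public final List<String> received = new ArrayList<>();
        private int continuation;

        public Node(String clientID, InMemoryFeed feed) {
            this.clientID = clientID;
            this.continuation = feed.end(); // start "from now", like the first poll above
        }

        /** One poll cycle: read new events, advance the continuation, skip own events. */
        public void poll(InMemoryFeed feed) {
            List<EventPOJO> batch = feed.readFrom(continuation);
            continuation += batch.size();
            for (EventPOJO event : batch) {
                if (!clientID.equals(event.client)) {
                    received.add(event.message);
                }
            }
        }
    }

    public static void main(String[] args) {
        InMemoryFeed feed = new InMemoryFeed();
        Node a = new Node("A", feed);
        Node b = new Node("B", feed);
        Node c = new Node("C", feed);

        // Node A publishes one event; every other node sees it on its next poll.
        feed.append(new EventPOJO("1", "A", "java.lang.String", "hello"));
        a.poll(feed);
        b.poll(feed);
        c.poll(feed);

        System.out.println("A=" + a.received + " B=" + b.received + " C=" + c.received);
        // prints: A=[] B=[hello] C=[hello]
    }
}
```

In this model a node that polls again without new events receives nothing, because its continuation has already moved past them. A time-to-live on the container is then only a cleanup mechanism and has to be longer than the slowest node's poll interval.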