How to produce a single update per key when batch editing in DynamicData


Let me start by saying that I'm really new to both Reactive Extensions and DynamicData, so I'm probably missing something really obvious here.

Aim: I want to calculate the profit (or loss) for some trades (similar to: https://dynamic-data.org/2014/11/22/trading-example-part-2-manage-market-data/) based on the new market prices. For simplicity, let's say that all prices received in the last second are considered new.

My problem: Multiple update events occur for the same currency pair (e.g. EURUSD) within one second, even if batch editing is used. Ideally, I would like to raise only one event per second, based on the latest available value, to avoid doing unnecessary calculations.

My code so far:

Main.cs

using System.Reactive.Linq;
using DynamicData;


TickService tickService = new();

tickService.NewTicks
    .Connect()
    .Watch("EURUSD")
    .Subscribe(price => Console.WriteLine(price));

/*
 * In the actual project the prices come from an external system,
 * but that is irrelevant here 
 * so for the sake of simplicity I'm adding them manually (with a delay)
 * to TickService
 */
Task.Run(() =>
{
    for (int i = 0; i < 10; i++)
    {
        var newTicks = new List<Tick>() { new Tick() { Name = "EURUSD", Price = i, LastUpdate = DateTime.UtcNow } };
        tickService.AddTicks(newTicks);
        Console.WriteLine($"Added: i -> {i}");
        Thread.Sleep(250);
    }
});

Console.ReadLine();

Tick.cs

public class Tick
{
    public string Name { get; set; } = "";
    public double Price { get; set; }
    public DateTime LastUpdate { get; set; }

    public override string ToString()
    {
        return $"Name: {Name}, Price: {Price}, LastUpdate: {LastUpdate}";
    }
}

TickService.cs (Exposes an IObservableCache with the prices of the last second)

using DynamicData;

public class TickService
{
    private readonly SourceCache<Tick, string> _ticksCache = new(x => x.Name);
    public IObservableCache<Tick, string> NewTicks { get; }

    public TickService()
    {
        NewTicks = _ticksCache
             .Connect()
             .Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
             .Batch(TimeSpan.FromSeconds(1))
             .AsObservableCache();
    }

    public void AddTicks(ICollection<Tick> newTicks)
    {
        _ticksCache.Edit(innerCache => innerCache.AddOrUpdate(newTicks));
    }
}

The above code produces the following output:

Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Add, Key: EURUSD, Current: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm, Previous: <None>
Update, Key: EURUSD, Current: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm, Previous: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm

What I want is:

Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm

Since I'm using batch editing, I would expect only one update event to be produced every second for EURUSD. Any ideas or advice as to what I'm missing here?

Thank you


Solution

  • This has been a long-standing request from several people that had never materialized into an operator, so I've added a new operator to Dynamic Data to achieve uniqueness within a change set notification.

    The new operator is called EnsureUniqueKeys and can be applied to the change set stream of a cache:

        var uniqueChanges = _myCache.Connect().EnsureUniqueKeys();
    

    where

        _myCache.Edit(innerCache =>
        {
            innerCache.AddOrUpdate(new Person("Me", 20));
            innerCache.AddOrUpdate(new Person("Me", 21));
            innerCache.AddOrUpdate(new Person("Me", 22));
        });
    

    will produce a single Add notification for Person("Me", 22)

    and

        _myCache.Edit(innerCache =>
        {
            innerCache.AddOrUpdate(new Person("Me", 20));
            innerCache.AddOrUpdate(new Person("Me", 21));
            innerCache.RemoveKey("Me");
        });
    

    will produce an empty change set, because the item is added, updated and removed within the same edit.

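    Batch merges the buffered change sets into a single notification but keeps every individual change, which is why Watch still sees each update. Applying the operator after Batch in the TickService leaves only the latest change per key: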

        NewTicks = _ticksCache
            .Connect()
            .Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
            .Batch(TimeSpan.FromSeconds(1))
            .EnsureUniqueKeys()
            .AsObservableCache();
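
To try the operator in isolation, here is a minimal console sketch of the first Person example above. It assumes a DynamicData version that ships EnsureUniqueKeys. The Person record is a hypothetical stand-in, since the answer does not show that type, and the printing logic is only for illustration.

```csharp
using System;
using DynamicData;

// Hypothetical type for illustration; the answer above does not define Person.
public record Person(string Name, int Age);

public static class Program
{
    public static void Main()
    {
        // Cache keyed by name, mirroring the keyed SourceCache used in TickService.
        var cache = new SourceCache<Person, string>(p => p.Name);

        // Print every change contained in each change set notification.
        using var subscription = cache
            .Connect()
            .EnsureUniqueKeys()
            .Subscribe(changes =>
            {
                Console.WriteLine($"Change set with {changes.Count} change(s)");
                foreach (var change in changes)
                {
                    Console.WriteLine($"  {change.Reason}, Key: {change.Key}, Current: {change.Current}");
                }
            });

        // Three writes to the same key inside a single Edit call.
        cache.Edit(innerCache =>
        {
            innerCache.AddOrUpdate(new Person("Me", 20));
            innerCache.AddOrUpdate(new Person("Me", 21));
            innerCache.AddOrUpdate(new Person("Me", 22));
        });
    }
}
```

According to the description above, the subscriber should receive one change set holding a single Add for the age-22 item. Removing the EnsureUniqueKeys() call from the pipeline is a quick way to compare against the unfiltered behaviour.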