Let me start by saying that I'm new to both Reactive Extensions and DynamicData, so I'm probably missing something obvious here.
Aim: I want to calculate the profit (or loss) for some trades (similar to: https://dynamic-data.org/2014/11/22/trading-example-part-2-manage-market-data/) based on new market prices. For simplicity, let's say that all prices received in the last second are considered new.
My problem: Multiple update events are raised for the same currency pair (e.g. EURUSD) within one second, even when batching is used. Ideally, I would like to raise only one event per pair, based on the latest available value, to avoid unnecessary calculations.
My code so far:
Main.cs
using System.Reactive.Linq;
using DynamicData;

TickService tickService = new();

tickService.NewTicks
    .Connect()
    .Watch("EURUSD")
    .Subscribe(price => Console.WriteLine(price));

/*
 * In the actual project the prices come from an external system,
 * but that is irrelevant here,
 * so for the sake of simplicity I'm adding them manually (with a delay)
 * to TickService.
 */
Task.Run(() =>
{
    for (int i = 0; i < 10; i++)
    {
        var newTicks = new List<Tick>() { new Tick() { Name = "EURUSD", Price = i, LastUpdate = DateTime.UtcNow } };
        tickService.AddTicks(newTicks);
        Console.WriteLine($"Added: i -> {i}");
        Thread.Sleep(250);
    }
});

Console.ReadLine();
Tick.cs
public class Tick
{
    public string Name { get; set; } = "";
    public double Price { get; set; }
    public DateTime LastUpdate { get; set; }

    public override string ToString()
    {
        return $"Name: {Name}, Price: {Price}, LastUpdate: {LastUpdate}";
    }
}
TickService.cs (Exposes an IObservableCache with the prices of the last second)
using DynamicData;

public class TickService
{
    private readonly SourceCache<Tick, string> _ticksCache = new(x => x.Name);

    public IObservableCache<Tick, string> NewTicks { get; }

    public TickService()
    {
        NewTicks = _ticksCache
            .Connect()
            .Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
            .Batch(TimeSpan.FromSeconds(1))
            .AsObservableCache();
    }

    public void AddTicks(ICollection<Tick> newTicks)
    {
        _ticksCache.Edit(innerCache => innerCache.AddOrUpdate(newTicks));
    }
}
The code above produces the following output:
Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Add, Key: EURUSD, Current: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm, Previous: <None>
Update, Key: EURUSD, Current: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm, Previous: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm
What I want is:
Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm
Since I'm using batching, I would expect only one update event per second for EURUSD. Any ideas or advice as to what I'm missing here?
Thank you
This has been a long-standing request from several people that never materialized into an operator, so I've added a new operator to Dynamic Data that ensures each key appears only once within a change set notification.
The new operator is called EnsureUniqueKeys, and it can be applied to a cache's change stream:
var uniqueChanges = _myCache.Connect().EnsureUniqueKeys();
With it, the following edit:
_myCache.Edit(innerCache =>
{
    innerCache.AddOrUpdate(new Person("Me", 20));
    innerCache.AddOrUpdate(new Person("Me", 21));
    innerCache.AddOrUpdate(new Person("Me", 22));
});
will produce a single Add notification for Person("Me", 22), and
_myCache.Edit(innerCache =>
{
    innerCache.AddOrUpdate(new Person("Me", 20));
    innerCache.AddOrUpdate(new Person("Me", 21));
    innerCache.RemoveKey("Me");
});
will produce an empty change set, because the item is added, updated, and removed within the same edit.
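To make the collapsing rule in the two edits above concrete, here is a minimal sketch in plain C# with no DynamicData dependency. It is not how the library implements EnsureUniqueKeys; the `Reason`, `Change`, and `ChangeCollapser` names are hypothetical stand-ins invented for this illustration, and the rule it encodes (compare whether the key existed before and after the batch, keep the last value) is an assumption that merely reproduces the two outcomes described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for illustration only; these are not DynamicData types.
public enum Reason { Add, Update, Remove }

public record Change(Reason Reason, string Key, int Value);

public static class ChangeCollapser
{
    // Reduces a batch of changes so that each key appears at most once.
    public static List<Change> Collapse(IEnumerable<Change> batch)
    {
        var result = new List<Change>();

        // GroupBy keeps keys in first-seen order and changes in arrival order.
        foreach (var group in batch.GroupBy(change => change.Key))
        {
            Change first = group.First();
            Change last = group.Last();

            // A leading Add means the key did not exist before the batch.
            bool existedBefore = first.Reason != Reason.Add;
            // A trailing Remove means the key does not exist after the batch.
            bool existsAfter = last.Reason != Reason.Remove;

            if (!existedBefore && !existsAfter)
            {
                continue; // added and removed within the batch: nothing to report
            }

            Reason net = !existedBefore ? Reason.Add
                       : !existsAfter ? Reason.Remove
                       : Reason.Update;

            result.Add(new Change(net, group.Key, last.Value));
        }

        return result;
    }
}

public static class Program
{
    public static void Main()
    {
        // Mirrors the first edit above: three AddOrUpdate calls for the same key.
        var addThenUpdates = new[]
        {
            new Change(Reason.Add, "Me", 20),
            new Change(Reason.Update, "Me", 21),
            new Change(Reason.Update, "Me", 22),
        };

        // Mirrors the second edit above: add, update, then remove.
        var addThenRemove = new[]
        {
            new Change(Reason.Add, "Me", 20),
            new Change(Reason.Update, "Me", 21),
            new Change(Reason.Remove, "Me", 21),
        };

        foreach (Change change in ChangeCollapser.Collapse(addThenUpdates))
        {
            Console.WriteLine($"{change.Reason} {change.Key} {change.Value}"); // prints "Add Me 22"
        }

        Console.WriteLine(ChangeCollapser.Collapse(addThenRemove).Count); // prints "0"
    }
}
```

The same reasoning applies to the batched tick stream: once Batch has merged a second's worth of changes into one change set, collapsing by key should leave a single change per currency pair.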
The operator can be applied in TickService as follows:
NewTicks = _ticksCache
    .Connect()
    .Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
    .Batch(TimeSpan.FromSeconds(1))
    .EnsureUniqueKeys()
    .AsObservableCache();