I'm using UniRx to work with streams in Unity. I'm using the CombineLatest() operator to combine two subjects and publish the resulting value into one of those streams. When I use the generated value outside .Subscribe(), OnNext() is called and works perfectly:
Observable.CombineLatest(positionStream, speed, (position, speed) =>
{
    return position + speed;
}).TakeLast(1).Subscribe(a =>
{
    last_value = a;
});
positionStream.OnNext(last_value);
But when I call OnNext() inside .Subscribe(), it is never called:
Observable.CombineLatest(positionStream, speed, (position, speed) =>
{
    return position + speed;
}).TakeLast(1).Subscribe(a =>
{
    positionStream.OnNext(a);
});
Can someone please tell me what's wrong with the second approach? I suspect that CombineLatest() returns a cold observable and that's why the second approach doesn't work.
You are calling .TakeLast(1), which requires the source observable to do two things: (1) produce a value and (2) complete. Your code doesn't show how your CombineLatest completes, so it will appear as if no value is produced.
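To see that behavior in isolation, here is a minimal sketch. It assumes Rx.NET (the System.Reactive package) rather than UniRx, and the class name TakeLastDemo and the log list are made up for illustration. UniRx exposes the same operators, so the same pattern should apply there.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; assumes the System.Reactive package is referenced.
public static class TakeLastDemo
{
    // Returns what the subscriber observed, in order.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        source
            .TakeLast(1)
            .Subscribe(
                v => log.Add($"value {v}"),
                () => log.Add("completed"));

        source.OnNext(1);
        source.OnNext(2);
        log.Add("before OnCompleted"); // the subscriber has seen nothing so far

        source.OnCompleted();          // TakeLast(1) emits the buffered 2 only now
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

The log ends up as "before OnCompleted", "value 2", "completed": the value is held back until the source completes.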
Here's a version of your code that does call the subscription:
var positionStream = new Subject<int>();
var speed = new Subject<int>();

// Lambda parameters renamed so they don't clash with the local `speed`.
Observable
    .CombineLatest(positionStream, speed, (p, s) => p + s)
    .TakeLast(1)
    .Subscribe(a =>
    {
        positionStream.OnNext(a);
        Console.WriteLine("!");
    });

speed.OnNext(42);
positionStream.OnNext(42);
speed.OnNext(42);
positionStream.OnNext(42);

// TakeLast(1) emits only once both sources have completed.
speed.OnCompleted();
positionStream.OnCompleted();
Note that it only calls the subscription once.
Now, if I understand what you're trying to do, it seems you want to be able to manually reset the position, but also send through speed values that update the current position.
If that's right, try this:
var positionStream = new Subject<int>();
var speed = new Subject<int>();

var final_position =
    positionStream
        .Select(p => speed.Scan(p, (a, x) => a + x).StartWith(p))
        .Switch();

final_position
    .Subscribe(a => Console.WriteLine(a));

positionStream.OnNext(42);
speed.OnNext(2);
speed.OnNext(5);
positionStream.OnNext(16);
speed.OnNext(4);
That produces the following values:
42
44
49
16
20
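If you want to check that sequence without reading console output, the same pipeline can be wrapped in a method that collects the values into a list. This is only a sketch: it assumes Rx.NET (System.Reactive) rather than UniRx, and the names ResettablePositionDemo and seen are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; assumes the System.Reactive package is referenced.
public static class ResettablePositionDemo
{
    public static List<int> Run()
    {
        var seen = new List<int>();
        var positionStream = new Subject<int>();
        var speed = new Subject<int>();

        positionStream
            // Each new position starts a fresh running sum seeded with that position.
            .Select(p => speed.Scan(p, (a, x) => a + x).StartWith(p))
            // Switch drops the previous running sum whenever a new position arrives.
            .Switch()
            .Subscribe(v => seen.Add(v));

        positionStream.OnNext(42);
        speed.OnNext(2);
        speed.OnNext(5);
        positionStream.OnNext(16); // resets the accumulator to 16
        speed.OnNext(4);

        return seen;
    }
}
```

Because Switch unsubscribes from the previous inner sequence, the 49 accumulated before the reset does not leak into the values produced after it.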
Here's how to push all the values into positionStream:
var final_position =
    positionStream
        .Select(p =>
            speed
                .Scan(p, (a, x) => a + x)
                .Do(y => positionStream.OnNext(y))
                .StartWith(p))
        .Switch();
I don't like the use of Do to force side-effects, but it works here.