Tags: c#, unity-game-engine, observable, unirx

How can I split the integer stream into buffers using conditional windows instead of time based windows?


I have an Observable that returns integers like so:

1, 1, 1, 1, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, 0, 0, 0

How can I convert this Observable so that it returns arrays of those integers, splitting the stream by value-based windows rather than time-based ones?

The integers are the fingerId of a Touch, sampled in Unity's Update event. That isn't essential to the task, but it explains why I need this. -1 means no touch; this is the gap. I need to remove the -1 parts and split the stream into buffers of fingerIds between the «no touch» moments. Put another way:

Touch0, Touch0, Touch0, no Touch, no Touch, Touch1

It doesn't matter whether the values are integers or another type. I just need to split the stream into buffers, removing the «window values».

Here is my code, in case it helps:

var leftSideTouchStream = Observable.EveryUpdate()
            .Scan(-1, (id, _) =>
            {
                if (id < 0)
                {
                    var leftSideTouches = Input.touches
                        .Where(t =>
                            t.phase == TouchPhase.Began
                            && t.position.x < Screen.width / 2
                        );

                    return leftSideTouches.Any() ? leftSideTouches.First().fingerId : -1;

                }
                else
                {
                    var touchEnded = Input.touches
                        .Any(t =>
                            t.fingerId == id &&
                            (t.phase == TouchPhase.Ended || t.phase == TouchPhase.Canceled)
                        );

                    return touchEnded ? -1 : id;
                }
            })
            .Select(id =>
            {
                return Input.touches
                    .Where(t => t.fingerId == id)
                    .Select(t => new Nullable<Touch>(t))
                    .FirstOrDefault();
            });

I need exactly the behaviour that the Buffer function gives but, as I said, driven by values rather than time. If I have this stream:

1, 1, 1, 1, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, 0, 0, 0

And the «window value» is -1, then the result is expected to be:

[1, 1, 1, 1], [0, 0, 0, 0], [0, 0, 0]
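
To make the expected behaviour concrete, here is a minimal sketch of the same splitting on a plain, finite sequence, with no Rx involved. The names SplitSketch and SplitOn are made up for illustration and are not part of UniRx or of my project.

```csharp
using System;
using System.Collections.Generic;

public static class SplitSketch
{
    // Hypothetical helper: splits a finite sequence into runs of
    // non-separator values and drops the separator values themselves.
    public static List<T[]> SplitOn<T>(IEnumerable<T> source, Predicate<T> isSeparator)
    {
        var result = new List<T[]>();
        var current = new List<T>();

        foreach (var item in source)
        {
            if (!isSeparator(item))
            {
                current.Add(item);
            }
            else if (current.Count > 0)
            {
                // A separator closes the current run.
                result.Add(current.ToArray());
                current.Clear();
            }
        }

        // Keep the last run when the sequence does not end with a separator.
        if (current.Count > 0)
        {
            result.Add(current.ToArray());
        }

        return result;
    }
}
```

Calling SplitSketch.SplitOn(stream, id => id < 0) on the stream above gives the three arrays listed. What I am looking for is the Observable equivalent of this.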

Solution

  • Unfortunately I didn't find an out-of-the-box solution, so I wrote my own operator. The predicate returns true for values to keep; any other value closes the current buffer and is dropped:

    using System;
    using System.Collections.Generic;
    using UniRx;
    
    public static class ObservableFunctions
    {
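        // Collects consecutive values for which `predicate` is true and emits them
        // as one array when a value fails the predicate or the source completes.
        public static IObservable<T[]> BufferWhen<T>(this IObservable<T> source, Predicate<T> predicate)
        {
            return Observable.Create<T[]>(observer =>
            {
                List<T> buffer = new List<T>();
    
                // Return the inner subscription so that disposing the result
                // also unsubscribes from the source.
                return source.Subscribe(
                    t =>
                    {
                        if (predicate(t))
                        {
                            buffer.Add(t);
                        }
                        else if (buffer.Count > 0)
                        {
                            observer.OnNext(buffer.ToArray());
                            buffer = new List<T>();
                        }
                    },
                    e =>
                    {
                        observer.OnError(e);
                    },
                    () =>
                    {
                        // Flush the last run if the source ends without a separator value.
                        if (buffer.Count > 0)
                        {
                            observer.OnNext(buffer.ToArray());
                        }
                        observer.OnCompleted();
                    }
                );
            });
        }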
    }
    

    Luckily it was very simple. Simpler than searching for the appropriate function in Google...
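
For completeness, a usage sketch, assuming the BufferWhen extension above is in the project. BufferWhenDemo and the hard-coded array are made up for illustration; in the real case the source would be the touch stream from the question.

```csharp
using System.Linq;
using UniRx;
using UnityEngine;

// Hypothetical demo component, not part of the original code.
public class BufferWhenDemo : MonoBehaviour
{
    void Start()
    {
        // Finite stand-in for the fingerId stream. It ends with -1 so that
        // the last run is closed by a separator value.
        var ids = new[] { 1, 1, 1, 1, -1, -1, 0, 0, 0, 0, -1 };

        ids.ToObservable()
            .BufferWhen(id => id >= 0) // keep real fingerIds, treat -1 as the gap
            .Subscribe(buffer =>
                Debug.Log(string.Join(", ", buffer.Select(id => id.ToString()).ToArray())))
            .AddTo(this); // dispose the subscription together with this component
    }
}
```

With this input it should log two lines, "1, 1, 1, 1" and "0, 0, 0, 0". For the Nullable<Touch> stream from the question, the predicate would presumably be something like t => t.HasValue.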