Search code examples
c#multithreadingdata-structuresconcurrencymax-heap

Issues in the ConcurrentHeap implementation


Here is my MaxHeap implementation. Most of the time it works but occasionally it just causes few values out of order. I want it to be threadsafe. Tried debugging many times but couldn't identify the issue. Can anyone please point out the issues in my implementation?

using System.Threading;
using Crossbones.Common;
using LanguageExt;
using static LanguageExt.Prelude;

namespace MySpace
{

public static class Operators
{
    public static bool XOR(bool x, bool y) => (x && !y) || (!x && y);
    public static bool XAND(bool x, bool y) => (x || !y) && (!x || y);
}
    public enum CompareResult
    {
        Left = 1,
        Equal = 0,
        Right = -1
    };

    public delegate CompareResult HeapComparer<T>(in T left, in T right);

    public class Heap<T> where T : struct
    {
        private readonly int _capacity;
        private readonly IComparer<T> _comparer;
        private readonly List<T> _list;
        private long _itemCount;

        public Heap(int capacity, IComparer<T> comparer)
        {
            _capacity = capacity;
            _comparer = comparer;
            _list = new List<T>(capacity);
        }

        public Option<T> Pop()
        {
            Option<T> head = None;


            lock (_list)
            {
                if (_list.Any())
                {
                    head = Some(_list[0]);
                    _list[0] = _list[^1];
                    _list.RemoveAt(_list.Count - 1);
                    Count = _list.Count;
                    ShiftDown(0);
                }
            }


            return head;
        }
        public void Push(in T item)
        {
            lock (_list)
            {
                _list.Add(item);
                var itemIdx = _list.Count - 1;
                while (itemIdx > 0)
                {
                    var parentIdx = (int)(itemIdx - 1) / 2;
                    if (Compare(parentIdx, itemIdx) == CompareResult.Right)
                    {
                        Swap(parentIdx, itemIdx);
                        itemIdx = parentIdx;
                    }
                    else break;
                }

                Count = _list.Count;
            }
        }

        public bool IsEmpty => Count ==0;
        public bool IsFull => Count == _capacity;
        public long Count
        {
            get => Interlocked.Read(ref _itemCount);
            private set => Interlocked.Exchange(ref _itemCount, value);
        }

        public Option<T> Peek(int position)
        {
            Option<T> ret = None;

            if (position < Count - 1)
            {
                lock (_list)
                {
                    ret = Some(_list[position]);
                }
            }

            return ret;

        }

        public IEnumerable<T> ReadAll()
        {
            var ret = new List<T>((int)Count);

            lock (_list)
            {
                ret = _list;
                _list.Clear();
                Count = 0L;
            }

            return ret;
        }

        void Swap(int leftIdx, int rightIdx)
        {
            var tmp = _list[leftIdx];
            _list[leftIdx] = _list[rightIdx];
            _list[rightIdx] = tmp;
        }
        CompareResult Compare(int idxLeft, int idxRight) => _comparer.Compare(_list[idxLeft], _list[idxRight]) switch
        {
            -1 => CompareResult.Right,
            0 => CompareResult.Equal,
            1 => CompareResult.Left
        };
        int ShiftDown(int itemIdx)
        {
            var ret = itemIdx;
            var maxIdx = _list.Count - 1;
            while (itemIdx < maxIdx)
            {
                var left = (index: 2 * itemIdx + 1, present: (2 * itemIdx + 1) <= maxIdx);
                var right = (index: 2 * itemIdx + 2, present: (2 * itemIdx + 2) <= maxIdx);

                if (left.present && right.present)
                {
                    var target = Compare(left.index, right.index) switch
                    {
                        CompareResult.Left => left,
                        CompareResult.Right => right,
                        CompareResult.Equal => left
                    };
                    if (target.present)
                    {
                        Swap(target.index, itemIdx);
                        itemIdx = target.index;
                    }
                    else break;
                }
                else if (Operators.XOR(left.present, right.present))
                {
                    var target = left.present ? left : right.present ? right : (index: itemIdx, present: false);
                    if (target.present && Compare(target.index, itemIdx) == CompareResult.Left)
                    {
                        Swap(target.index, itemIdx);
                        itemIdx = target.index;
                    }
                    else itemIdx = target.index;
                }

                else break;
            }

            return ret;
        }
    }

}

I use LanguageExt.Core for some functional stuff. mainly Option, Some, None.

I'm keeping the count separately so that reading the Count and IsEmpty, IsFull are atomic operations.


Solution

  • The logic for ShiftDown is overly complex. When left and right are both present, there's no need to test target.present. You already know that target is present, since it can only be left or right.

    And in the case when they're not both present, then you know that left is the one that's present. In a proper heap, because it's left-filled, a node can't have a right child if it doesn't have a left child.

    A much simpler implementation is below. I maintained the return value, although I don't see why you would need this method to return a value. You certainly don't use it in your code.

    int ShiftDown(int itemIdx)
    {
        var ret = itemIdx;
        var  maxIdx = _list.Count - 1;
        while (itemIdx < maxIdx)
        {
            var largestChild = itemIdx;
            var leftIdx = 2*itemIdx + 1;
    
            if (leftIdx > maxIdx)
            {
                // node has no children
                break;
            }
    
            // left child exists. See if it's larger than its parent.
            if (_list[itemIdx > _list[itemIdx]) 
            {
                largestChild = leftIdx;
            }
            var rightIdx = leftIdx + 1;
            if (rightIdx <= maxIdx && _list[rightIdx] > _list[largestChild])
            {
                // right child exists and is larger than current largest child.
                largestChild = rightIdx;
            }
            if (largestChild != itemIdx)
            {
                Swap(itemIdx, largestChild);
                itemIdx = largestChild;
            }
        }
        return ret;
    }