I've tried debugging this, but I've come to a point where I have no idea why this is happening (I'm also a threading newbie). About 2/3 of dequeued data comes up as null, while the rest gets through properly. Any insight will be greatly appreciated.
using UnityEngine;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace UDPNetwork
{
public class NetworkManager : MonoBehaviour
{
struct DataPacket
{
public IPEndPoint destination;
public byte[] data;
public int size;
public DataPacket(IPEndPoint destination, byte[] data)
{
this.destination = destination;
this.data = data;
size = 0;
}
}
[SerializeField]
string SERVER_IP = "127.0.0.1";
[SerializeField]
ushort SERVER_PORT = 55566;
AsyncPriorityQueue<DataPacket> queuedReceiveData = new AsyncPriorityQueue<DataPacket>(2000, false);
Socket sck;
IPEndPoint ipEndPoint;
bool listening = true;
bool processing = true;
void Start()
{
sck = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
#if SERVER
ipEndPoint = new IPEndPoint(IPAddress.Any, SERVER_PORT);
sck.Bind(ipEndPoint);
#endif
new Thread(() => ListenForData()).Start();
new Thread(() => ProcessData()).Start();
}
void OnDestroy()
{
listening = false;
processing = false;
sck.Close();
}
void ListenForData()
{
EndPoint endPoint = ipEndPoint;
while (listening)
{
byte[] buffer = new byte[512];
try
{
int rec = sck.ReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref endPoint);
Array.Resize(ref buffer, rec);
queuedReceiveData.Enqueue(new DataPacket((IPEndPoint)endPoint, buffer) { size = rec }, 0);
}
catch (Exception e)
{
Debug.LogError(e.Message);
}
}
}
void ProcessData()
{
DataPacket rcv;
byte[] data;
IPEndPoint ep;
while (processing)
{
rcv = queuedReceiveData.Dequeue(); // blocks until queue has >1 item
data = rcv.data;
ep = rcv.destination;
if (data == null)
{
Debug.LogError(data); // null
Debug.LogError(rcv.size); // 0
Debug.LogError(ep); // null
Debug.LogError(rcv);
continue;
}
//process...
}
}
}
}
Queue:
using System;
/// <summary>
/// Priority queue removes added items highest priority items first, ties broken by First-In-First-Out.
/// </summary>
/// <typeparam name="T"></typeparam>
public class PriorityQueue<T>
{
struct Node
{
public T item;
public int priority;
public CircularInt32 insertionIndex;
}
Node[] items;
bool _resizeable;
CircularInt32 _numItemsEverEnqueued = 0;
/// <summary>
/// How many items are currently in the queue
/// </summary>
public int Count
{
get;
private set;
}
/// <summary>
/// How many items the queue can hold. 0 == infinite.
/// </summary>
public int Capacity
{
get
{
return _resizeable ? 0 : items.Length;
}
}
/// <summary>
/// Create a new resizeable priority queue with default capacity (8)
/// </summary>
public PriorityQueue() : this(8) { }
/// <summary>
/// Create a new priority queue
/// </summary>
/// <param name="capacity"></param>
/// <param name="resizeable"></param>
public PriorityQueue(int capacity, bool resizeable = true)
{
if (capacity < 2)
{
throw new ArgumentException("New queue size cannot be smaller than 2", "capacity");
}
items = new Node[capacity];
Count = 0;
_resizeable = resizeable;
}
/// <summary>
/// Add an object to the queue. If queue is full and resizeable is true, increases the capacity. If queue is full and resizeable is false, does nothing, returns false.
/// </summary>
/// <param name="item">object to add to queue</param>
/// <param name="priority">object's priority, lower # = higher priority, ties are broken by FIFO</param>
/// <returns>true if added successfully, false otherwise (queue is full)</returns>
public bool Enqueue(T item, int priority)
{
if (Count == items.Length)
{
if (_resizeable)
{
Array.Resize(ref items, Capacity * 3 / 2 + 1);
}
else
{
return false;
}
}
items[Count] = new Node() { item = item, priority = priority, insertionIndex = _numItemsEverEnqueued++ };
percolateUp(Count);
Count++;
return true;
}
void percolateUp(int index)
{
while (true)
{
if (index == 0)
{
break;
}
int parent = (index % 2 == 0) ? index / 2 - 1 : index / 2;
if (HasHigherPriority(items[parent], items[index]))
{
var temp = items[index];
items[index] = items[parent];
items[parent] = temp;
index = parent;
}
else
{
break;
}
}
}
/// <summary>
/// Removes and returns the highest priority object in the queue. Ties are broken by FIFO.
/// Returns an object's default value if the queue is empty.
/// </summary>
/// <returns></returns>
public T Dequeue()
{
if (Count == 0)
{
return default(T);
}
var item = items[0].item;
items[0] = new Node();
percolateDown(0);
Count--;
return item;
}
void percolateDown(int index)
{
while (true)
{
int left = index * 2 + 1;
if (left + 1 < Count && HasHigherPriority(items[left + 1], items[left]))
{
var temp = items[index];
items[index] = items[left + 1];
items[left + 1] = temp;
index = left + 1;
}
else if (left < Count)
{
var temp = items[index];
items[index] = items[left];
items[left] = temp;
index = left;
}
else
{
break;
}
}
}
bool HasHigherPriority(Node higher, Node lower)
{
return (higher.priority < lower.priority || (higher.priority == lower.priority && higher.insertionIndex < lower.insertionIndex));
}
}
Async:
using System.Threading;
/// <summary>
/// A thread-safe priority queue.
/// </summary>
/// <typeparam name="T"></typeparam>
public class AsyncPriorityQueue<T>
{
PriorityQueue<T> pq;
/// <summary>
/// How many items are currently in the queue
/// </summary>
public int Count
{
get { return pq.Count; }
}
/// <summary>
/// How many items the queue can hold. 0 == infinite.
/// </summary>
public int Capacity
{
get { return pq.Capacity; }
}
/// <summary>
/// Create a new resizeable async priority queue with default capacity (8)
/// </summary>
public AsyncPriorityQueue()
{
pq = new PriorityQueue<T>();
}
/// <summary>
/// Create a new priority queue
/// </summary>
/// <param name="capacity"></param>
/// <param name="resizeable"></param>
public AsyncPriorityQueue(int capacity, bool resizeable = true)
{
pq = new PriorityQueue<T>(capacity, resizeable);
}
/// <summary>
/// Add an object to the queue. If queue is full and resizeable is true, increases the capacity. If queue is full and resizeable is false, does nothing, returns false.
/// </summary>
/// <param name="item">object to add to queue</param>
/// <param name="priority">object's priority, lower # = higher priority, ties are broken by FIFO</param>
/// <returns>true if added successfully, false otherwise (queue is full)</returns>
public bool Enqueue(T item, int priority)
{
lock (pq)
{
bool added = pq.Enqueue(item, priority);
if (pq.Count == 1)
{
Monitor.Pulse(pq);
}
return added;
}
}
/// <summary>
/// Removes and returns the highest priority object in the queue. Ties are broken by FIFO.
/// WARNING: if the queue is empty when this is called, the thread WILL BLOCK until a new item is added to the queue in another thread. If this behaviour is not wanted, be sure to check Count > 0.
/// </summary>
/// <returns></returns>
public T Dequeue()
{
lock (pq)
{
while (pq.Count == 0)
{
Monitor.Wait(pq);
}
return pq.Dequeue();
}
}
}
First, it's unclear why you're using a priority queue when all of your messages are being queued with priority 0. But I'll assume that your goal is to eventually change the priorities on some messages. In any case, because you enqueue everything at priority 0, you've unmasked a critical bug in your priority queue implementation.
I suspect that if you enqueued everything with priority 1, you'd never see this error. But you shouldn't do that.
The problem is that when you dequeue an item, you're re-adjusting the heap by percolating down an empty node that has a priority of 0. More importantly, its insertionIndex
is never set, so it's 0. That ends up putting the new empty node before the good nodes that are already in the queue, and new nodes that you add to the queue later will be added after that empty node. And because everything in your queue is at priority 0, the new empty node is left right there at the root.
You need to change the way that you re-adjust the heap when you dequeue an item. Rather than entering an empty node at the top and percolating it down, you should take the last node in the heap, insert it at the root, and percolate it down. But you'll have to change your percolateDown
method.
Here's what I would suggest:
public T Dequeue()
{
if (Count == 0)
{
return default(T);
}
var item = items[0].item;
items[0] = items[Count-1];
items[Count-1] = null;
Count--;
percolateDown(0);
return item;
}
void percolateDown(int index)
{
while (true)
{
// The rules for adjusting on percolate down are to swap the
// node with the highest-priority child. So first we have to
// find the highest-priority child.
int hpChild = index*2+1;
if (hpChild >= Count)
{
break;
}
if (hpChild+1 < Count && HasHigherPriority(items[hpChild+1], items[hpChild]))
{
++hpChild;
}
if (HasHigherPriority(items[hpChild, items[index]))
{
var temp = items[index];
items[index] = items[hpChild];
items[hpChild] = temp;
}
else
{
break;
}
index = hpChild;
}
}
For more detail about properly implementing a binary heap, see http://blog.mischel.com/2013/09/29/a-better-way-to-do-it-the-heap/, and the entries that follow.
A couple of other notes:
Rather than resizing the array yourself, you should just turn your items
array into a List<Node>
. It'll handle all the resizing and such for you.
In your percolateUp
, you have:
int parent = (index % 2 == 0) ? index / 2 - 1 : index / 2;
You can simplify that to:
int parent = (index + 1)/2;