I am currently writing an IRC bot. I'd like to avoid excess flood, so I decided to create a message queue that would send the next message every X milliseconds, but my attempt failed. Line 43:
unset.Add((string)de.Key);
throws an OutOfMemory exception. I have absolutely no idea what I'm doing wrong.
Perhaps I should also explain the general idea behind such (possibly complicated) way of queuing.
Firstly, the main Hashtable queueht
stores ConcurrentQueue<string>
types, where targets for the messages serve as keys. I would like the bot to iterate through the hashtable, sending one message from each queue (and removing the key if the queue is emptied). I couldn't think of a suitable method to work on the hashtable itself, so I decided to create another queue, ConcurrentQueue<string> queue
, which would store keys and their order of use when emptying the queue.
Assuming a hypothetical situation with several hundred items in a queue (which might be possible), any new request would be delayed by Lord knows how long (built-in delay between messages plus latency), so I have the method Add() rebuild queue
. I create a deep copy of queueht
(or so I hope) and generate a new queue
based on this disposable copy, getting rid of it in the process.
I assume my train of thought and/or code to be horribly wrong, since I have nearly no experience with threading, collections more complicated than simple arrays and OOP habits/conventions whatsoever. I would really appreciate the solution to my problem with an explanation. Thanks in advance!
EDIT: Posting the entire class.
class SendQueue
{
Hashtable queueht;
ConcurrentQueue<string> queue;
Timer tim;
IRCBot host;
public SendQueue(IRCBot host)
{
this.host = host;
this.tim = new Timer();
this.tim.Elapsed += new ElapsedEventHandler(this.SendNewMsg);
this.queueht = new Hashtable();
this.queue = new ConcurrentQueue<string>();
}
public void Add(string target, string msg)
{
try
{
this.queueht.Add(target, new ConcurrentQueue<string>());
}
finally
{
((ConcurrentQueue<string>)this.queueht[target]).Enqueue(msg);
}
Hashtable ht = new Hashtable(queueht);
List<string> unset = new List<string>();
while (ht.Count > 0)
{
foreach (DictionaryEntry de in ht)
{
ConcurrentQueue<string> cq = (ConcurrentQueue<string>)de.Value;
string res;
if (cq.TryDequeue(out res))
this.queue.Enqueue((string)de.Key);
else
unset.Add((string)de.Key);
}
}
if (unset.Count > 0)
foreach (string item in unset)
ht.Remove(item);
}
private void SendNewMsg(object sender, ElapsedEventArgs e)
{
string target;
if (queue.TryDequeue(out target))
{
string message;
if (((ConcurrentQueue<string>)queueht[target]).TryDequeue(out message))
this.host.Say(target, message);
}
}
}
EDIT2: I am aware that while (ht.Count > 0)
will be executed indefinitely. It's just a part leftover from previous version which looked like that:
while (ht.Count > 0)
{
foreach (DictionaryEntry de in ht)
{
ConcurrentQueue<string> cq = (ConcurrentQueue<string>)de.Value;
string res;
if (cq.TryDequeue(out res))
this.queue.Enqueue((string)de.Key);
else
ht.Remove((string)de.Key);
}
}
But the collection cannot be modified when it's evaluated (and I found that out the hard way), so it's no longer like that. I just forgot to change the condition for while
.
I took liberty of trying TheThing's solution. While it seems to fulfil its purpose, it doesn't send any messages... Here's its final form:
class User
{
public User(string username)
{
this.Username = username;
this.RequestQueue = new Queue<string>();
}
public User(string username, string message)
: this(username)
{
this.RequestQueue.Enqueue(message);
}
public string Username { get; set; }
public Queue<string> RequestQueue { get; private set; }
}
class SendQueue
{
Timer tim;
IRCBot host;
public bool shouldRun = false;
public Dictionary<string, User> Users; //Dictionary of users currently being processed
public ConcurrentQueue<User> UserQueue; //List of order for which users should be processed
public SendQueue(IRCBot launcher)
{
this.Users = new Dictionary<string, User>();
this.UserQueue = new ConcurrentQueue<User>();
this.tim = new Timer(WorkerTick, null, Timeout.Infinite, 450);
this.host = launcher;
}
public void Add(string username, string request)
{
lock (this.UserQueue) //For threadsafety
{
if (this.Users.ContainsKey(username))
{
//The user is in the user list. That means he has previously sent request that are awaiting to be processed.
//As such, we can safely add his new message at the end of HIS request list.
this.Users[username].RequestQueue.Enqueue(request); //Add users new message at the end of the list
return;
}
//User is not in the user list. Means it's his first request. Create him in the user list and add his message
var user = new User(username, request);
this.Users.Add(username, user); //Create the user and his message
this.UserQueue.Enqueue(user); //Add the user to the last of the precessing users.
}
}
public void WorkerTick(object sender)
{
if (shouldRun)
{
//This tick runs every 400ms and processes next message to be sent.
lock (this.UserQueue) //For threadsafety
{
User user;
if (this.UserQueue.TryDequeue(out user)) //Pop the next user to be processed.
{
string message = user.RequestQueue.Dequeue(); //Pop his request
this.host.Say(user.Username, message);
if (user.RequestQueue.Count > 0) //If user has more messages waiting to be processed
{
this.UserQueue.Enqueue(user); //Add him at the end of the userqueue
}
else
{
this.Users.Remove(user.Username); //User has no more messages, we can safely remove him from the user list
}
}
}
}
}
}
I tried switching to ConcurrentQueue
, which should work as well (though in a more thread-safe way, not that I know anything about thread safety). I also tried switching to System.Threading.Timer
, but that doesn't help either. I've run out of ideas long ago.
EDIT: Being a complete and utter idiot, I didn't set the time for Timer to start. Changing the bool part to a Start() method that changes the timer's dueTime and interval made it work. Problem solved.
From what I can best understand, you want to be able to queue users in order and each of their request.
Meaning, if one user request like 1000 request, others can still send theirs and the bot serves 1 request from each user in a FIFO manner.
If so, then what you need is a manner, similar to this functionality:
class User
{
public User(string username)
{
this.Username = username;
this.RequestQueue = new Queue<string>();
}
public User(string username, string message)
: this(username)
{
this.RequestQueue.Enqueue(message);
}
public string Username { get; set; }
public Queue<string> RequestQueue { get; private set; }
}
///......................
public class MyClass
{
public MyClass()
{
this.Users = new Dictionary<string, User>();
this.UserQueue = new Queue<User>();
}
public Dictionary<string, User> Users; //Dictionary of users currently being processed
public Queue<User> UserQueue; //List of order for which users should be processed
public void OnMessageRecievedFromIrcChannel(string username, string request)
{
lock (this.UserQueue) //For threadsafety
{
if (this.Users.ContainsKey(username))
{
//The user is in the user list. That means he has previously sent request that are awaiting to be processed.
//As such, we can safely add his new message at the end of HIS request list.
this.Users[username].RequestQueue.Enqueue(request); //Add users new message at the end of the list
return;
}
//User is not in the user list. Means it's his first request. Create him in the user list and add his message
var user = new User(username, request);
this.Users.Add(username, user); //Create the user and his message
this.UserQueue.Enqueue(user); //Add the user to the last of the precessing users.
}
}
//**********************************
public void WorkerTick()
{
//This tick runs every 400ms and processes next message to be sent.
lock (this.UserQueue) //For threadsafety
{
var user = this.UserQueue.Dequeue(); //Pop the next user to be processed.
var message = user.RequestQueue.Dequeue(); //Pop his request
/////PROCESSING MESSAGE GOES HERE
if (user.RequestQueue.Count > 0) //If user has more messages waiting to be processed
{
this.UserQueue.Enqueue(user); //Add him at the end of the userqueue
}
else
{
this.Users.Remove(user.Username); //User has no more messages, we can safely remove him from the user list
}
}
}
}
Basically, we have a queue of users. We pop the next user, process his first request and add him to the end of the user list if he has more request waiting to be processed.
Hope this clears some functionality. For the record, the code above is more of a pseudocode than a functional code xD