Tags: c#, multithreading, visual-studio-2013, blockingcollection, concurrent-queue

Misunderstanding of ConcurrentQueue: a single consumer working from the queue on its own thread


I'm having trouble creating a working FileSystemWatcher that takes the Created event and stores it in a queue for a separate thread to work from. I've read countless threads here about this issue, but I can't get my head around this particular problem.

using System;
using System.IO;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;

namespace FileSystemWatcherTest
{
    class Program
    {
        public static BlockingCollection<string> processCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());

        static void Main(string[] args)
        {
            string path = @"C:\test\";
            FileSystemWatcher watcher = new FileSystemWatcher();

            watcher.Path = path;
            watcher.EnableRaisingEvents = true;
            watcher.Filter = "*.*";

            watcher.Created += new FileSystemEventHandler(onCreated);
            Thread Consumer = new Thread(new ThreadStart(mover));
            Consumer.Start();


            while (true) ;//run infinite loop so program doesn't terminate until we force it.
        }

        static void onCreated(object sender, FileSystemEventArgs e)
        {
            processCollection.Add(e.FullPath);
        }

        static void mover()
        {
            string current;
            string processed = @"C:\test\processed\";
            while (true)
            {
                while (processCollection.IsCompleted)
                {
                    Thread.Sleep(1000);
                }
                while (processCollection.TryTake(out current))
                {
                    System.IO.File.Move(current, processed);
                }
            }
        }
    }
}

This is what I'd like to test. I'm aware this does not work. I've verified that the FSW works when I simply write to the console each time a file is created. My problem begins when I try to start the mover function on its own thread. The mover function and onCreated do not appear to communicate once I start working off the queue.

My expectation is that this code starts the mover function on its own thread and runs it alongside the FSW. I expect the ConcurrentQueue wrapped by the BlockingCollection to update automatically: when I enqueue an item through onCreated, the mover sees that the queue now has one more item; when the mover takes one from the queue, onCreated sees this. I'm probably using Thread.Sleep incorrectly. I no longer have a supporting reason for using BlockingCollection (which I originally chose to handle waiting for the queue to fill up and, basically, to constantly check the queue for an item to process), and I am open to changing this to whatever might work. I've seen locks used, but from what I understand they are not really necessary because ConcurrentQueue handles its own synchronization.
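For reference, that last point is correct: ConcurrentQueue<T> can be shared between threads without a lock. A minimal sketch of this, in which several tasks enqueue into one shared queue at the same time and every item arrives; the class and method names (`ConcurrentQueueDemo`, `FillFromManyThreads`, `Drain`) are illustrative and not from the question:

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class ConcurrentQueueDemo
{
    // Runs `producers` tasks in parallel, each enqueueing `perProducer`
    // items into one shared ConcurrentQueue<string>. There is no lock
    // statement anywhere: the queue synchronizes internally.
    public static ConcurrentQueue<string> FillFromManyThreads(int producers, int perProducer)
    {
        var queue = new ConcurrentQueue<string>();
        var tasks = new Task[producers];

        for (int p = 0; p < producers; p++)
        {
            int id = p; // copy the loop variable for the lambda
            tasks[p] = Task.Run(() =>
            {
                for (int i = 0; i < perProducer; i++)
                {
                    queue.Enqueue("producer" + id + "/file" + i + ".eml");
                }
            });
        }

        Task.WaitAll(tasks);
        return queue;
    }

    // Removes items with TryDequeue until the queue is empty and returns
    // how many were removed. TryDequeue never waits: it returns false
    // immediately when nothing is queued.
    public static int Drain(ConcurrentQueue<string> queue)
    {
        int count = 0;
        string item;
        while (queue.TryDequeue(out item))
        {
            count++;
        }
        return count;
    }
}
```

With `FillFromManyThreads(8, 1000)` the queue ends up with exactly 8000 items. Because TryDequeue() returns immediately on an empty queue, a consumer built on ConcurrentQueue<T> alone has to poll; waiting for new items is what BlockingCollection<T> adds on top.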

The ultimate goal is to process large quantities of small files that come in at random times and can range from 1 to several hundred at any given time. These files are .EMLs.

If at all possible, I would greatly appreciate an explanation of what is happening and what a suggestion would be to get around this problem. I come humbly and expect to be told everything I understand is incorrect!

Edit: I'm testing this as a console application, but it will run as a service afterwards. I added the while (true) ; loop at the end of Main() to keep the FSW running.


Solution

  • You have several different problems in your code example:

    1. You are misusing the File.Move() method. Both parameters must be full file names: the second one is the new path and name for the file. You are passing a directory name as the second parameter, which is incorrect; the call fails instead of moving the file into that directory.
    2. You are inspecting the IsCompleted property of the collection as if it would tell you something useful. Because nothing ever calls CompleteAdding(), it will always be false, so that block of code does nothing. This leads to the next problem:
    3. Your thread is running in a tight loop, consuming massive amounts of CPU time. This may or may not cause errors, but it could: FileSystemWatcher is not guaranteed to report every change, and one reason it can miss one is that it doesn't get enough CPU time to keep up with the file system (its internal buffer can overflow when events are not serviced quickly enough). If you starve it by using up all the CPU time, you might find it simply doesn't report a change. Note that this problem exists in your primary thread too: its while (true) ; loop also spins, consuming massive amounts of CPU time doing nothing. So you are fully occupying two cores of your system.
    4. You are failing to take advantage of the producer/consumer model of execution that BlockingCollection is designed for. Your worker thread should enumerate the sequence returned by GetConsumingEnumerable(), which blocks until an item is available, and the producer should call CompleteAdding() to signal to that thread that there is no more work.
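Stripped of the file system, the producer/consumer pattern from point 4 can be sketched as follows. This is a minimal illustration, not the answer's code: `ProducerConsumerSketch` and `Run` are hypothetical names, and adding a string to a list stands in for the real work (File.Move()).

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

static class ProducerConsumerSketch
{
    // Starts one consumer thread, feeds it `paths` from the calling thread
    // (the role onCreated() plays in the question), and returns the items
    // in the order the consumer handled them.
    public static List<string> Run(IEnumerable<string> paths)
    {
        var handled = new List<string>(); // written only by the consumer thread

        using (var work = new BlockingCollection<string>()) // ConcurrentQueue<string> underneath
        {
            var consumer = new Thread(() =>
            {
                // Blocks without spinning while the collection is empty, and
                // ends once CompleteAdding() has been called and it is drained.
                foreach (string path in work.GetConsumingEnumerable())
                {
                    handled.Add(path); // stand-in for the real work, e.g. File.Move()
                }
            });
            consumer.Start();

            foreach (string path in paths)
            {
                work.Add(path);
            }

            work.CompleteAdding(); // no more work: lets the consumer's foreach finish
            consumer.Join();       // wait until every queued item has been handled
        }

        return handled; // safe to read: Join() ensures the consumer has finished
    }
}
```

With a single producer and a single consumer the items come out in FIFO order, so `Run(new[] { "a.eml", "b.eml", "c.eml" })` returns them in that same order. Neither thread sleeps or polls; the consumer simply blocks inside GetConsumingEnumerable() until there is something to do.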

    Here is a version of your code example that corrects the above mistakes and also cleans up the example a bit so that it's more self-contained:

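    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Threading;
    
    class Program
    {
        // The default backing collection for BlockingCollection<T>
        // is ConcurrentQueue<T>. There's no need to specify that
        // explicitly.
        public static BlockingCollection<string> processCollection = new BlockingCollection<string>();
    
        static void Main(string[] args)
        {
            string testDirectory = Path.Combine(Environment.CurrentDirectory, "test");
    
            Console.WriteLine("Creating directory: \"{0}\"", testDirectory);
            Directory.CreateDirectory(testDirectory);
    
            FileSystemWatcher watcher = new FileSystemWatcher();
    
            // Configure the watcher fully before enabling it.
            watcher.Path = testDirectory;
            watcher.Filter = "*.*";
            watcher.Created += new FileSystemEventHandler(onCreated);
            watcher.EnableRaisingEvents = true;
    
            Thread Consumer = new Thread(new ParameterizedThreadStart(mover));
            Consumer.Start(testDirectory);
    
            // Each line typed at the console creates a test file; an empty
            // line (or end of input, where ReadLine() returns null) stops.
            string text;
    
            while (!string.IsNullOrEmpty(text = Console.ReadLine()))
            {
                string newFile = Path.Combine(testDirectory, text + ".txt");
    
                File.WriteAllText(newFile, "Test file");
            }
    
            // Stop raising events first: Add() throws InvalidOperationException
            // once CompleteAdding() has been called.
            watcher.EnableRaisingEvents = false;
            watcher.Dispose();
    
            // Tell the consumer there is no more work, then wait for it to
            // finish the items already queued.
            processCollection.CompleteAdding();
            Consumer.Join();
        }
    
        static void onCreated(object sender, FileSystemEventArgs e)
        {
            if (e.ChangeType == WatcherChangeTypes.Created)
            {
                processCollection.Add(e.FullPath);
            }
        }
    
        static void mover(object testDirectory)
        {
            string processed = Path.Combine((string)testDirectory, "processed");
    
            Console.WriteLine("Creating directory: \"{0}\"", processed);
    
            Directory.CreateDirectory(processed);
    
            // GetConsumingEnumerable() blocks while the collection is empty and
            // ends after CompleteAdding() once the collection is drained, so
            // there is no need for polling or Thread.Sleep().
            foreach (string current in processCollection.GetConsumingEnumerable())
            {
                // Ensure that the path is in fact a file and not something else
                // (creating the "processed" directory also raises a Created event).
                if (File.Exists(current))
                {
                    try
                    {
                        // The destination must be a full file name, not a directory.
                        File.Move(current, Path.Combine(processed, Path.GetFileName(current)));
                    }
                    catch (IOException ex)
                    {
                        // For example, the writer may still have the file open.
                        // Report it instead of letting the exception end the process.
                        Console.WriteLine("Could not move \"{0}\": {1}", current, ex.Message);
                    }
                }
            }
        }
    }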
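The mover() above moves each file as soon as its Created event is dequeued. When files arrive in bursts (as in the stated goal of handling up to several hundred small .eml files at a time), the writer may still have a file open at that moment, and on Windows File.Move() can then fail with an IOException. A sketch of one way to handle that, assuming a simple fixed retry policy; `FileMover`, `TryMoveWithRetry`, `attempts` and `delayMs` are hypothetical names, not part of the question or the answer:

```csharp
using System.IO;
using System.Threading;

static class FileMover
{
    // Hypothetical helper: moves `sourcePath` into `targetDirectory`,
    // keeping its file name. File.Move() needs a full destination *file*
    // name, so the directory is combined with the source file's name.
    // On IOException it waits `delayMs` and tries again, up to `attempts`
    // times in total (an assumed policy). Returns true if the move succeeded.
    public static bool TryMoveWithRetry(string sourcePath, string targetDirectory,
                                        int attempts = 5, int delayMs = 200)
    {
        string destination = Path.Combine(targetDirectory, Path.GetFileName(sourcePath));

        for (int attempt = 1; attempt <= attempts; attempt++)
        {
            try
            {
                File.Move(sourcePath, destination);
                return true;
            }
            catch (IOException)
            {
                // IOException also covers a missing source file or an existing
                // destination; those cases simply fail after the last attempt.
                if (attempt == attempts)
                {
                    return false;
                }
                Thread.Sleep(delayMs); // give the writer time to close the file
            }
        }

        return false; // only reached when attempts <= 0
    }
}
```

In mover(), the File.Move() call could then become `FileMover.TryMoveWithRetry(current, processed)`, with a message logged when it returns false. The trade-off is that the single consumer thread sleeps during retries, which delays the files queued behind a locked one.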