I started my sample from the one provided here:http://www.jaylee.org/post/2012/08/26/An-update-to-matthieumezil-Rx-and-the-FileSystemWatcher.aspx but the issue is that if you are watching a folder that has many files being modified/created/deleted at all time, it will never return an event since the throttle will never stop.
What I needed is to create a new temporary stream for each files being changed and only send the last event (if it is a delete event, we don't notify).
I think my sample is working, but since it is the first time I use Rx, I'd like to have some feedback on how I could simplify it.
Here it is:
public class FileWatcher
{
public class FileChangedEvent
{
public string FullPath { get; private set; }
public bool IsFileDeleted { get; private set; }
public FileChangedEvent(string path, bool isFileDeleted = false)
{
FullPath = path;
IsFileDeleted = isFileDeleted;
}
}
public static IObservable<FileChangedEvent> ObserveFolderChanges(string path, string filter, TimeSpan throttle)
{
return Observable.Create<FileChangedEvent>(
observer =>
{
var fileSystemWatcher = new FileSystemWatcher(path, filter) { EnableRaisingEvents = true };
var sources = CreateSources(fileSystemWatcher);
var fileSources = new ConcurrentDictionary<string, Subject<FileChangedEvent>>();
return sources.Merge().Subscribe(fileChange =>
{
Subject<FileChangedEvent> fileSubject = fileSources.GetOrAdd(fileChange.FullPath, (key) =>
{
//Create a new stream for this file.
var addedFileSubject = new Subject<FileChangedEvent>();
addedFileSubject.Throttle(throttle).Subscribe(lastFileChange =>
{
if (lastFileChange != null)
{
Subject<FileChangedEvent> dummy;
fileSources.TryRemove(lastFileChange.FullPath, out dummy);
//Only send the event if the file was not deleted.
if (!lastFileChange.IsFileDeleted)
{
observer.OnNext(lastFileChange);
}
}
});
return addedFileSubject;
});
fileSubject.OnNext(fileChange);
});
}
);
}
private static IObservable<FileChangedEvent>[] CreateSources(FileSystemWatcher fileWatcher)
{
return new[]
{
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Created += handler, handler => fileWatcher.Created -= handler)
.Select(ev => new FileChangedEvent(ev.EventArgs.FullPath)),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Deleted += handler, handler => fileWatcher.Deleted -= handler)
.Select(ev => new FileChangedEvent(ev.EventArgs.FullPath, true)),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs >(handler => fileWatcher.Changed += handler, handler => fileWatcher.Changed -= handler)
.Select(ev => new FileChangedEvent(ev.EventArgs.FullPath)),
//The rename source needs to send a delete event for the old file name.
Observable.Create<FileChangedEvent>(nameChangedObserver =>
{
return Observable.FromEventPattern<RenamedEventHandler, RenamedEventArgs>(handler => fileWatcher.Renamed += handler, handler => fileWatcher.Renamed -= handler)
.Subscribe(ev =>
{
nameChangedObserver.OnNext(new FileChangedEvent(ev.EventArgs.FullPath));
nameChangedObserver.OnNext(new FileChangedEvent(ev.EventArgs.OldFullPath, true));
});
}),
Observable.FromEventPattern<ErrorEventHandler, ErrorEventArgs >(handler => fileWatcher.Error += handler, handler => fileWatcher.Error -= handler)
.SelectMany(ev => Observable.Throw<FileChangedEvent>(ev.EventArgs.GetException()))
};
}
static void Main(string[] args)
{
var fileWatcher =
FileWatcher.ObserveFolderChanges("Test Path Here", "*.*", TimeSpan.FromSeconds(30))
.Subscribe(fce => { if(fce != null) Console.WriteLine("Changed :" + fce.FullPath); }, e => Debug.WriteLine(e));
Console.ReadLine();
}
}
The use case of this is to be notified when a file is done being uploaded on the file path.
So, my questions:
1) Is it possible to simplify this?
2) Am I leaking something?
3) Is there a big performance loss versus doing this without Rx?
Thanks!
Yes you are leaking the FileSystemWatcher
and yes it can also be simplified to eliminate all the futzing around with the dictionary and subjects.
Use Observable.Using
to manage the lifetime of the watcher. And use GroupBy
to get rid of the dictionary of subjects:
public static IObservable<FileChangedEvent> ObserveFolderChanges(string path, string filter, TimeSpan throttle)
{
return Observable.Using(
() => new FileSystemWatcher(path, filter) { EnableRaisingEvents = true },
fileSystemWatcher => CreateSources(fileSystemWatcher)
.Merge()
.GroupBy(c => c.FullPath)
.SelectMany(fileEvents => fileEvents
.Throttle(throttle)
.Where(e => !e.IsFileDeleted)));
}