Converting a MEF application to utilize Rx (System.Reactive)


I have a program that is built entirely on MEF. I now want it to use Rx so that it can scale to larger queries and let the user look over results as the various plugins return them. It is currently set up as follows:

Workflow: Query => DetermineTypes => QueryPlugins => Results

The code is all on GitHub if anyone needs to reference more than what I post below: ALeRT on GitHub

The VS solution has a UI project (the default startup project), a PluginFramework project, various TypePlugin projects (these determine what type the input is, such as a URL, email, file, or phone number), and QueryPlugin projects (these perform a query if the plugin supports the type that was determined). All results are displayed in the UI through a DataGrid bound to the DefaultView of a DataTable.

I want to make the Rx portion as invisible to the plugins as possible, because I do not want to make writing plugins complex for the few people who will write them. So I was thinking about taking the current framework interface below:

public interface IQueryPlugin
{
    string PluginCategory { get; }
    string Name { get; }
    string Version { get; }
    string Author { get; }
    System.Collections.Generic.List<string> TypesAccepted { get; }
    string Result(string input, string type, bool sensitive);
}

and making the Result method into the following:

System.IObservable<string> Result(string input, string type, bool sensitive);
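One alternative that might keep Rx out of the plugins altogether is to leave Result synchronous and wrap it on the framework side. The following is only a minimal sketch under assumptions: the interface is a trimmed copy of the one above, and `ResultAsObservable` and `EchoPlugin` are hypothetical names that do not exist in the project. It relies on `Observable.Defer` and `Observable.Start` from System.Reactive.

```csharp
using System;
using System.Reactive.Linq;

// Trimmed copy of the existing contract; Result stays synchronous.
public interface IQueryPlugin
{
    string Name { get; }
    string Result(string input, string type, bool sensitive);
}

// Hypothetical helper that lives in the framework, not in any plugin.
public static class QueryPluginExtensions
{
    public static IObservable<string> ResultAsObservable(
        this IQueryPlugin plugin, string input, string type, bool sensitive)
    {
        // Defer postpones the work until someone subscribes; Start then runs
        // the blocking plugin call on the default (thread pool) scheduler.
        return Observable.Defer(
            () => Observable.Start(() => plugin.Result(input, type, sensitive)));
    }
}

// Hypothetical plugin used only to exercise the wrapper.
public class EchoPlugin : IQueryPlugin
{
    public string Name { get { return "Echo"; } }

    public string Result(string input, string type, bool sensitive)
    {
        return "Value\n" + input;
    }
}

public static class WrapperSketch
{
    public static void Main()
    {
        IQueryPlugin plugin = new EchoPlugin();

        // Wait blocks until the sequence completes; that is acceptable in a
        // console sketch but should be avoided on a UI thread.
        string csv = plugin.ResultAsObservable("example.com", "URL", false).Wait();
        Console.WriteLine(csv);
    }
}
```

With a wrapper like this, plugin authors would keep writing plain methods that return a string, and only the calling code would deal with IObservable<string>.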

This would naturally require modifying the method that calls the plugin, which as it stands is:

                using (GenericParserAdapter parser = new GenericParserAdapter())
                {
                    using (TextReader sr = new StringReader(qPlugins.Result(query, qType, sensitive)))
                    {
                        Random rNum = new Random();

                        parser.SetDataSource(sr);
                        parser.ColumnDelimiter = Convert.ToChar(",");
                        parser.FirstRowHasHeader = true;
                        parser.MaxBufferSize = 4096;
                        parser.MaxRows = 500;
                        parser.TextQualifier = '\"';

                        DataTable tempTable = parser.GetDataTable();
                        tempTable.TableName = qPlugins.Name.ToString();
                        if (!tempTable.Columns.Contains("Query"))
                        {
                            DataColumn tColumn = new DataColumn("Query");
                            tempTable.Columns.Add(tColumn);
                            tColumn.SetOrdinal(0);
                        }

                        foreach (DataRow dr in tempTable.Rows)
                        {
                            dr["Query"] = query;
                        }

                        if (!resultDS.Tables.Contains(qPlugins.Name.ToString()))
                        {
                            resultDS.Tables.Add(tempTable);
                        }
                        else
                        {
                            resultDS.Tables[qPlugins.Name.ToString()].Merge(tempTable);
                        }
                        pluginsLB.DataContext = resultDS.Tables.Cast<DataTable>().Select(t => t.TableName).ToList();
                    }
                }

So at this point I'm stuck on how to make this work. There doesn't seem to be good documentation on how to integrate MEF with Rx. My assumption is that I would make the following change:

using (TextReader sr = new StringReader(qPlugins.Result(query, qType, sensitive).Subscribe()))

but this isn't going to work, since Subscribe() returns an IDisposable rather than a string. Any help on making these changes would be greatly appreciated. If you have other suggestions regarding my code, please let me know. I do this as a hobby, so I know my code surely isn't up to snuff for most people.


Solution

  • Would this work for you:

    IObservable<DataTable> q =
        from text in qPlugins.Result(query, qType, sensitive)
        from tempTable in Observable.Using(
            () => new GenericParserAdapter(),
            parser => Observable.Using(
                () => new StringReader(text),
                sr => Observable.Start<DataTable>(
                    () =>
                    {
                        var rNum = new Random();
    
                        parser.SetDataSource(sr);
                        parser.ColumnDelimiter = Convert.ToChar(",");
                        parser.FirstRowHasHeader = true;
                        parser.MaxBufferSize = 4096;
                        parser.MaxRows = 500;
                        parser.TextQualifier = '\"';
    
                        var tempTable = parser.GetDataTable();
                        tempTable.TableName = qPlugins.Name.ToString();
                        if (!tempTable.Columns.Contains("Query"))
                        {
                            DataColumn tColumn = new DataColumn("Query");
                            tempTable.Columns.Add(tColumn);
                            tColumn.SetOrdinal(0);
                        }
    
                        foreach (DataRow dr in tempTable.Rows)
                            dr["Query"] = query;
    
                        return tempTable;
                    })))
        select tempTable;
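The query above only describes the work; nothing is merged into the DataSet until something subscribes. A minimal sketch of that last step follows, under assumptions: `MakeTable` and `AddOrMerge` are hypothetical helpers, and the two hand-built tables stand in for whatever `q` would emit, so that the example runs without GenericParserAdapter or a UI.

```csharp
using System;
using System.Data;
using System.Reactive.Linq;

public static class SubscribeSketch
{
    // Hypothetical stand-in for one plugin's parsed result.
    static DataTable MakeTable(string pluginName, string query)
    {
        var table = new DataTable(pluginName);
        table.Columns.Add("Query");
        table.Rows.Add(query);
        return table;
    }

    // Mirrors the add-or-merge logic from the question.
    static void AddOrMerge(DataSet resultDS, DataTable table)
    {
        if (!resultDS.Tables.Contains(table.TableName))
            resultDS.Tables.Add(table);
        else
            resultDS.Tables[table.TableName].Merge(table);
    }

    public static void Main()
    {
        var resultDS = new DataSet();

        // Assumption: stands in for the query `q` built above.
        IObservable<DataTable> q = new[]
        {
            MakeTable("PluginA", "example.com"),
            MakeTable("PluginA", "example.org"),
        }.ToObservable();

        // In a UI application the tables would arrive on a thread pool thread,
        // so an ObserveOn(SynchronizationContext.Current) call before Subscribe
        // would likely be needed to touch the DataSet and controls safely.
        IDisposable subscription = q.Subscribe(
            table => AddOrMerge(resultDS, table),
            ex => Console.Error.WriteLine("Query failed: " + ex.Message),
            () => Console.WriteLine("Tables: " + resultDS.Tables.Count));

        // Disposing the subscription cancels any work still in flight.
        subscription.Dispose();
    }
}
```

Keeping the merge in the subscriber rather than inside the query means the observable stays free of side effects, and the UI update (such as resetting pluginsLB.DataContext) could go in the same OnNext handler.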