Search code examples
c#eventsgetcouchdb

Consume EventArgs from CouchDB


i am struggling with consuming the data i get from my CouchDB database.

I am trying to consume new data that comes to the specific view. CouchDB offers an option for feed=continous, but i tested it and dont get any data, same in postman. But if i change it to feed=eventsource i can see the changes in the console. But i dont know how to handle the events. I opened a method with the right connection, but im stuck now, any help would be great.

public async Task ObserveDbAndTrigger()
    {
        var url = "http://localhost:5984/MyDB/_changes?feed=eventsource&filter=_view&view=MyView&include_docs=true&attachments=true&heartbeat=1000&since=0";

        using (var client = new HttpClient())
        {
            client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
            client.DefaultRequestHeaders.Add("Accept", "application/json");
            client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes($"user:password" + $"")));

            var request = new HttpRequestMessage(HttpMethod.Get, url);

            // handle the incoming events and work with the incoming data
            
        }
    }

Any suggestions ?


Solution

  • Clearly there's work to be done. Normally I shy away from answering such questions as posed because it seems like a code service request, but I believe this answer may benefit others beyond the OP.

    Here is an extremely naïve bit of code meant to illustrate event delegation and the simplicity of communicating with CouchDB over TCP.

    Ultimately this demonstrates the publish/subscribe pattern, which is a reasonable fit. I tested this against CouchDB 2.3 on Windows. The code is hardwired to localhost:5984 because whatever.

    class NaiveChangeWatcher
    {
        static void Main(string[] args)
        {
            if (args.Length >= 4)
            {
                // set up server info.
                string db = args[0];
                string auth = "Basic " + Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(String.Join(":", args[1], args[2])));
                string query = db + "/_changes?feed=continuous&since=0&heartbeat=" + args[3];
                // init the publisher
                ChangesPublisher pub = new ChangesPublisher();
                // let's subscribe to the OnChange event which writes event data to the console.
                pub.OnChange += (sender, e) => Console.WriteLine(e.Value);
                pub.OnException += (sender, e) => Console.WriteLine(e.Value.ToString() + "\r\n\r\nPress a key to exit.");
                //// start publishing.            
                Task.Run(async () =>
                {
                    await pub.Begin("localhost", 5984, query, auth, int.Parse(args[3]));
                });
                // Press a key when bored of it all
                Console.ReadKey();
                // stop the publisher gracefully
                pub.Stop = true;
            }
            else
            {
                Console.WriteLine("usage: NaiveChangeWatcher db_name username password timeout_millis");
            }
        }
        //
        // The ChangesPublisher notifies subscribers of new data from the changes feed
        // via the ChangeEvent. The publisher will trigger an OnException event in the
        // event of an exception prior to ending its task.
        //
        public class ChangesPublisher
        {
            // Set to true to stop publishing. This causes the Begin method to complete.
            public bool Stop { get; set; }
            // The event posted when data from the server arrived
            public class ChangeEvent : EventArgs
            {
                public string Value { get; set; }
                public ChangeEvent(string value)
                {
                    Value = value;
                }
            }
            // Event triggered when the subscriber croaks by exception
            public class ExceptionEvent : EventArgs
            {
                public Exception Value { get; set; }
                public ExceptionEvent(Exception value)
                {
                    Value = value;
                }
            }
            // Subscription to changes from  the _changes endpoint
            public event EventHandler<ChangeEvent> OnChange = delegate { };
            // Subscription to publisher exit on error
            public event EventHandler<ExceptionEvent> OnException = delegate { };
            
            public async Task Begin(string serverAddr, int port, string query, string auth, int timeout)
            {
                using (var client = new TcpClient())
                {                  
                    string request = String.Join("\r\n", new List<string> {
                        String.Format("GET /{0} HTTP/1.1",query),
                        "Authorization: " + auth,
                        "Accept: application/json",
                        "Host: " + serverAddr,
                        "Connection: keep-alive",
                        "\r\n"
                    });
                     
                    try
                    {
                        await client.ConnectAsync(serverAddr, port);
                        using (NetworkStream stream = client.GetStream())
                        {                           
                            StreamWriter writer = new StreamWriter(stream);
                            await writer.WriteAsync(request);
                            await writer.FlushAsync();
    
                            // read lines from the server, ad nauseum.
                            StreamReader reader = new StreamReader(stream);
                            while (!Stop)
                            {
                                string data = await reader.ReadLineAsync();
                                // emit a change event
                                OnChange(this, new ChangeEvent(data));
                            }
                        }
                    }
                    catch (Exception e)
                    {
                        OnException(this, new ExceptionEvent(e));                        
                    }
                }
            }
        }
    }