I am connecting to a third-party data feed provider's server using a WebSocket. This is my connection code:
this.websocket = new WebSocket("wss://socket.polygon.io/stocks", sslProtocols: SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls);
Once the connection is established, we receive roughly 70,000 to 100,000 records per minute. We then split those responses and store each record in its own symbol file: data for AAPL goes into AAPL's file, and likewise for FB, MSFT, IBM, QQQ, and so on. In total we have 10,000 files that we need to handle at the same time and append live records to.
public static string tempFile = @"D:\TempFileForLiveMarket\tempFileStoreLiveSymbols.txt";
public static System.IO.StreamWriter w;
private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
{
    using (w = System.IO.File.AppendText(tempFile))
    {
        Log(e.Message, w);
    }
    using (System.IO.StreamReader r = System.IO.File.OpenText(tempFile))
    {
        DumpLog(r);
    }
}
public static void Log(string responseMessage, System.IO.TextWriter w)
{
    w.WriteLine(responseMessage);
}
public static void DumpLog(System.IO.StreamReader r)
{
    string line;
    while ((line = r.ReadLine()) != null)
    {
        WriteRecord(line);
    }
}
public static void WriteRecord(string data)
{
    List<LiveData> ld = JsonConvert.DeserializeObject<List<LiveData>>(data);
    var filterData = ld.Where(x => symbolList.Contains(x.sym));
    List<string> fileLines = new List<string>();
    foreach (var item in filterData)
    {
        var fileName = @"D:\SymbolsData\" + item.sym + "_day_Aggregate.txt";
        fileLines = File.ReadAllLines(fileName).AsParallel().Skip(1).ToList();
        if (fileLines.Count > 1)
        {
            var lastLine = fileLines.Last();
            if (!lastLine.Contains(item.sym))
            {
                fileLines.RemoveAt(fileLines.Count - 1);
            }
        }
        fileLines.Add(item.sym + "," + item.s + "," + item.p + "-----");
        System.IO.File.WriteAllLines(fileName, fileLines);
    }
}
So, once the WebSocket connection is established and we start processing the live market data into our 10,000 files, everything slows down, and after a few minutes the connection is closed with a message like the one below:
Websocket Error
Received an unexpected EOF or 0 bytes from the transport stream.
Connection Closed...
I am going through this whole process because in the next phase I need to perform technical analysis on the live price of each and every symbol. So how can I handle this situation? How can I make the processing faster than it currently is? And how can I stop the connection from being closed?
After Edit
I replaced the StreamWriter and the temp file with a StringBuilder, as follows:
public static StringBuilder sb = new StringBuilder();
public static System.IO.StringWriter sw;
private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
{
    sw = new System.IO.StringWriter(sb);
    sw.WriteLine(e.Message);
    Reader();
}
public static void Reader()
{
    System.IO.StringReader _sr = new System.IO.StringReader(sb.ToString());
    while (_sr.Peek() > -1)
    {
        WriteRecord(sb.ToString());
    }
    sb.Remove(0, sb.Length);
}
public static void WriteRecord(string data)
{
    List<LiveData> ld = JsonConvert.DeserializeObject<List<LiveData>>(data);
    var filterData = ld.Where(x => symbolList.Contains(x.sym));
    List<string> fileLines = new List<string>();
    foreach (var item in filterData)
    {
        var fileName = @"D:\SymbolsData\" + item.sym + "_day_Aggregate.txt";
        fileLines = File.ReadAllLines(fileName).AsParallel().Skip(1).ToList();
        fileLines.RemoveAt(fileLines.Count - 1);
        fileLines.Add(item.sym + "," + item.s + "," + item.p);
        System.IO.File.WriteAllLines(fileName, fileLines);
    }
}
It looks like you append each message to the tempFile, but then you process the entire tempFile. This means you are constantly re-processing the old data plus the new record, so yes: it will gradually take longer and longer and longer until it takes so long that the other end gets bored of waiting, and cuts you off. My advice: don't do that.
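If it helps, here's a minimal sketch of the alternative: handle each incoming message directly in the handler and append one line per matching symbol, with no temp file and no re-reading. It assumes the same LiveData model, symbolList and file layout from your question; adjust the line format to whatever you actually need.

private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
{
    // Deserialize only the message that just arrived; nothing old is re-read or re-processed.
    List<LiveData> ld = JsonConvert.DeserializeObject<List<LiveData>>(e.Message);
    foreach (var item in ld.Where(x => symbolList.Contains(x.sym)))
    {
        var fileName = @"D:\SymbolsData\" + item.sym + "_day_Aggregate.txt";
        // Append a single line instead of reading and rewriting the whole file.
        System.IO.File.AppendAllText(fileName, item.sym + "," + item.s + "," + item.p + Environment.NewLine);
    }
}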
There are also a lot of things you could do more efficiently in the actual processing of each record, but that is irrelevant compared to the overhead of constantly re-processing everything.
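One example of that kind of improvement, offered only as a sketch and not as the one true way: keep the message handler non-blocking by pushing raw messages onto a queue and doing the file work on a background task, so the socket is never stalled behind disk I/O. BlockingCollection comes from System.Collections.Concurrent, Task from System.Threading.Tasks, and ProcessMessage below is a hypothetical stand-in for your per-record work (deserialize, filter, append).

public static BlockingCollection<string> messageQueue = new BlockingCollection<string>();

private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
{
    // Only enqueue and return immediately, so the server is never kept waiting on us.
    messageQueue.Add(e.Message);
}

// Call once, e.g. right after the WebSocket is opened.
public static void StartConsumer()
{
    Task.Run(() =>
    {
        foreach (var message in messageQueue.GetConsumingEnumerable())
        {
            ProcessMessage(message); // hypothetical: deserialize, filter, append per symbol
        }
    });
}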