I'm still quite new to the world of stream and batch processing and trying to understnad concepts and speach. It is admitedly very possible that the answer to my question well known, easy to find or even answered a hundred times here at SO, but I was not able to find it.
The background:
I am working in a big scientific project (nuclear fusion research), and we are producing tons of measurement data during experiment runs. Those data are mostly streams of samples tagged with a nanosecond timestamp, where samples can be anything from a single by ADC value, via an array of such, via deeply structured data (with up to hundreds of entries from 1 bit booleans to 64bit double precision floats) to raw HD video frames or even string text messages. If I understand the common terminologies right, I would regard our data as "tabular data", for the most part.
We are working with mostly selfmade software solutions from data acquisition over simple online (streaming) analysis (like scaling, subsampling and such) to our own data sotrage, management and access facilities.
In view of the scale of the operation and the effort for maintaining all those implementations, we are investigating the possibilities to use standard frameworks and tools for more of our tasks.
My question:
In particular at this stage, we are facing the need for more and more sofisticated (automated and manual) data analytics on live/online/realtime data as well as "after the fact" offline/batch analytics of "historic" data. In this endavor, I am trying to understand if and how existing analytics frameworks like Spark, Flink, Storm etc. (possibly supported by message queues like Kafka, Pulsar,...) can support a scenario, where
Simply streaming the online data into storage and querying from there seems no option as we need both raw and analysed data for live monitoring and realtime feedback control of the experiment. Also, letting the user query either a live input signal or a historic batch from storage differently would not be ideal, as our physicists mostly are no data scientists and we would like to keep such "technicalities" away from them and idealy the exact same algorithms should be used for analysing new real time data and old stored data from previous experiments.
Sitenotes:
I would be very greatfull if anyone would be able to understand my question and to shed some light on the topic for me :-)
Many Thanks and kind regards, Beppo
I don't think anyone can say "yes, framework X can definitely handle your workload", because it depends a lot on what you need out of your message processing, e.g. regarding messaging reliability, and how your data streams can be partitioned.
You may be interested in BenchmarkingDistributedStreamProcessingEngines. The paper is using versions of Storm/Flink/Spark that are a few years old (looks like they were released in 2016), but maybe the authors would be willing to let you use their benchmark to evaluate newer versions of the three frameworks?
A very common setup for streaming analytics is to go data source -> Kafka/Pulsar -> analytics framework -> long term data store. This decouples processing from data ingest, and lets you do stuff like reprocessing historical data as if it were new.
I think the first step for you should be to see if you can get the data volume you need through Kafka/Pulsar. Either generate a test set manually, or grab some data you think could be representative from your production environment, and see if you can put it through Kafka/Pulsar at the throughput/latency you need.
Remember to consider partitioning of your data. If some of your data streams could be processed independently (i.e. ordering doesn't matter), you should not be putting them in the same partitions. For example, there is probably no reason to mix sensor measurements and the video feed streams. If you can separate your data into independent streams, you are less likely to run into bottlenecks both in Kafka/Pulsar and the analytics framework. Separate data streams would also allow you to parallelize processing in the analytics framework much better, as you could run e.g. video feed and sensor processing on different machines.
Once you know whether you can get enough throughput through Kafka/Pulsar, you should write a small example for each of the 3 frameworks. To start, I would just receive and drop the data from Kafka/Pulsar, which should let you know early whether there's a bottleneck in the Kafka/Pulsar -> analytics path. After that, you can extend the example to do something interesting with the example data, e.g. do a bit of processing like what you might want to do in production.
You also need to consider which kinds of processing guarantees you need for your data streams. Generally you will pay a performance penalty for guaranteeing at-least-once or exactly-once processing. For some types of data (e.g. the video feed), it might be okay to occasionally lose messages. Once you decide on a needed guarantee, you can configure the analytics frameworks appropriately (e.g. disable acking in Storm), and try benchmarking on your test data.
Just to answer some of your questions more explicitly:
The live data analysis/monitoring use case sounds like it fits the Storm/Flink systems fairly well. Hooking it up to Kafka/Pulsar directly, and then doing whatever analytics you need sounds like it could work for you.
Reprocessing of historical data is going to depend on what kind of queries you need to do. If you simply need a time interval + id, you can likely do that with Kafka plus a filter or appropriate partitioning. Kafka lets you start processing at a specific timestamp, and if you data is partitioned by id or you filter it as the first step in your analytics, you could start at the provided timestamp and stop processing when you hit a message outside the time window. This only applies if the timestamp you're interested in is when the message was added to Kafka though. I also don't believe Kafka supports below-millisecond resolution on the timestamps it generates.
If you need to do more advanced queries (e.g. you need to look at timestamps generated by your sensors), you could look at using Cassandra or Elasticsearch or Solr as your permanent data store. You will also want to investigate how to get the data from those systems back into your analytics system. For example, I believe Spark ships with a connector for reading from Elasticsearch, while Elasticsearch provides a connector for Storm. You should check whether such a connector exists for your data store/analytics system combination, or be willing to write your own.
Edit: Elaborating to answer your comment.
I was not aware that Kafka or Pulsar supported timestamps specified by the user, but sure enough, they both do. I don't see that Pulsar supports sub-millisecond timestamps though?
The idea you describe can definitely be supported by Kafka.
What you need is the ability to start a Kafka/Pulsar client at a specific timestamp, and read forward. Pulsar doesn't seem to support this yet, but Kafka does.
You need to guarantee that when you write data into a partition, they arrive in order of timestamp. This means that you are not allowed to e.g. write first message 1 with timestamp 10, and then message 2 with timestamp 5.
If you can make sure you write messages in order to Kafka, the example you describe will work. Then you can say "Start at timestamp 'last night at midnight'", and Kafka will start there. As live data comes in, it will receive it and add it to the end of its log. When the consumer/analytics framework has read all the data from last midnight to current time, it will start waiting for new (live) data to arrive, and process it as it comes in. You can then write custom code in your analytics framework to make sure it stops processing when it reaches the first message with timestamp 'tomorrow night'.
With regard to support of sub-millisecond timestamps, I don't think Kafka or Pulsar will support it out of the box, but you can work around it reasonably easily. Just put the sub-millisecond timestamp in the message as a custom field. When you want to start at e.g. timestamp 9ms 10ns, you ask Kafka to start at 9ms, and use a filter in the analytics framework to drop all messages between 9ms and 9ms 10ns.