Tags: time-series, offline, olap, rollup, druid

Druid for non-time-series data


For cases where the data gets sent to Druid immediately as it's generated (as in IoT), all is fine and dandy. Love it.

But now I have different situation, stemming from late data-entry.

The end-user can go offline (losing her internet connection), and the data gets stored on her mobile phone and is only sent to Druid once she goes back online.

That means that by the time she recovers her internet connection, the data sent to Druid (through Tranquility server, for example) will be rejected, because Druid real-time ingestion does not accept past data.
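(For reference, if I understand the Tranquility docs correctly, that rejection window comes from the windowPeriod in the real-time tuningConfig that Tranquility submits; the values below are just illustrative, not from my setup. Events whose timestamp falls outside roughly the current time plus or minus the windowPeriod get dropped.)

"tuningConfig": {
  "type": "realtime",
  "maxRowsInMemory": 75000,
  "intermediatePersistPeriod": "PT10M",
  "windowPeriod": "PT10M"
}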

Of course I can set the timestamp to the time the data gets sent to the server. But that would skew the report... unless I add another field (let's say generated_ts) and declare it as yet another dimension.

But then I would not benefit from the automatic time-based roll-ups you get for free in Druid (?). I would have to use groupBy (with that generated_ts as one of the dimensions), like this:

{
  "queryType": "groupBy",
  "dataSource": "my_datasource",
  "granularity": "none",
  "dimensions": [
    "city",
    {
      "type" : "extraction",
      "dimension" : "generated_ts",
      "outputName" :  "dayOfWeek",
      "extractionFn" : {
        "type" : "timeFormat",
        "format" : "EEEE"
      }
    }
  ],
  ...
}
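(For contrast, and with made-up interval and metric names, this is roughly the kind of query I could run if the event time were the primary __time column, leaning on the time bucketing Druid gives me for free instead of a groupBy over a plain dimension:)

{
  "queryType": "timeseries",
  "dataSource": "my_datasource",
  "granularity": "day",
  "intervals": ["2016-01-01/2016-02-01"],
  "aggregations": [
    { "type": "longSum", "name": "metric1", "fieldName": "metric1" }
  ]
}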

My questions are:

  1. Is the approach valid?
  2. If yes: what's the penalty? (I guess it would be performance, but how bad?)

Thanks, Raka

--

Responding to Ramkumar's response below, follow-up question:

I still don't quite understand this batch ingestion:

Let's suppose event A. It's generated at timestamp 3, and was not sent to the server until timestamp 15.

And when it's sent at timestamp 15, it has this value: {ts: 15, generated_ts: 3, metric1: 12, dimension1: 'a'}

The timestamp key being "ts".

That is inaccurate; the ideal would be {ts: 3, generated_ts: 3, metric1: 12, dimension1: 'a'}, but I had to set ts to 15 just so that Tranquility would accept it.

Now, during batch ingestion, I want to fix it so that it has the correct ts: {ts: 3, generated_ts: 3, metric1: 12, dimension1: 'a'}.

The question: will I have duplicate event then?

Or... (this is what I suspect): does batch ingestion for a specified time interval basically replace all the data within that interval? (I hope this is the case, so I can stop worrying about data duplication.)

Additional note (just in): I came across this: https://github.com/druid-io/tranquility/blob/master/docs/overview.md#segment-granularity-and-window-period

that says:

Our approach at Metamarkets is to send all of our data through Tranquility in real-time, but to also mitigate these risks by storing a copy in S3 and following up with a nightly Hadoop batch indexing job to re-ingest the data. This allows us to guarantee that in the end, every event is represented exactly once in Druid.

So... it's a re-ingestion, which (I guess) means a complete replace, right?
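(If my reading is right, the interval that gets wholesale-replaced is whatever is listed in the batch indexing task's granularitySpec, something like the fragment below with made-up dates. Segments covering those intervals are rebuilt purely from the batch input, so the old {ts: 15, ...} row would vanish as long as its hour is covered by the intervals and the corrected copy is what sits in the batch files.)

"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "queryGranularity": "NONE",
  "intervals": ["2016-06-01T00:00:00Z/2016-06-02T00:00:00Z"]
}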


Solution

  • We had a similar problem, and we solved it using a lambda architecture. We have 2 pipelines in our setup:

    1. Our real-time pipeline feeds off Kafka + Spark and ingests into Druid. This gives us the real-time data. Events older than the window Druid expects are rejected, though, so this pipeline loses late-arriving data.
    2. Our batch pipeline ingests data into Hadoop every hour, and then we trigger a batch ingestion job into Druid. This creates segments for the timestamps mentioned in the key, does the aggregation, and replaces older segments with the same timestamps. In practice, Druid's storage principles are based on immutability, MVCC and log-structured storage, so as new versions of segments come in for the same timestamp, the older segments are garbage-collected.

    More details on the batch ingestion: our batch job operates on data from HDFS, which is organised into hourly folders. Any late event we get is put into the correct hourly bucket. We have an SLA for late data of XXX hours (called a watermark, if you have read the great article). So we take the current hour, subtract XXX, take the files from the corresponding hourly folder, and trigger a batch ingestion job on Druid for that particular hour; a sketch of one such job follows below. Note that this can still cause data loss if events arrive later than the watermark allows, but we have to make that compromise, as Druid doesn't support in-place updates to segments for a particular hour and we also cannot have an arbitrarily long watermark, since our storage on the HDFS side is very limited.
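    A rough sketch of one such per-hour job, with a made-up hour and HDFS path and the parser/metricsSpec left out: the interval and the input path both point at the same delayed hour (current hour minus the XXX-hour watermark), so only that hour's segments get rebuilt and replaced.

    {
      "type": "index_hadoop",
      "spec": {
        "dataSchema": {
          "dataSource": "my_datasource",
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "HOUR",
            "queryGranularity": "NONE",
            "intervals": ["2016-06-01T03:00:00Z/2016-06-01T04:00:00Z"]
          },
          ...
        },
        "ioConfig": {
          "type": "hadoop",
          "inputSpec": { "type": "static", "paths": "hdfs://namenode/events/2016/06/01/03/*" }
        },
        "tuningConfig": { "type": "hadoop" }
      }
    }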