neo4j apache-kafka orientdb stream-processing apache-samza

Apache Samza local storage - OrientDB / Neo4J graph instead of KV store


Apache Samza uses RocksDB as the storage engine for its local storage, which is what enables stateful stream processing.

My use case:

  • I have multiple streams of events that I wish to process taken from a system such as Apache Kafka.
  • These events create state: the state I wish to track is based on previously received messages.
  • I wish to generate new stream events based on the calculated state.
  • The input stream events are highly connected, and a graph database such as OrientDB / Neo4J is the ideal medium for querying the data to create the new stream events.

My question:

Is it possible to use a non-KV store as the local storage for Samza? Has anyone ever done this with OrientDB / Neo4J and is anyone aware of an example?


Solution

  • I've been evaluating Samza and I'm by no means an expert, but I'd recommend reading the official documentation and even the source code; apart from being written in Scala, it's remarkably approachable.

    In this particular case, toward the bottom of the documentation's page on State Management, you'll find this:

    Other storage engines

    Samza’s fault-tolerance mechanism (sending a local store’s writes to a replicated changelog) is completely decoupled from the storage engine’s data structures and query APIs. While a key-value storage engine is good for general-purpose processing, you can easily add your own storage engines for other types of queries by implementing the StorageEngine interface. Samza’s model is especially amenable to embedded storage engines, which run as a library in the same process as the stream task.

    Some ideas for other storage engines that could be useful: a persistent heap (for running top-N queries), approximate algorithms such as bloom filters and hyperloglog, or full-text indexes such as Lucene. (Patches accepted!)
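To illustrate the decoupling described above, here is a minimal sketch in Java of an embedded graph store whose every mutation is also written to a changelog as a flat key/value entry, so the fault-tolerance path never needs to understand the graph structure. This is not Samza's actual `StorageEngine` API: `ChangelogGraphStore`, `ChangelogEntry` and their methods are hypothetical names, and a plain `List` stands in for the replicated changelog topic (in practice a Kafka topic).

```java
import java.util.*;

/**
 * Hypothetical sketch (not Samza's StorageEngine API): an in-memory
 * adjacency-list graph whose mutations are also appended to a changelog
 * of flat key/value entries, so the graph can be rebuilt by replaying it.
 */
class ChangelogGraphStore {
    /** Changelog entry: key is "src->dst"; a null value marks a deleted edge. */
    static final class ChangelogEntry {
        final String key;
        final String value;
        ChangelogEntry(String key, String value) { this.key = key; this.value = value; }
    }

    // Graph structure: node -> set of outgoing neighbours.
    private final Map<String, Set<String>> adjacency = new HashMap<>();
    // Stand-in for a replicated changelog topic (assumption for this sketch).
    private final List<ChangelogEntry> changelog;

    ChangelogGraphStore(List<ChangelogEntry> changelog) { this.changelog = changelog; }

    /** Apply the change locally, then log it. Assumes node IDs never contain "->". */
    void addEdge(String src, String dst) {
        applyAdd(src, dst);
        changelog.add(new ChangelogEntry(src + "->" + dst, "1"));
    }

    void removeEdge(String src, String dst) {
        applyRemove(src, dst);
        changelog.add(new ChangelogEntry(src + "->" + dst, null));
    }

    /** Read-only view of a node's outgoing neighbours. */
    Set<String> neighbours(String node) {
        return Collections.unmodifiableSet(adjacency.getOrDefault(node, Collections.emptySet()));
    }

    /** Rebuild the graph after a failure by replaying the changelog in order. */
    static ChangelogGraphStore restore(List<ChangelogEntry> changelog) {
        ChangelogGraphStore store = new ChangelogGraphStore(changelog);
        for (ChangelogEntry e : changelog) {
            String[] parts = e.key.split("->", 2);
            // Replay via the apply* helpers so nothing is re-appended to the log.
            if (e.value == null) store.applyRemove(parts[0], parts[1]);
            else store.applyAdd(parts[0], parts[1]);
        }
        return store;
    }

    private void applyAdd(String src, String dst) {
        adjacency.computeIfAbsent(src, k -> new HashSet<>()).add(dst);
    }

    private void applyRemove(String src, String dst) {
        Set<String> out = adjacency.get(src);
        if (out != null) out.remove(dst);
    }
}
```

Keying each entry by edge, with a null value as a deletion marker, mirrors Kafka's tombstone convention, so a log-compacted changelog topic would only need to retain the latest entry per edge.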

    I actually read through the code for the default StorageEngine implementation about two weeks ago to get a better sense of how it works. I don't know enough to say much about it with confidence, but here is what I took away from it.

    The major implementation concerns seem to be:

    1. Logging all changes to a topic so that the store's state can be restored if a task fails.
    2. Restoring the store's state efficiently.
    3. Batching writes and caching frequent reads in order to save on trips to the raw store.
    4. Reporting metrics about the use of the store.
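Concerns 3 and 4 can be illustrated with a small wrapper. The following is a minimal Java sketch, not Samza's implementation: `RawStore`, `InMemoryRawStore` and `CachedBatchingStore` are hypothetical names, the raw store is assumed to accept batched writes through a `putAll` call, and the metrics are plain counters rather than a real metrics registry.

```java
import java.util.*;

/** Hypothetical interface standing in for the underlying embedded database. */
interface RawStore {
    String get(String key);
    void putAll(Map<String, String> batch);
}

/** Map-backed RawStore that counts round trips, for demonstration only. */
class InMemoryRawStore implements RawStore {
    final Map<String, String> data = new HashMap<>();
    int reads, writeCalls;
    public String get(String key) { reads++; return data.get(key); }
    public void putAll(Map<String, String> batch) { writeCalls++; data.putAll(batch); }
}

/**
 * Hypothetical wrapper: buffers writes and sends them to the raw store in
 * batches, serves recent reads from an LRU cache, and keeps simple counters.
 */
class CachedBatchingStore {
    private final RawStore raw;
    private final int batchSize;
    private final Map<String, String> pendingWrites = new LinkedHashMap<>();
    private final Map<String, String> cache;

    // Metrics counters; a real store would publish these somewhere.
    long gets, cacheHits, puts, flushes;

    CachedBatchingStore(RawStore raw, int batchSize, int cacheCapacity) {
        this.raw = raw;
        this.batchSize = batchSize;
        // An access-ordered LinkedHashMap that evicts its eldest entry is a simple LRU cache.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > cacheCapacity;
            }
        };
    }

    String get(String key) {
        gets++;
        // Unflushed writes hold the newest values, so check them first.
        if (pendingWrites.containsKey(key)) { cacheHits++; return pendingWrites.get(key); }
        if (cache.containsKey(key)) { cacheHits++; return cache.get(key); }
        String value = raw.get(key); // cache miss: one trip to the raw store
        if (value != null) cache.put(key, value);
        return value;
    }

    void put(String key, String value) {
        puts++;
        pendingWrites.put(key, value);
        cache.put(key, value);
        if (pendingWrites.size() >= batchSize) flush();
    }

    /** Send all buffered writes to the raw store in a single call. */
    void flush() {
        if (pendingWrites.isEmpty()) return;
        raw.putAll(new LinkedHashMap<>(pendingWrites));
        pendingWrites.clear();
        flushes++;
    }
}
```

With a batch size of 3, three `put` calls cost one `putAll` round trip instead of three. In a real task you would also want `flush()` to run before input offsets are committed, so that nothing acknowledged as processed exists only in the in-memory buffer.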