I’m having trouble making a UDF that wants a STREAM and provides a BATCH, declared like this:
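```python
from kapacitor.udf import udf_pb2

def info(self):
    response = udf_pb2.Response()
    # Kapacitor sends this UDF a stream of points...
    response.info.wants = udf_pb2.STREAM
    # ...and the UDF is expected to send batches back.
    response.info.provides = udf_pb2.BATCH
    # Declare a 'field' option that takes a single string value.
    response.info.options['field'].valueTypes.append(udf_pb2.STRING)
    return response
```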
Does anyone have example code? I searched around the web (forums, documentation), but all the examples I found are BATCH-BATCH, STREAM-STREAM or BATCH-STREAM.
From the examples, I saw that when writing the response to Kapacitor in the “end_batch(self, end_req)” method, you have to “communicate” that the BATCH has ended. One example does it like this:
```python
def end_batch(self, end_req):
    # Send begin batch with count of outliers
    self._begin_response.begin.size = len(self._batch)
    self._agent.write_response(self._begin_response)
    response = udf_pb2.Response()
    ...
    # Send an identical end batch back to Kapacitor
    # HERE
    response.end.CopyFrom(end_req)
    self._agent.write_response(response)
```
Since the UDF wants a STREAM, I have to send the BATCH from the “point(self, point)” method, but there I can’t access the end_req object, and I don’t know how to create one.
Thanks in advance! Bye bye!
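On the missing end_req: a UDF that wants a STREAM only ever receives points, so Kapacitor never hands it a begin or end batch request to copy. point() itself has to decide when a batch is complete. A minimal sketch, assuming one batch per fixed time window and roughly time-ordered input: buffer points and release the buffer when a point from a later window arrives. `TumblingBuffer` and `window_ns` are hypothetical names, and points are plain dicts with a `time` key (assumed to be nanoseconds, like `udf_pb2.Point.time`) standing in for real protobuf messages.

```python
class TumblingBuffer:
    """Hypothetical helper: groups time-ordered stream points into fixed windows."""

    def __init__(self, window_ns):
        self.window_ns = window_ns
        self.points = []
        self.window_start = None

    def add(self, point):
        """Buffer a point; return the finished window's points, or None."""
        # Align the point's timestamp to the start of its (non-overlapping) window.
        start = point["time"] - point["time"] % self.window_ns
        if self.window_start is None:
            self.window_start = start
        completed = None
        if start > self.window_start:
            # First point of a later window: the buffered window is complete.
            completed = self.points
            self.points = []
            self.window_start = start
        # Late points (start < window_start) simply join the current window.
        self.points.append(point)
        return completed
```

In a handler, point() would call add() and, whenever it returns a list, write a begin message, each buffered point, and an end message built from those points. One drawback of this design: the last window is only flushed when the next window's first point arrives.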
Hope this is still relevant. I'd make a STREAM-STREAM UDF and pipe it into a window node. You can keep a copy of the window of data, as in their moving average example, and do any batch analysis on that. If you figured out how to write a STREAM-BATCH UDF, I'd love to see it, though; it would be far less ugly than my answer.
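A rough sketch of keeping a copy of the window inside a STREAM-STREAM UDF, in the spirit of Kapacitor's moving average example but not taken from it: a bounded `collections.deque` holds the last `size` values, and each new value yields an analysis over the current window (here just the mean). `WindowState` is a hypothetical name; in a handler, point() would call update() with a field value and emit a point carrying the result.

```python
from collections import deque


class WindowState:
    """Hypothetical helper: keeps the most recent `size` values of a field."""

    def __init__(self, size):
        # A deque with maxlen drops the oldest value automatically once full.
        self.values = deque(maxlen=size)

    def update(self, value):
        """Add a value and return the mean of the current window."""
        self.values.append(value)
        return sum(self.values) / len(self.values)

    def is_full(self):
        """True once the window holds `size` values."""
        return len(self.values) == self.values.maxlen
```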
Edit
jdv was right: my last answer was really more of a comment. Here's a STREAM-BATCH UDF in Python; it simply echoes the data that arrives on the stream back out as a batch. It's still a little broken: it can't serialize the Point class in the handler's snapshot method, so it crashes whenever it needs to take a snapshot. That might be solvable with a different serialization method, such as pickling, or by writing a JSON encoder/decoder for Point. I'll get around to fixing that sometime, but my work week is nearly done. The main thing you need to do to make a STREAM-BATCH UDF is construct the begin-batch and end-batch messages yourself, which is done in the createStartBatch and createEndBatch methods, respectively.
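What building those two messages involves can be sketched as follows. The dict keys are assumed to mirror the fields of the BeginBatch (name, group, tags, size, byName) and EndBatch (name, group, tmax, tags, byName) messages in Kapacitor's `udf.proto`. The functions are hypothetical helpers working on plain dicts rather than the answer's createStartBatch/createEndBatch. Two further choices are assumptions: copying name, group, tags and byName from the first buffered point, and using the latest point time as tmax.

```python
def create_start_batch(points):
    """Hypothetical helper: BeginBatch fields describing `points`."""
    first = points[0]
    return {
        "name": first["name"],
        "group": first.get("group", ""),
        "tags": dict(first.get("tags", {})),
        # size tells Kapacitor how many points follow before the end message.
        "size": len(points),
        "byName": first.get("byName", False),
    }


def create_end_batch(points):
    """Hypothetical helper: EndBatch fields for the same batch."""
    first = points[0]
    return {
        "name": first["name"],
        "group": first.get("group", ""),
        # Assumption: the latest point time (ns) serves as the batch's tmax.
        "tmax": max(p["time"] for p in points),
        "tags": dict(first.get("tags", {})),
        "byName": first.get("byName", False),
    }


def batch_messages(points):
    """Order in which a STREAM-BATCH UDF writes responses: begin, each point, end."""
    return ([("begin", create_start_batch(points))]
            + [("point", p) for p in points]
            + [("end", create_end_batch(points))])
```

With the real protobuf classes, the same values would be set on `response.begin` and `response.end` of a `udf_pb2.Response`, each buffered point would be copied into `response.point` with `CopyFrom()`, and every message would be sent with the agent's `write_response()`.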
Edit 2
Fixed the serialization by using a combination of protobuf's own serialization methods and JSON.
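One way such a combination could look. This is an assumption, not the answerer's actual fix. Serialize each buffered point to bytes with protobuf's `SerializeToString()`, base64-encode those bytes so they fit in JSON, and return the JSON document as the snapshot. Restore reverses the steps and rebuilds each point with `ParseFromString()`. The sketch covers only the JSON/base64 part using the standard library, so it takes and returns already-serialized byte strings. `encode_snapshot` and `decode_snapshot` are hypothetical names.

```python
import base64
import json


def encode_snapshot(serialized_points, window_start):
    """Pack already-serialized points (bytes) plus buffer state into snapshot bytes."""
    state = {
        "window_start": window_start,
        # JSON cannot hold raw bytes, so base64-encode each serialized point.
        "points": [base64.b64encode(b).decode("ascii") for b in serialized_points],
    }
    return json.dumps(state).encode("utf-8")


def decode_snapshot(data):
    """Inverse of encode_snapshot: return (list of point bytes, window_start)."""
    state = json.loads(data.decode("utf-8"))
    points = [base64.b64decode(s) for s in state["points"]]
    return points, state["window_start"]
```

In the handler, snapshot() would put these bytes in `response.snapshot.snapshot`, and restore() would decode `restore_req.snapshot`, as the moving average example does with its own JSON state.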