I ran into a situation with StreamInsight where I have 1 input source with different types of events that need to be treated differently, but are eventually matched with other events from the same source.
I created a (MUCH) simpler scenario below where an input source generates random numbers (for simplicity's sake, 1s and 0s). If the number is even, we want to save it until further notice (unknown duration). If the number is odd, we want to match it with n-1 from the even stream and then remove n-1 from the even stream. If there is no match, the odd number is simply passed through as is with no further calculation. I have everything working as expected up to the point of removing a matching n-1 from the even stream. A match is made, and the match gets pushed to the output adapter, but the matched even event remains available for another join to be made to it. What I have gathered from several days' worth of experimentation and research is that I somehow need to clip the even stream event duration (ClipEventDuration), presumably as part of the filter in GenerateOddEvenJoinQuery; however, everything I have tried has produced either no change or undesired results. I have also tried changing the evenStream to an Interval shape, with even less luck. Any help or suggestions would be greatly appreciated.
For example, given a simplified list: [ 1, 0, 1, 1, 0, 0, 1, 1, 1 ]
I would expect the output to look like: [ 1, 100, 1, 100, 100, 1 ]
I would also accept the following output, since in the real scenario I'm working with the first output isn't actually possible (note that the 2nd and 3rd 0s are joined to a single 1): [ 1, 100, 1, 100, 1, 1 ]
...
// Wire up the pipeline: one point-shaped input stream is passed through an
// "everything" query, split into odd and even queries, and then re-joined.
CepStream<int> inputStream = CepStream<int>.Create(
app
, "ATGInputStream"
, typeof(InputAdapterFactory)
, new InputConfig()
, EventShape.Point);
// NOTE(review): everythingFilter is not referenced again in this excerpt;
// GenerateEverythingQuery builds its own pass-through filter from inputStream.
var everythingFilter = from e in inputStream select e;
// The pass-through query's output is re-exposed as a stream so that both the
// odd and the even query can consume it.
Query everythingQuery = GenerateEverythingQuery(app, inputStream);
var everythingStream = everythingQuery.ToStream<int>("everythingStream");
Query oddQuery = GenerateOddQuery(app, everythingStream);
// Advance-time settings for the odd stream. Presumably: a CTI is generated for
// every event (frequency 1) with a delay of -1 tick, no advance-to-infinity at
// shutdown, and late events are dropped -- TODO confirm against the
// AdvanceTimeGenerationSettings documentation.
var oddAts = new AdvanceTimeSettings(new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), false), null, AdvanceTimePolicy.Drop);
var oddStream = oddQuery.ToStream<int>("oddStream", oddAts);
// Only inject a CTI into the even stream when we need it: the even stream does
// not generate CTIs itself, it imports them from the stream named "oddStream".
var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("oddStream"), AdvanceTimePolicy.Adjust);
Query evenQuery = GenerateEvenQuery(app, everythingStream);
var evenStream = evenQuery.ToStream<int>("evenStream", ats);
// Join the two branches back together; the join result is exposed as "joinStream".
Query joinQuery = GenerateOddEvenJoinQuery(app, evenStream, oddStream);
var joinStream = joinQuery.ToStream<int>("joinStream");
...
/// <summary>
/// Builds the query that pairs each odd number with an even number equal to
/// (odd - 1) and passes unpaired odd numbers through unchanged.
/// </summary>
/// <param name="app">Application the query is registered with.</param>
/// <param name="evenStream">Stream of even numbers to match against.</param>
/// <param name="oddStream">Stream of odd numbers driving the output.</param>
/// <returns>A point-shaped, fully ordered query over the union of both cases.</returns>
/// <remarks>
/// Known limitation: nothing here retires an even event after it has been
/// matched, so the same even event can be joined again by a later odd event.
/// </remarks>
private Query GenerateOddEvenJoinQuery(Application app, CepStream<int> evenStream, CepStream<int> oddStream) {
    // Inner join: (odd * even) + 100 is an easy way to tell we had a match.
    var matchedPairs = from odd in oddStream
                       from even in evenStream
                       where even == (odd - 1)
                       select (odd * even) + 100;

    // LEFT ANTI SEMI JOIN: odd numbers for which no even partner exists.
    var unmatchedOdds = from odd in oddStream
                        where (from even in evenStream where even == odd - 1 select even).IsEmpty()
                        select odd;

    return matchedPairs
        .Union(unmatchedOdds)
        .ToQuery(
            app,
            "Number Join Query",
            "Joins number streams.",
            EventShape.Point,
            StreamEventOrder.FullyOrdered);
}
/// <summary>
/// Builds the query that keeps only the even numbers from
/// <paramref name="stream"/> and sets each kept event's duration to
/// <see cref="TimeSpan.MaxValue"/>.
/// </summary>
/// <param name="app">Application the query is registered with.</param>
/// <param name="stream">Source stream of integers.</param>
/// <returns>An edge-shaped, fully ordered query named "EvenQuery".</returns>
private Query GenerateEvenQuery(Application app, CepStream<int> stream) {
    var evens = from n in stream
                where n % 2 == 0
                select n;

    // Even numbers are held "until further notice", hence the maximum duration.
    var heldEvens = evens.AlterEventDuration(evt => TimeSpan.MaxValue);

    return heldEvens.ToQuery(
        app,
        "EvenQuery",
        "",
        EventShape.Edge,
        StreamEventOrder.FullyOrdered);
}
/// <summary>
/// Builds the query that keeps only the odd numbers from
/// <paramref name="stream"/>.
/// </summary>
/// <param name="app">Application the query is registered with.</param>
/// <param name="stream">Source stream of integers.</param>
/// <returns>A point-shaped, fully ordered query named "OddQuery".</returns>
private Query GenerateOddQuery(Application app, CepStream<int> stream) {
    // In C# the remainder takes the sign of the dividend (-3 % 2 == -1), so the
    // test is "!= 0" rather than "== 1"; otherwise negative odd values would be
    // dropped by both the odd and the even query.
    var odds = from n in stream
               where (n % 2) != 0
               select n;

    return odds.ToQuery(
        app,
        "OddQuery",
        "Queries for odd numbers in stream.",
        EventShape.Point,
        StreamEventOrder.FullyOrdered);
}
/// <summary>
/// Builds a pass-through query that selects every event from
/// <paramref name="stream"/> unchanged.
/// </summary>
/// <param name="app">Application the query is registered with.</param>
/// <param name="stream">Source stream of integers.</param>
/// <returns>A point-shaped, fully ordered query named "EverythingQuery".</returns>
private Query GenerateEverythingQuery(Application app, CepStream<int> stream) {
    // Identity projection; this is what "from e in stream select e" compiles to.
    var passThrough = stream.Select(item => item);

    return passThrough.ToQuery(
        app,
        "EverythingQuery",
        "Queries everything from the input stream.",
        EventShape.Point,
        StreamEventOrder.FullyOrdered);
}
While I was hoping for something a little more elaborate and potentially faster, the delayed processing may help with performance.
/// <summary>
/// Revised pipeline: instead of joining separate odd and even streams, every
/// input event is run through a single stateful <see cref="StreamMatcher"/>
/// operator and the result is sent to the console output query.
/// </summary>
public class Program {
    public Program() {
        ...
        var stream = CepStream<RandomNumber>.Create(
            app
            , "StaticInputStream"
            , typeof(StaticInputAdapterFactory)
            , new InputConfig()
            , EventShape.Point);
        // All matching state lives inside the StreamMatcher instance.
        var processedStream = stream.Scan(new StreamMatcher());
        Query consoleQuery = GenerateConsoleOutputQuery(app, processedStream);
        ...
    }

    /// <summary>
    /// Builds the query that forwards every event of <paramref name="stream"/>
    /// to the output adapter created by <c>OutputAdapterFactory</c>.
    /// </summary>
    /// <param name="app">Application the query is registered with.</param>
    /// <param name="stream">Stream of already-processed integers.</param>
    /// <returns>A point-shaped, fully ordered query named "Console Output Query".</returns>
    private Query GenerateConsoleOutputQuery(Application app, CepStream<int> stream) {
        var filter = from f in stream select f;
        return filter.ToQuery(
            app
            , "Console Output Query"
            , "Queries for messages to output to the console."
            , typeof(OutputAdapterFactory)
            , new OutputConfig()
            , EventShape.Point
            , StreamEventOrder.FullyOrdered);
    }

    /// <summary>
    /// Stateful point-stream operator: even values are remembered and produce no
    /// output; each odd value is emitted, combined with a remembered value equal
    /// to (odd - 1) when one exists. A remembered value is consumed by its match.
    /// </summary>
    public class StreamMatcher : CepPointStreamOperator<RandomNumber, int> {
        // Even values seen so far that have not yet been consumed by a match.
        private List<int> unmatched = new List<int>();

        /// <summary>
        /// Always reports the operator as non-empty.
        /// NOTE(review): presumably this keeps the engine from discarding the
        /// operator's state -- confirm against the UDSO documentation.
        /// </summary>
        public override bool IsEmpty {
            get { return false; }
        }

        /// <summary>
        /// Processes one input event.
        /// </summary>
        /// <param name="inputEvent">Event whose payload value is inspected.</param>
        /// <returns>
        /// Nothing for an even value; for an odd value, either the value itself
        /// (no partner remembered) or value + partner + 100 (partner consumed).
        /// </returns>
        public override IEnumerable<int> ProcessEvent(PointEvent<RandomNumber> inputEvent) {
            int value = inputEvent.Payload.value;

            if (value % 2 == 0) {
                unmatched.Add(value);
                yield break;
            }

            int result = value;
            int partner = value - 1;

            // List<T>.Remove deletes the first occurrence and reports whether one
            // was found. This replaces the Single()/catch-all lookup and the -1
            // sentinel, which removed a negative partner without applying the
            // match bonus.
            if (unmatched.Remove(partner)) {
                result += partner + 100;
            }

            yield return result;
        }
    }
}
/// <summary>
/// Payload type for the input stream: an integer value plus a timestamp.
/// </summary>
public class RandomNumber {
    /// <summary>The number carried by the event.</summary>
    public int value {
        get;
        set;
    }

    /// <summary>The time associated with the number.</summary>
    public DateTime timeStamp {
        get;
        set;
    }
}
You might consider using a UDSO (user-defined stream operator), where you can keep the state of your zeros.
// LINQPad entry point: feeds a fixed list of sample events through the
// MyCalculation operator and dumps the resulting stream.
void Main()
{
    // Timestamps are built with the DateTime constructor instead of
    // DateTime.Parse("... AM") so the sample does not depend on the current
    // culture's date and AM/PM parsing rules. The values are unchanged.
    var randomNumbers = new[]
    {
        new RandomNumber { Value = 1, TimeStamp = new DateTime(2012, 1, 1, 10, 1, 0) },
        new RandomNumber { Value = 0, TimeStamp = new DateTime(2012, 1, 1, 10, 2, 0) },
        new RandomNumber { Value = 0, TimeStamp = new DateTime(2012, 1, 1, 10, 2, 0) },
        new RandomNumber { Value = 1, TimeStamp = new DateTime(2012, 1, 1, 10, 3, 0) },
        new RandomNumber { Value = 1, TimeStamp = new DateTime(2012, 1, 1, 10, 4, 0) },
        new RandomNumber { Value = 0, TimeStamp = new DateTime(2012, 1, 1, 10, 5, 0) },
    };

    // Each element becomes a point insert stamped with its own TimeStamp.
    var stream = randomNumbers.ToPointStream(
        Application,
        e => PointEvent.CreateInsert(e.TimeStamp, e),
        AdvanceTimeSettings.IncreasingStartTime);

    var query = stream.Scan(new MyCalculation());
    query.Dump();
}
/// <summary>
/// Sample user-defined stream operator that keeps state between events: even
/// values are queued and produce no output; each odd value is emitted as a
/// string made of the odd value followed by every queued value in arrival
/// order. Queued values are never removed.
/// </summary>
public class MyCalculation : CepPointStreamOperator<RandomNumber, string>
{
    // Even values seen so far, oldest first.
    private readonly Queue<int> _queue = new Queue<int>();

    /// <summary>
    /// Always reports the operator as non-empty.
    /// NOTE(review): presumably this keeps the engine from discarding the
    /// operator's state -- confirm against the UDSO documentation.
    /// </summary>
    public override bool IsEmpty
    {
        get { return false; }
    }

    /// <summary>
    /// Processes one input event.
    /// </summary>
    /// <param name="inputEvent">Event whose payload value is inspected.</param>
    /// <returns>
    /// Nothing for an even value; for an odd value, a single string consisting of
    /// that value followed by all queued even values.
    /// </returns>
    public override IEnumerable<string> ProcessEvent(PointEvent<RandomNumber> inputEvent)
    {
        int value = inputEvent.Payload.Value;

        if (value % 2 == 0)
        {
            _queue.Enqueue(value);
            yield break;
        }

        // Queue<T> enumerates oldest-first, the same order ToArray() produced.
        // string.Concat<int> replaces the array copy, the LINQ Count() call on
        // every iteration, and the repeated string +=.
        yield return value.ToString() + string.Concat<int>(_queue);
    }
}
/// <summary>
/// Payload type for the sample stream: an integer value plus the time at which
/// the event is stamped.
/// </summary>
public class RandomNumber
{
    /// <summary>The number carried by the event.</summary>
    public int Value
    {
        get;
        set;
    }

    /// <summary>The time associated with the number.</summary>
    public DateTime TimeStamp
    {
        get;
        set;
    }
}