
Get Previous and New EventBean with Esper Continuous Query


I am new to Esper and am trying to get both the old and the new EventBean when data is updated. However, the old EventBean is always null. Is some configuration missing that causes the old EventBean to be null?

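import com.espertech.esper.common.client.EPCompiled;
import com.espertech.esper.common.client.EventBean;
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompileException;
import com.espertech.esper.compiler.client.EPCompiler;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.EPDeployException;
import com.espertech.esper.runtime.client.EPDeployment;
import com.espertech.esper.runtime.client.EPDeploymentService;
import com.espertech.esper.runtime.client.EPEventService;
import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPRuntimeProvider;
import com.espertech.esper.runtime.client.EPStatement;

import java.time.LocalTime;
import java.util.concurrent.TimeUnit;

public class TestApp1 {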

    public static void main(String[] arg) throws EPCompileException, EPDeployException {
        EPCompiler compiler = EPCompilerProvider.getCompiler();
        Configuration configuration = new Configuration();
        configuration.getCommon().addEventType(PersonEvent.class);

        CompilerArguments args = new CompilerArguments(configuration);

        EPCompiled epCompiled = compiler.compile("@name('my-statement') select * from PersonEvent", args);

        EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(configuration);
        EPDeploymentService deploymentService = runtime.getDeploymentService();
        EPDeployment deployment = deploymentService.deploy(epCompiled);

        EPStatement statement = deploymentService.getStatement(deployment.getDeploymentId(), "my-statement");

        statement.addListener((newData, oldData, stmt, rt) -> {
            extracted(newData, "New");
            extracted(oldData, "Old");
            sleep(0);
        });


        publish(runtime);


    }

    private static void extracted(EventBean[] datas, String type) {
        if (datas == null || datas.length == 0) {
            System.out.println(type + " : " + datas);
            return;
        }
        for (EventBean data : datas) {
            String name = (String) data.get("name");
            int age = (int) data.get("age");
            // Pass the timestamp and type as arguments instead of concatenating them into the format string.
            System.out.printf("%s::%s - Got event ---- Name: %s, Age: %d%n", LocalTime.now(), type, name, age);
        }
    }

    private static void sleep(int i) {
        try {
            TimeUnit.MILLISECONDS.sleep(i * 500);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static void publish(EPRuntime runtime) {
        EPEventService eventService = runtime.getEventService();
        eventService.sendEventBean(new PersonEvent("Peter", 10), "PersonEvent");
        sleep(1);
        eventService.sendEventBean(new PersonEvent("Peter", 11), "PersonEvent");
        sleep(1);
        eventService.sendEventBean(new PersonEvent("Peter", 12), "PersonEvent");
        sleep(1);
        eventService.sendEventBean(new PersonEvent("Hello", 6), "PersonEvent");
        sleep(1);
        eventService.sendEventBean(new PersonEvent("Peter", 13), "PersonEvent");
    }
}


class PersonEvent {
  private String name;
  private int age;

  public PersonEvent(String name, int age) {
    this.name = name;
    this.age = age;
  }

  public String getName() {
    return name;
  }

  public int getAge() {
    return age;
  }
}

The output of the above program is:

13:07:47.355::New - Got event ---- Name: Peter, Age: 10
Old : null
13:07:47.863::New - Got event ---- Name: Peter, Age: 11
Old : null
13:07:48.363::New - Got event ---- Name: Peter, Age: 12
Old : null
13:07:48.877::New - Got event ---- Name: Hello, Age: 6
Old : null
13:07:49.389::New - Got event ---- Name: Peter, Age: 13
Old : null


Solution

  • According to the documentation of UpdateListener:

    Notify that new events are available or old events are removed. If the call to update contains new (inserted) events, then the first argument will be a non-empty list and the second will be empty. Similarly, if the call is a notification of deleted events, then the first argument will be empty and the second will be non-empty. Either the newEvents or oldEvents will be non-null. This method won't be called with both arguments being null (unless using output rate limiting or force-output options), but either one could be null. The same is true for zero-length arrays. Either newEvents or oldEvents will be non-empty. If both are non-empty, then the update is a modification notification.

    Params:
    newEvents – is any new events. This will be null or empty if the update is for old events only.
    oldEvents – is any old events. This will be null or empty if the update is for new events only.
    statement – is the statement producing the result
    runtime – is the runtime

    I did not find a way to receive both values, i.e. oldEvents and newEvents, in the same callback. However, the previous value can be exposed as an extra property on the new event. The following query gives me the expected data:

    select name, age, prev(1, age) as oldAge from PersonEvent#groupwin(name)#length(2)
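
To make it clearer what that query returns, here is a minimal plain-Java sketch. It does not use Esper. It only emulates the intended result, assuming that prev(1, age) yields the age of the preceding event with the same name and null when there is none. The class PrevAgeSketch and its method onEvent are hypothetical names used for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java emulation (no Esper involved) of what the query
//   select name, age, prev(1, age) as oldAge from PersonEvent#groupwin(name)#length(2)
// is expected to report for each incoming event.
class PrevAgeSketch {

    // Last age seen per name; stands in for the per-name window of the query.
    private final Map<String, Integer> lastAgeByName = new HashMap<>();

    // Records the new age and returns the previous age for that name,
    // or null if this is the first event for the name.
    public Integer onEvent(String name, int age) {
        return lastAgeByName.put(name, age);
    }

    public static void main(String[] args) {
        PrevAgeSketch sketch = new PrevAgeSketch();
        String[] names = {"Peter", "Peter", "Peter", "Hello", "Peter"};
        int[] ages = {10, 11, 12, 6, 13};
        for (int i = 0; i < names.length; i++) {
            Integer oldAge = sketch.onEvent(names[i], ages[i]);
            System.out.println("Name: " + names[i] + ", Age: " + ages[i] + ", OldAge: " + oldAge);
        }
    }
}
```

For the five events sent in the question, the sketch reports the old ages null, 10, 11, null and 12. In the real listener the same value should be readable from the new event with data.get("oldAge"), since oldAge is the alias chosen in the select clause.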