Alternative to setSubscriber in Esper


I am trying to attach a subscriber to a statement in Esper, but I would like to do it from the .epl file rather than in Java. While browsing repositories I have seen this done with annotation interfaces. I tried to do it the same way CoinTrader does, but I can't get it to work. If I set the subscriber in Java instead, it works.

This is my project structure for reference

This is my .epl file:

module queries;

import events.*;
import configDemo.*;
import annotations.*;

create schema MyTickEvent as TickEvent;

@Name('allEvents')
@Description('test')
@Subscriber(className='configDemo.TickSubscriber')
select * from TickEvent; 


@Name('tickEvent')
@Description('Get a tick event every 3 seconds')
select currentPrice from TickEvent;

This is my config file:

<?xml version="1.0" encoding="UTF-8"?>

<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.espertech.com/schema/esper"
xsi:noNamespaceSchemaLocation="esper-configuration-6-0.xsd">


<event-type-auto-name package-name="events"/>

<auto-import import-name="annotations.*"/>
<auto-import import-name="events.*"/>
<auto-import import-name="configDemo.*"/>

</esper-configuration>

This is my Subscriber interface:

package annotations;

public @interface Subscriber {

String className();
}

This is my subscriber class:

package configDemo;

import events.TickEvent;

public class TickSubscriber {
    public void update(TickEvent tick) {
        System.out.println("Event registered by subscriber  - Tick is: " + tick.getCurrentPrice());
    }
}

And my main file is this:

package configDemo;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.deploy.DeploymentException;
import com.espertech.esper.client.deploy.DeploymentOptions;
import com.espertech.esper.client.deploy.Module;
import com.espertech.esper.client.deploy.ParseException;

public class Main {

    public static EngineHelper engineHelper; 
    public static Thread engineThread;
    public static boolean continuousSimulation = true;

    public static void main(String[] args) throws DeploymentException, InterruptedException, IOException, ParseException {

        engineHelper = new EngineHelper();
        DeploymentOptions options = new DeploymentOptions();
        options.setIsolatedServiceProvider("validation"); // we isolate any statements
        options.setValidateOnly(true); // validate leaving no started statements
        options.setFailFast(false); // do not fail on first error

        Module queries = engineHelper.getDeployAdmin().read("queries.epl");
        engineHelper.getDeployAdmin().deploy(queries, null);

        CountDownLatch latch = new CountDownLatch(1);

        EPStatement epl = engineHelper.getAdmin().getStatement("allEvents");
        //epl.setSubscriber(new TickSubscriber());
        engineThread = new Thread(new EngineThread(latch, continuousSimulation, engineHelper.getRuntime()));
        engineThread.start();


    }



}

As you can see, the setSubscriber line is commented out. When I run the code as is, I expect the subscriber to be recognized and registered, yet it isn't: I only see the tick events flowing in the console. If I uncomment the line and run it again, I get a notification from the subscriber after each tick and everything works fine.

What am I doing wrong? How can I set a subscriber within the .epl file?


Solution

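  • Assigning a subscriber is the application's job; the engine does not do it for you. A @Subscriber annotation in the EPL is only attached to the statement and is not acted on by the engine. After deploying the module, the application code needs to loop through the statements, read each one's annotations with stmt.getAnnotations(), inspect them, and call setSubscriber itself.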
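
The inspection step described above can be sketched as follows. This is a minimal illustration rather than Esper's own mechanism: SubscriberResolver and DemoSubscriber are hypothetical names, and the Subscriber annotation is a stand-in for annotations.Subscriber from the question. The sketch assumes the subscriber class has a no-argument constructor. It uses only the JDK, so the Esper-specific wiring is described after the block.

```java
import java.lang.annotation.Annotation;

// Stand-in for annotations.Subscriber from the question.
@interface Subscriber {
    String className();
}

// Hypothetical subscriber, standing in for configDemo.TickSubscriber.
class DemoSubscriber {
    public void update(Object event) {
        System.out.println("Subscriber received: " + event);
    }
}

// Hypothetical helper; not part of Esper.
final class SubscriberResolver {
    private SubscriberResolver() {}

    /**
     * Looks for a Subscriber annotation in the given array and, if one is
     * present, instantiates the class it names via its no-arg constructor.
     * Returns null when no Subscriber annotation is found.
     */
    static Object resolve(Annotation[] annotations) {
        if (annotations == null) {
            return null;
        }
        for (Annotation annotation : annotations) {
            if (annotation instanceof Subscriber) {
                String className = ((Subscriber) annotation).className();
                try {
                    return Class.forName(className)
                            .getDeclaredConstructor()
                            .newInstance();
                } catch (ReflectiveOperationException e) {
                    // Surface a bad class name or a missing constructor clearly.
                    throw new IllegalStateException(
                            "Cannot create subscriber " + className, e);
                }
            }
        }
        return null;
    }
}
```

In the Main class from the question, this could be wired in after the deploy call: iterate over engineHelper.getAdmin().getStatementNames(), fetch each EPStatement with getStatement(name), pass stmt.getAnnotations() to SubscriberResolver.resolve, and call stmt.setSubscriber(...) whenever the result is non-null. With that loop in place, the @Subscriber(className='configDemo.TickSubscriber') annotation on the allEvents statement would take effect without a hard-coded setSubscriber call.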