I'm trying to implement a simple Apache Pulsar Function and access the State API in LocalRunner mode, but it's not working.
pom.xml snippet
<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-original</artifactId>
        <version>2.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-functions-local-runner-original</artifactId>
        <version>2.9.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.13.1</version>
    </dependency>
</dependencies>
Function class
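import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class TestFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        System.out.println(">>> GOT input " + input);
        // Try to access state: increment the counter stored under the key "counter".
        context.incrCounter("counter", 1);
        // Pass the message through unchanged.
        return input;
    }
}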
Main
import java.util.Collections;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.LocalRunner;

public class Main {
    public static final String BROKER_URL = "pulsar://localhost:6650";
    public static final String TOPIC_IN = "test-topic-input";
    public static final String TOPIC_OUT = "test-topic-output";

    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = FunctionConfig
                .builder()
                .className(TestFunction.class.getName())
                .inputs(Collections.singleton(TOPIC_IN))
                .name("Test Function")
                .runtime(FunctionConfig.Runtime.JAVA)
                .subName("Test Function Sub")
                .build();
        // Run the function in-process against the broker and state storage in the container.
        LocalRunner localRunner = LocalRunner.builder()
                .brokerServiceUrl(BROKER_URL)
                .stateStorageServiceUrl("bk://127.0.0.1:4181")
                .functionConfig(functionConfig)
                .build();
        localRunner.start(false);
        // Send one test message to the function's input topic.
        PulsarClient client = PulsarClient.builder().serviceUrl(BROKER_URL).build();
        Producer<String> producer = client.newProducer(Schema.STRING).topic(TOPIC_IN).create();
        producer.send("Hello World!");
        System.out.println(">>> PRODUCER SENT");
    }
}
I'm running Pulsar in a local Docker container (Docker Desktop on Win10). I also publish port 4181 to expose BookKeeper; without it the function hangs and does nothing. The container is started like this:
Docker container
docker run -it -p 6650:6650 -p 8080:8080 -p 4181:4181 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone
When I start the application, the console shows these logs:
2022-01-07T12:39:48,757+0100 [public/default/Test Function-0] WARN org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,863+0100 [public/default/Test Function-0] WARN org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,970+0100 [public/default/Test Function-0] WARN org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:49,076+0100 [public/default/Test Function-0] WARN org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
... and it goes on and on ...
Pulsar logging shows:
2022-01-07T12:13:48,882+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:48,989+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:49,097+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:49,207+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
... goes on and on ...
What am I doing wrong?
The problem is the name you chose for your function, "Test Function". Pulsar's state store uses that name for the internal storage stream that backs the function's state, and the space makes it an invalid stream name. That is what the "Invalid stream name Test Function" errors in the Pulsar log are reporting.
If you remove the space and use "TestFunction" instead, it works. I have just confirmed this myself; the output then looks like this:
2022-02-07T11:09:04,916-0800 [main] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
>>> PRODUCER SENT
2022-02-07T11:09:05,267-0800 [public/default/TestFunction-0] INFO org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Opening state table for function public/default/TestFunction
2022-02-07T11:09:05,279-0800 [client-scheduler-OrderedScheduler-7-0] INFO org.apache.bookkeeper.clients.SimpleStorageClientImpl - Retrieved table properties for table public_default/TestFunction : stream_id: 1024
2022-02-07T11:09:05,527-0800 [pulsar-client-io-1-2] INFO org.apache.pulsar.client.impl.ConsumerImpl - [test-topic-input][Test Function Sub] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
>>> GOT input Hello World!
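To catch this before LocalRunner starts retrying forever, you could check the name yourself before passing it to FunctionConfig. The following is only a minimal sketch: FunctionNames and its methods are hypothetical helpers, not part of the Pulsar API, and it assumes whitespace is the only problem. The exact set of characters the state store rejects is not confirmed here.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Pulsar: guards function names against whitespace.
final class FunctionNames {
    // Matches any single whitespace character (space, tab, newline, ...).
    private static final Pattern WHITESPACE = Pattern.compile("\\s");

    private FunctionNames() {
    }

    // Returns true if the name contains at least one whitespace character.
    static boolean hasWhitespace(String name) {
        return WHITESPACE.matcher(name).find();
    }

    // Returns the name with every whitespace character removed.
    static String stripWhitespace(String name) {
        return WHITESPACE.matcher(name).replaceAll("");
    }

    // Returns the name unchanged, or fails fast if it contains whitespace.
    static String requireNoWhitespace(String name) {
        if (hasWhitespace(name)) {
            throw new IllegalArgumentException(
                    "Function name must not contain whitespace: \"" + name + "\"");
        }
        return name;
    }
}
```

In Main you would then write .name(FunctionNames.requireNoWhitespace("TestFunction")), so a name like "Test Function" fails immediately with a clear message instead of looping on "Invalid stream name".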