
Apache Pulsar: Access state storage in LocalRunner not working


I'm trying to implement a simple Apache Pulsar Function and access the State API in LocalRunner mode, but it's not working.

pom.xml snippet

<dependencies>
  <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-original</artifactId>
    <version>2.9.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-local-runner-original</artifactId>
    <version>2.9.1</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.13.1</version>
  </dependency>
</dependencies>

Function class

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class TestFunction implements Function<String, String> {

  @Override
  public String process(String input, Context context) throws Exception {
    System.out.println(">>> GOT input " + input);
    context.incrCounter("counter", 1); // --> try to access state
    return input;
  }
}

Main

import java.util.Collections;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.LocalRunner;

public class Main {
  public static final String BROKER_URL = "pulsar://localhost:6650";
  public static final String TOPIC_IN = "test-topic-input";
  public static final String TOPIC_OUT = "test-topic-output";

  public static void main(String[] args) throws Exception {

    FunctionConfig functionConfig = FunctionConfig
        .builder()
        .className(TestFunction.class.getName())
        .inputs(Collections.singleton(TOPIC_IN))
        .name("Test Function")
        .runtime(FunctionConfig.Runtime.JAVA)
        .subName("Test Function Sub")
        .build();

    // Run the function in-process against the standalone broker and its state storage
    LocalRunner localRunner = LocalRunner.builder()
        .brokerServiceUrl(BROKER_URL)
        .stateStorageServiceUrl("bk://127.0.0.1:4181")
        .functionConfig(functionConfig)
        .build();
    localRunner.start(false); // non-blocking start

    PulsarClient client = PulsarClient.builder().serviceUrl(BROKER_URL).build();
    Producer<String> producer = client.newProducer(Schema.STRING).topic(TOPIC_IN).create();

    producer.send("Hello World!");
    System.out.println(">>> PRODUCER SENT");
  }
}

I'm running Pulsar in a local Docker container (Docker Desktop on Windows 10), started like this:

Docker container

docker run -it -p 6650:6650 -p 8080:8080 -p 4181:4181 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone

I added -p 4181:4181 to expose the BookKeeper state storage port (the one used in stateStorageServiceUrl); without it, the function hangs and doesn't do anything.
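Since the function simply hangs when port 4181 isn't published, a quick TCP check before starting the LocalRunner can turn that hang into a clear error. This is a minimal sketch, assuming you want Main to fail fast: `PortCheck` and `isReachable` are hypothetical names, not Pulsar APIs, and the check only proves that something accepts TCP connections on the port, not that the stream storage service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Hypothetical preflight helper (not part of Pulsar): checks that a TCP port
 * accepts connections, e.g. the state storage port 4181 published by Docker.
 */
final class PortCheck {
    private PortCheck() {}

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unresolvable
            return false;
        }
    }
}
```

For example, call `PortCheck.isReachable("127.0.0.1", 4181, 2000)` before `localRunner.start(false)` and throw an `IllegalStateException` if it returns false.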

When I start the application, the console shows these logs:

2022-01-07T12:39:48,757+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,863+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,970+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:49,076+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds

... and it goes on and on ...

The Pulsar container log shows:

2022-01-07T12:13:48,882+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:48,989+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:49,097+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:49,207+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function

... goes on and on ...

What am I doing wrong?


Solution

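  • The issue is the name you chose for your function, "Test Function". Pulsar's state store uses the function name as the name of the BookKeeper storage stream (table) that holds the function's state, in the form public_default/<function name>, and BookKeeper rejects "Test Function" because of the space. That is the "Invalid stream name Test Function" error in the Pulsar log.

    If you remove the space and use "TestFunction" instead, it works fine; I just confirmed this myself. Only the function name needs to change: the subscription name "Test Function Sub" can keep its spaces, as the log below shows.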

    2022-02-07T11:09:04,916-0800 [main] WARN  com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
    >>> PRODUCER SENT
    2022-02-07T11:09:05,267-0800 [public/default/TestFunction-0] INFO  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Opening state table for function public/default/TestFunction
    2022-02-07T11:09:05,279-0800 [client-scheduler-OrderedScheduler-7-0] INFO  org.apache.bookkeeper.clients.SimpleStorageClientImpl - Retrieved table properties for table public_default/TestFunction : stream_id: 1024
    
    2022-02-07T11:09:05,527-0800 [pulsar-client-io-1-2] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test-topic-input][Test Function Sub] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
    >>> GOT input Hello World!
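To catch this before the LocalRunner starts retrying, you can validate the name you pass to `FunctionConfig.builder().name(...)`. This is a minimal sketch with hypothetical helper names (`FunctionNames`, `sanitize`, `requireNoWhitespace`, `stateTableName`), not Pulsar APIs. It only guards against whitespace, the case confirmed here; the exact set of characters BookKeeper accepts in stream names isn't stated in this thread, so other characters may be rejected too. The table-name format is inferred from the log line "Retrieved table properties for table public_default/TestFunction", not from Pulsar's source.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical helpers (not Pulsar APIs) for picking a function name that is
 * safe to use as the name of its state table.
 */
final class FunctionNames {
    private FunctionNames() {}

    // Any run of whitespace; the space in "Test Function" is what triggered
    // "Invalid stream name" in the question.
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    /** Removes all whitespace, e.g. "Test Function" -> "TestFunction". */
    static String sanitize(String name) {
        return WHITESPACE.matcher(name).replaceAll("");
    }

    /** Returns the name unchanged, or throws if it contains whitespace. */
    static String requireNoWhitespace(String name) {
        if (WHITESPACE.matcher(name).find()) {
            throw new IllegalArgumentException(
                "Function name contains whitespace and cannot be used for state storage: \"" + name + "\"");
        }
        return name;
    }

    /**
     * Builds a table name in the shape seen in the logs ("public_default/TestFunction").
     * Assumption: inferred from log output only.
     */
    static String stateTableName(String tenant, String namespace, String functionName) {
        return tenant + "_" + namespace + "/" + functionName;
    }
}
```

Usage: `.name(FunctionNames.requireNoWhitespace("TestFunction"))` in the FunctionConfig builder. `requireNoWhitespace` is the stricter choice, since it surfaces the problem at startup instead of silently renaming the function.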