I am trying to use Kafka Streams to create a GlobalKTable from a stream, and I get an exception when I call streams.start():
org.apache.kafka.streams.errors.StreamsException: Encountered a topic-partition not associated with any global state store
My code is:
private KafkaStreams streams; // not final: it is assigned in instantiateKafka(), not in the constructor
private final StoreQueryParameters<ReadOnlyKeyValueStore<LocalDate, String>> bankHolidayTypesSqp =
        StoreQueryParameters.fromNameAndType("bank_holiday_type_store", QueryableStoreTypes.<LocalDate, String>keyValueStore());
private final ReadOnlyKeyValueStore<LocalDate, String> localBankHolidayTypeStore;
private void instantiateKafka()
{
    // configure Kafka
    StreamsBuilder builder = new StreamsBuilder();
    // CustomSerializableSerde is just a generic Serde that uses standard Java Base64 encoding
    // on any object that implements Serializable. It works in a dummy application I've tested,
    // so I don't think it's the problem (a sketch of it follows the code below).
    addGlobalTableToStreamsBuilder(builder, bankHolidayTypeTopic, "bank_holiday_type_store",
            new CustomSerializableSerde<LocalDate>(), Serdes.String());
    streams = createStreams("localhost:9092", "C:\\Kafka\\tmp\\kafka-streams-global-tables", MyClass.class.getName(), builder);
    streams.start(); // blocks until the global table is built
}
public static <Tk extends Serializable, Tv extends Serializable> StreamsBuilder addGlobalTableToStreamsBuilder(
        StreamsBuilder builder, String topic, String store, Serde<Tk> keySerde, Serde<Tv> valueSerde)
{
    builder.globalTable(topic, Materialized.<Tk, Tv, KeyValueStore<Bytes, byte[]>>as(store)
            .withKeySerde(keySerde)
            .withValueSerde(valueSerde));
    return builder;
}
public static KafkaStreams createStreams(final String bootstrapServers, final String stateDir, String clientID, StreamsBuilder finishedBuilder)
{
    final Properties streamsConfiguration = new Properties();
    // Give the Streams application a unique name. The name must be unique
    // within the Kafka cluster against which the application is run.
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationName");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientID);
    // Where to find the Kafka broker(s).
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
    // Set to earliest so we don't miss any data that arrived in the topics before the process started.
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaStreams(finishedBuilder.build(), streamsConfiguration);
}
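For reference, here is a minimal sketch of what CustomSerializableSerde looks like conceptually, as described above (standard Java serialization wrapped in Base64); the details here are a simplified approximation, not the exact class:

import java.io.*;
import java.util.Base64;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Sketch only: Java-serializes any Serializable object and Base64-encodes the bytes.
public class CustomSerializableSerde<T extends Serializable> implements Serde<T> {

    @Override
    public Serializer<T> serializer() {
        return (topic, data) -> {
            if (data == null) return null;
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(data);
                oos.flush();
                return Base64.getEncoder().encode(bos.toByteArray());
            } catch (IOException e) {
                throw new RuntimeException("Serialization failed for topic " + topic, e);
            }
        };
    }

    @Override
    @SuppressWarnings("unchecked")
    public Deserializer<T> deserializer() {
        return (topic, bytes) -> {
            if (bytes == null) return null;
            byte[] decoded = Base64.getDecoder().decode(bytes);
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(decoded))) {
                return (T) ois.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException("Deserialization failed for topic " + topic, e);
            }
        };
    }
}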
And the producer:
Producer<LocalDate, String> bankHolidayTypeProducer = MyClass.<LocalDate, String>createProducer(
        "localhost:9092", BankHolidayData.class.getName(), CustomSerializer.class.getName(), StringSerializer.class.getName());
//...
HashMap<LocalDate, String> bankHolidaysData = populateBankHolidayMap();
for (Map.Entry<LocalDate, String> bankHoliday : bankHolidaysData.entrySet())
{
    bankHolidayTypeProducer.send(new ProducerRecord<>(bankHolidayTypeTopic, bankHoliday.getKey(), bankHoliday.getValue()));
}
public static <Tk extends Serializable, Tv extends Serializable> Producer<Tk, Tv> createProducer(
        String bootstrapServers, String clientID, String keySerializerClassName, String valueSerializerClassName)
{
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientID);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClassName);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClassName);
    return new KafkaProducer<>(props);
}
My topic is created automatically by the producer when it first produces to it, and it will always exist by the time the GlobalKTable tries to read from it. Is that the problem? Do I need to do something when setting up the topic to tell Kafka that a Streams GlobalKTable is going to consume it?
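In case it matters, I know I could create the topic explicitly instead of relying on auto-creation; something like this sketch (partition count and replication factor are assumed values for a local single-broker setup):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public static void createTopic(String topic) throws Exception {
    Properties adminProps = new Properties();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(adminProps)) {
        // 1 partition, replication factor 1: assumed values for a local broker
        admin.createTopics(Collections.singleton(new NewTopic(topic, 1, (short) 1))).all().get();
    }
}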
It turned out that there had (apparently) been some changes to the structure of the topic, which meant that the Streams application needed to be reset. To do this, you can use the Conduktor application, or the application reset tool described at http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool.
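If the reset needs to happen from code, KafkaStreams also exposes cleanUp(), which deletes this instance's local state directory; it does not touch committed offsets or internal topics, so the reset tool is still needed for a full reset. A minimal sketch, reusing the streams field from the question:

// Wipe this instance's local state directory. Must be called before start()
// (or after close()); it does NOT reset committed offsets or internal topics.
streams.cleanUp();
streams.start();
// For a full reset (offsets and internal topics), run the reset tool, e.g.
// (flag names per the linked docs; check them against your Kafka version):
// bin/kafka-streams-application-reset.sh --application-id applicationName \
//     --input-topics <your-input-topic> --bootstrap-servers localhost:9092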