
Flink Data Stream Enrichment: Connecting two data streams with different throughput and multi-level keying


I'm fairly new to Flink and want to join two data streams from two fictional data sources for a showcase on stateful data streaming.

These data streams provide data in the following form (JSON):

1. Pizza Order

{
  "id": 123,
  "shop": "Mario's kitchen",
  "pizzas": [
    {
      "name": "Diavolo"
    },
    {
      "name": "Hawaii"
    }
  ],
  "timestamp": 12345678
}

2. Pizza Price

{
  "name": "Diavolo",
  "shop": "Mario's kitchen",
  "price": 14.2,
  "timestamp": 12345678
}

The second data stream updates less frequently than the first.

The idea was to enrich the pizzas in the first "Pizza Order" data stream with the price tags of the second data stream. The result would look as follows:

3. Enriched Pizza Order

{
  "id": 123,
  "shop": "Mario's kitchen",
  "pizzas": [
    {
      "name": "Diavolo",
      "price": 14.2
    },
    {
      "name": "Hawaii",
      "price": 12.5
    }
  ],
  "timestamp": 12345678
}

For each pizza, the enriched pizza order data stream should use the most recent price whose timestamp is at or before the timestamp of the associated pizza order.
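To make this requirement concrete, here is a minimal, Flink-free sketch of the "as-of" lookup it implies. It assumes the price history is kept per (shop, pizza name) in a `TreeMap` sorted by price timestamp; the class and method names (`AsOfPriceLookup`, `addPrice`, `priceAt`) and the sample prices are hypothetical. `TreeMap.floorEntry` returns the entry with the greatest key less than or equal to the given order timestamp, which is exactly "the latest price not newer than the order".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the as-of lookup, independent of Flink.
public class AsOfPriceLookup {

    // Price history per "shop|name" key, sorted by price timestamp.
    // (A simple string key for illustration; assumes names contain no '|'.)
    private final Map<String, TreeMap<Long, Double>> history = new HashMap<>();

    private static String key(String shop, String name) {
        return shop + "|" + name;
    }

    // Records one "Pizza Price" event.
    public void addPrice(String shop, String name, double price, long timestamp) {
        history.computeIfAbsent(key(shop, name), k -> new TreeMap<>()).put(timestamp, price);
    }

    // Latest price with timestamp <= orderTimestamp, or null if none is known yet.
    public Double priceAt(String shop, String name, long orderTimestamp) {
        TreeMap<Long, Double> prices = history.get(key(shop, name));
        if (prices == null) {
            return null;
        }
        Map.Entry<Long, Double> entry = prices.floorEntry(orderTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        AsOfPriceLookup lookup = new AsOfPriceLookup();
        lookup.addPrice("Mario's kitchen", "Diavolo", 13.9, 100L); // hypothetical older price
        lookup.addPrice("Mario's kitchen", "Diavolo", 14.2, 200L);
        System.out.println(lookup.priceAt("Mario's kitchen", "Diavolo", 150L)); // 13.9
        System.out.println(lookup.priceAt("Mario's kitchen", "Diavolo", 250L)); // 14.2
        System.out.println(lookup.priceAt("Mario's kitchen", "Diavolo", 50L));  // null
    }
}
```

An order placed at time 150 therefore gets the older price 13.9, even if the 14.2 update has already been processed, which is the behaviour the current attempt below does not provide.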

I have seen in many tutorials that this can be achieved by keying both streams with the "keyBy" operator on a matching key and then connecting them.

This is where my problem arises: the two input streams do not share an appropriate top-level key, because the pizza name is nested inside the order's "pizzas" array. How would you approach this?


My most recent attempt to solve this issue was as follows (Java):

{
        [...]
        DataStream<PizzaOrder> pizzaOrderStream = env.fromSource(
                this.pizzaOrderSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Pizza Order Topic"
        );

        DataStream<PizzaPrice> pizzaPriceStream = env.fromSource(
                this.pizzaPriceSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Pizza Price Topic"
        );

        DataStream<EnrichedPizzaOrder> enrichedPizzaOrderDataStream =
                pizzaOrderStream.keyBy(PizzaOrder::getShop)
                .connect(pizzaPriceStream.keyBy(PizzaPrice::getShop))
                .process(new ProcessingTimeJoin());

        enrichedPizzaOrderDataStream.sinkTo(sink);

        return env.execute("Pizza Order Enriching Example");
}

    public static class ProcessingTimeJoin extends CoProcessFunction<PizzaOrder, PizzaPrice, EnrichedPizzaOrder> {

        // pizza name -> latest price (implicitly keyed by shop)
        private ValueState<HashMap<String, Double>> pizzaPriceState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<HashMap<String, Double>> vDescriptor = new ValueStateDescriptor<HashMap<String, Double>>(
                    "pizzaPriceState",
                    TypeInformation.of(new TypeHint<HashMap<String, Double>>() {})
            );
            pizzaPriceState = getRuntimeContext().getState(vDescriptor);
        }

        @Override
        public void processElement1(PizzaOrder order,
                                    CoProcessFunction<PizzaOrder, PizzaPrice, EnrichedPizzaOrder>.Context ctx,
                                    Collector<EnrichedPizzaOrder> out) throws Exception {
            HashMap<String, Double> state = pizzaPriceState.value();
            if (state == null) {
                state = new HashMap<>();
            }
            List<EnrichedPizza> enrichedPizzas = new ArrayList<>();
            for (Pizza pizza : order.getPizzas()) {
                // -1.0 marks a pizza for which no price has been seen yet
                double price = state.getOrDefault(pizza.getPizzaName(), -1.0);

                EnrichedPizza newPizza = new EnrichedPizza(pizza, price);
                enrichedPizzas.add(newPizza);
            }
            EnrichedPizzaOrder enrichedPizzaOrder = new EnrichedPizzaOrder(order, enrichedPizzas);
            out.collect(enrichedPizzaOrder);
        }

        @Override
        public void processElement2(PizzaPrice price,
                                    CoProcessFunction<PizzaOrder, PizzaPrice, EnrichedPizzaOrder>.Context ctx,
                                    Collector<EnrichedPizzaOrder> out) throws Exception {
            HashMap<String, Double> state = pizzaPriceState.value();
            if (state == null) {
                state = new HashMap<>();
            }
            // overwrites the previous price, so only the latest price per pizza is kept
            state.put(price.getName(), price.getPrice());
            pizzaPriceState.update(state);
        }
    }

Solution

  • The "Pizza Price" stream is classic enrichment data, so it could be a broadcast stream that you connect to the "Pizza Order" stream and use as described in The Broadcast State Pattern.

    Alternatively, you could flatten the Pizza Order records, so that one record with N pizzas turns into N records, each with a single pizza, and then key by shop and pizza name. You could then key the Pizza Price stream by the same two fields and connect the streams. After enrichment, you'd have to key by order id and re-create the unflattened record.

    In either case you have to handle the time-based join: store the Pizza Price stream in (say) a MapState<Long, Float> keyed by timestamp, and iterate over its entries to find the best match, i.e. the latest entry whose time is <= the order time.

    Or use the Table API, which can handle the above for you; see Temporal Joins.
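The flatten-and-rekey approach can be sketched without Flink to show the data flow. This is a minimal illustration under several assumptions: plain Java collections stand in for Flink state, the type names (`Order`, `OrderItem`, `EnrichedItem`) are hypothetical simplifications of the question's classes, prices use `Double` rather than `Float`, and each flattened item carries its position and the order's pizza count so that the re-assembly step knows when an order is complete (one possible design, not the only one). In a real job, `flatten` would roughly correspond to a `flatMap`, `bestPrice` to a function on the streams keyed by shop and pizza name, and `collect` to a function keyed by order id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free sketch: flatten orders, enrich per (shop, pizza), re-assemble by order id.
// All names are hypothetical; plain collections stand in for Flink state.
public class FlattenEnrichReassemble {

    record Order(long id, String shop, List<String> pizzas, long timestamp) {}

    // One pizza of one order, plus its position and the order's pizza count
    // (assumed extra fields so the order can be rebuilt after enrichment).
    record OrderItem(long orderId, String shop, String pizza, int index, int count, long timestamp) {}

    record EnrichedItem(OrderItem item, Double price) {}

    // Step 1 (flatMap): an order with N pizzas becomes N single-pizza records.
    static List<OrderItem> flatten(Order o) {
        List<OrderItem> items = new ArrayList<>();
        int n = o.pizzas().size();
        for (int i = 0; i < n; i++) {
            items.add(new OrderItem(o.id(), o.shop(), o.pizzas().get(i), i, n, o.timestamp()));
        }
        return items;
    }

    // Step 2 (keyed by shop and pizza): latest price with time <= order time, or null.
    // Scans every entry, as when iterating a MapState, instead of assuming sorted keys.
    static Double bestPrice(Map<Long, Double> history, long orderTime) {
        Long bestTime = null;
        Double best = null;
        for (Map.Entry<Long, Double> e : history.entrySet()) {
            long t = e.getKey();
            if (t <= orderTime && (bestTime == null || t > bestTime)) {
                bestTime = t;
                best = e.getValue();
            }
        }
        return best;
    }

    // Step 3 (keyed by order id): buffer enriched items until all of them have arrived.
    private final Map<Long, EnrichedItem[]> pending = new HashMap<>();

    // Returns the prices in the original pizza order once the order is complete, else null.
    List<Double> collect(EnrichedItem e) {
        OrderItem item = e.item();
        EnrichedItem[] slots = pending.computeIfAbsent(item.orderId(), k -> new EnrichedItem[item.count()]);
        slots[item.index()] = e;
        for (EnrichedItem s : slots) {
            if (s == null) {
                return null; // still waiting for other pizzas of this order
            }
        }
        pending.remove(item.orderId());
        List<Double> prices = new ArrayList<>();
        for (EnrichedItem s : slots) {
            prices.add(s.price());
        }
        return prices;
    }

    public static void main(String[] args) {
        // Hypothetical price histories keyed by "shop|pizza": timestamp -> price.
        Map<String, Map<Long, Double>> histories = new HashMap<>();
        histories.put("Mario's kitchen|Diavolo", Map.of(100L, 13.9, 200L, 14.2));
        histories.put("Mario's kitchen|Hawaii", Map.of(100L, 12.5));

        Order order = new Order(123L, "Mario's kitchen", List.of("Diavolo", "Hawaii"), 150L);
        FlattenEnrichReassemble assembler = new FlattenEnrichReassemble();
        for (OrderItem item : flatten(order)) {
            Map<Long, Double> history = histories.getOrDefault(item.shop() + "|" + item.pizza(), Map.of());
            List<Double> prices = assembler.collect(new EnrichedItem(item, bestPrice(history, item.timestamp())));
            if (prices != null) {
                System.out.println("order " + item.orderId() + " -> " + prices); // order 123 -> [13.9, 12.5]
            }
        }
    }
}
```

Scanning all entries is linear in the size of each price history; since the price stream updates rarely, that is probably acceptable, but you would likely also want to prune old prices (for example with a timer) so the state does not grow without bound.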