apache-flink flink-streaming

Flink: Get a Keyed State Value and Use It in Another Stream


I know that keyed state belongs to its key: only the current key can access its state value, and other keys cannot access a different key's state value.

I tried to access the state with the same key, but from a different stream. Is that possible?

If it is not possible, will I end up with two duplicate copies of the data?

Note: I need two streams because each of them will have a different time window and a different implementation.

Here is the example (I know that keyBy(something) is the same for both stream operations):

public class Sample{
       streamA
                .keyBy(something)
                .timeWindow(Time.seconds(4))
                .process(new CustomMyProcessFunction())
                .name("CustomMyProcessFunction")
                .print();

       streamA
                .keyBy(something)
                .timeWindow(Time.seconds(1))
                .process(new CustomMyAnotherProcessFunction())
                .name("CustomMyAnotherProcessFunction")
                .print();
}

public class CustomMyProcessFunction extends ProcessWindowFunction<..>
{
    private Logger logger = LoggerFactory.getLogger(CustomMyProcessFunction.class);
    private transient ValueState<SimpleEntity> simpleEntityValueState;
    private SimpleEntity simpleEntity;

    @Override
    public void open(Configuration parameters) throws Exception
    {
        ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
                "sample",
                TypeInformation.of(SimpleEntity.class)
        );
        simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
    }

    @Override
    public void process(...) throws Exception
    {
        SimpleEntity value = simpleEntityValueState.value();
        if (value == null)
        {
            SimpleEntity newVal = new SimpleEntity("sample");
            logger.info("New Value put");
            simpleEntityValueState.update(newVal);
        }
        ...
    }
...
}

public class CustomMyAnotherProcessFunction extends ProcessWindowFunction<..>
{
    private Logger logger = LoggerFactory.getLogger(CustomMyAnotherProcessFunction.class);
    private transient ValueState<SimpleEntity> simpleEntityValueState;

    @Override
    public void open(Configuration parameters) throws Exception
    {

        ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
                "sample",
                TypeInformation.of(SimpleEntity.class)
        );
        simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
    }

    @Override
    public void process(...) throws Exception
    {
        SimpleEntity value = simpleEntityValueState.value();
        if (value != null)
            logger.info(value.toString()); // I expect SimpleEntity("sample") here
        out.collect(...);
    }
...
}

Solution

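  • As has been pointed out already, state is always local to a single operator instance and cannot be shared. In your example, each of the two process functions gets its own ValueState, even though both descriptors use the name "sample" and both streams are keyed the same way, so the second function never sees the value written by the first.

    What you can do, however, is stream the state updates from the operator holding the state to the other operators that need it. With side outputs you can build complex dataflows without needing to share state.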
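
To make the reasoning concrete, here is a minimal, dependency-free Java model of the two points above. It is not Flink code: `ModelOperator`, `value`, `update`, and `onUpdate` are hypothetical names invented for this illustration, and the listener only stands in for a side output. In a real job the forwarding would go through Flink's side-output API (an `OutputTag` on the producing operator and `getSideOutput` on its result stream), and the receiving operator would store the forwarded value in its own keyed state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Toy model only: these classes are hypothetical and are not part of Flink. */
public class OperatorLocalStateModel {

    /** Stands in for one operator: it owns a private map of state name -> (key -> value). */
    static class ModelOperator {
        private final Map<String, Map<String, String>> stateByName = new HashMap<>();
        // Listeners play the role of a side output carrying (key, newValue) updates.
        private final List<BiConsumer<String, String>> updateListeners = new ArrayList<>();

        /** Reads the value stored under the given state name for the given key, or null. */
        String value(String stateName, String key) {
            return stateByName
                    .getOrDefault(stateName, Collections.<String, String>emptyMap())
                    .get(key);
        }

        /** Writes the value into this operator's own state and notifies all listeners. */
        void update(String stateName, String key, String newValue) {
            stateByName.computeIfAbsent(stateName, n -> new HashMap<>()).put(key, newValue);
            for (BiConsumer<String, String> listener : updateListeners) {
                listener.accept(key, newValue);
            }
        }

        /** Registers a consumer for future state updates of this operator. */
        void onUpdate(BiConsumer<String, String> listener) {
            updateListeners.add(listener);
        }
    }

    public static void main(String[] args) {
        ModelOperator first = new ModelOperator();   // models the operator running CustomMyProcessFunction
        ModelOperator second = new ModelOperator();  // models the operator running CustomMyAnotherProcessFunction

        // Same state name and same key, but different operators: nothing is shared.
        first.update("sample", "key-1", "sample");
        System.out.println(second.value("sample", "key-1")); // prints "null"

        // Forward every update of the first operator into the second operator's own state.
        first.onUpdate((key, newValue) -> second.update("sample", key, newValue));
        first.update("sample", "key-1", "sample-v2");
        System.out.println(second.value("sample", "key-1")); // prints "sample-v2"
    }
}
```

The second operator ends up holding its own copy of the value, so the data is indeed duplicated. That is the price of keeping each operator's state private, and it is also what lets each operator apply its own window and logic independently.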