java json apache-flink fastjson

In Flink, I can't map a String to a JSONObject


Source code: the source emits a String, and I try to map it into a JSONObject, but it fails.

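    // Assumption: fastjson 1.x (per the question's tag); fastjson2 uses com.alibaba.fastjson2.JSONObject.
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class Test {
        public static void main(String[] args) throws Exception {
            // 1. Set up the execution environment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // 2. Create a source with a single JSON string.
            DataStreamSource<String> inputDS = env.fromElements("{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}");

            // 3. Parse each String into a JSONObject and emit it downstream.
            SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
                @Override
                public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                    JSONObject jsonObject = JSONObject.parseObject(value);
                    out.collect(jsonObject);
                }
            });
            jsonObjDS.print();

            env.execute();
        }
    }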

Error: Assigned key must not be null!

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!

Process finished with exit code 1

I tried map, flatMap, and process. They all fail at the point where the collector collects the result. While debugging, I found that the String had been parsed into a JSONObject successfully, but the JSONObject just cannot be collected.


Solution

  • To get it to compile, I had to change the code slightly, to

    public void processElement(
            String value,
            ProcessFunction<String, JSONObject>.Context ctx,
            Collector<JSONObject> out) throws Exception {
        // org.json parses the string in the constructor.
        JSONObject jsonObject = new JSONObject(value);
        out.collect(jsonObject);
    }
    

    and it works fine. It prints out

    {"bill_info":{"ADDER_NAME":"sss","ADDER_NO":"0706","UPDATER_NAME":"ssss","UPDATER_NO":"0706","BILL_ID":"8687b584-038c-498c-8f97-ec1ca197da96","ADD_TIME":"2022-11-12 16:05:28:418","ORDER_ID":"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67","S_USER_ID":"s68","B_USER_ID":"b77","UPDATE_TIME":"2022-11-12 16:05:28:418"}}
    

    I used

    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20180130</version>
    </dependency>
    

    Note, however, that it's generally not a good idea to send JSONObjects around: they are expensive to serialize and deserialize. It would be better to deserialize the JSON to a POJO, along the lines of what is shown in this recipe.
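
As a rough sketch of the POJO approach, here is what a Flink-friendly type for the `bill_info` object in the sample record might look like. The class name `BillInfo` and the camel-case field names are hypothetical (they are not from the original post). The shape follows Flink's POJO rules: a public class, a public no-argument constructor, and fields that are either public or reachable through getters and setters.

```java
/**
 * Hypothetical POJO for the "bill_info" object in the sample record.
 * Field names are illustrative; the JSON keys in the sample are upper-case
 * (for example ORDER_ID), so a JSON library would need to be told how to map them.
 */
public class BillInfo {
    // Public, non-final fields so that Flink can treat this class as a POJO.
    public String billId;
    public String orderId;
    public String sUserId;
    public String bUserId;
    public String adderNo;
    public String adderName;
    public String updaterNo;
    public String updaterName;
    // Timestamps are kept as text here; the sample uses "yyyy-MM-dd HH:mm:ss:SSS".
    public String addTime;
    public String updateTime;

    // Flink's POJO rules require a public no-argument constructor.
    public BillInfo() {}

    @Override
    public String toString() {
        return "BillInfo{billId=" + billId + ", orderId=" + orderId + "}";
    }
}
```

In the job, the process function would then emit `BillInfo` instead of `JSONObject`, with the JSON-to-object mapping done by a JSON library such as Jackson (`ObjectMapper.readValue`). Mapping the upper-case keys and unwrapping the nested `bill_info` object would need extra annotations or code, which this sketch does not show.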