apache-flink flink-streaming pyflink

Can I change the output path format of Flink Streaming File Sink?


I'm using PyFlink and the Streaming API to sync data into the file system. The output file paths look like this:

-2023-01-28--01
 |-part-xxx-0.json
-2023-01-28--03
 |-part-xxx-0.json

It seems the output file path format is {year}-{month}-{day}--{hour}/part-xxx-{commit}.json. How can I change it to something like {year}/{month}/{day}/{hour}/part-xxx-{commit}.json?


Solution

  • Write a custom class that extends DateTimeBucketAssigner and override the path generation logic in its getBucketId method.

    Here's an example that prefixes the path with the POJO class name:

    public class DateTimeWithClassPrefixBucketAssigner<IN> extends DateTimeBucketAssigner<IN> {
    ....
        // dateTimeFormatter, formatString and zoneId are assumed to be fields
        // declared in the elided part above.
        @Override
        public String getBucketId(IN element, Context context) {
            // Build the formatter lazily on first use.
            if (dateTimeFormatter == null) {
                dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
            }
            // Bucket path = <POJO class name>/<formatted processing time>
            String prefix = element.getClass().getSimpleName();
            return prefix + "/" + dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
        }
    }
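
If the only goal is a different date layout, subclassing may not be needed. As far as I know, DateTimeBucketAssigner also accepts the format string as a constructor argument, and recent PyFlink versions expose the same option through BucketAssigner.date_time_bucket_assigner(format_str=...); please check this against the version you run. The sketch below uses only java.time, with no Flink dependency, to show what the formatting step in getBucketId above would yield for the default pattern and for a slash-separated one. The class BucketPathSketch and the helper bucketId are hypothetical names for illustration, not part of Flink.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration class; not part of Flink.
class BucketPathSketch {

    // Formats a timestamp with the same DateTimeFormatter calls used in
    // getBucketId above: ofPattern(...).withZone(...).format(Instant).
    static String bucketId(String pattern, ZoneId zone, long epochMilli) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(zone)
                .format(Instant.ofEpochMilli(epochMilli));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2023-01-28T01:15:00Z").toEpochMilli();
        ZoneId utc = ZoneId.of("UTC");

        // Layout seen in the question.
        System.out.println(bucketId("yyyy-MM-dd--HH", utc, ts)); // 2023-01-28--01

        // Slash-separated layout; each "/" becomes a nested directory.
        System.out.println(bucketId("yyyy/MM/dd/HH", utc, ts));  // 2023/01/28/01
    }
}
```

The "/" characters are plain literals in the pattern, so a bucket ID such as 2023/01/28/01 should end up as nested directories under the sink's base path.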
    

    And convert the date format:

    import java.text.SimpleDateFormat;
    ...
        String input = "2022-01-31--10";
        // Use real pattern letters; quote the literal file name so it is not parsed as a pattern.
        String output = new SimpleDateFormat("yyyy/MM/dd/HH'/part-xxx.json'").format(
                new SimpleDateFormat("yyyy-MM-dd--HH").parse(input)