I'm using PyFlink and the Streaming API to sync data into the file system. The output file paths look like this:
It seems the output file path format is `{year}-{month}-{day}--{hour}/part-xxx-{commit}.json`.
How can I change the path format to something like `{year}/{month}/{day}/{hour}/part-xxx-{commit}.json`?
Write a custom class that extends `DateTimeBucketAssigner` and override the path generation logic in its `getBucketId` method.
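`DateTimeBucketAssigner` builds each bucket ID by formatting the current processing time with a `DateTimeFormatter` pattern. The default pattern is `"yyyy-MM-dd--HH"`, which produces the flat `{year}-{month}-{day}--{hour}` directories. The class also has a constructor that takes a custom pattern, so for a pure layout change a pattern such as `"yyyy/MM/dd/HH"` may be enough on its own. The sketch below is plain Java with no Flink dependency. It only imitates that formatting step, and the class and method names are hypothetical. It shows what each pattern produces for a fixed UTC timestamp:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical stdlib-only imitation of how a date-time bucket assigner turns a
// processing timestamp into a bucket ID: format the instant with a pattern in a zone.
public class BucketPatternDemo {

    static String bucketId(String pattern, long epochMillis, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2022-01-31T10:15:00Z").toEpochMilli();
        ZoneId utc = ZoneId.of("UTC");
        // Default pattern: one flat directory per hour.
        System.out.println(bucketId("yyyy-MM-dd--HH", ts, utc)); // 2022-01-31--10
        // Slash-separated pattern: nested year/month/day/hour directories.
        System.out.println(bucketId("yyyy/MM/dd/HH", ts, utc));  // 2022/01/31/10
    }
}
```

Because `/` is not a pattern letter, the formatter emits it literally, and the file sink treats each slash-separated segment as a subdirectory.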
Here's an example that prefixes the path with the POJO's class name:
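```java
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class DateTimeWithClassPrefixBucketAssigner<IN> extends DateTimeBucketAssigner<IN> {
    public DateTimeWithClassPrefixBucketAssigner(String formatString) { super(formatString); }

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context) {
        // Reuse the parent's date-time bucket ID instead of touching its internal formatter fields
        return element.getClass().getSimpleName() + "/" + super.getBucketId(element, context);
    }
}
```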
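The following plain-Java sketch has no Flink dependency. It shows the kind of bucket ID the prefix assigner composes when its format string is `"yyyy/MM/dd/HH"`. `Order` and `Payment` are hypothetical POJOs, and the zone is fixed to UTC so the result is reproducible:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of the "<SimpleClassName>/<formatted time>" bucket IDs.
public class PrefixBucketDemo {

    static class Order {}
    static class Payment {}

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneId.of("UTC"));

    // Same composition as the overridden getBucketId: class name, slash, formatted time.
    static String bucketId(Object element, long processingTimeMillis) {
        String prefix = element.getClass().getSimpleName();
        return prefix + "/" + FORMATTER.format(Instant.ofEpochMilli(processingTimeMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2022-01-31T10:15:00Z").toEpochMilli();
        System.out.println(bucketId(new Order(), ts));   // Order/2022/01/31/10
        System.out.println(bucketId(new Payment(), ts)); // Payment/2022/01/31/10
    }
}
```

Each element type ends up in its own top-level directory, with the date-time hierarchy nested beneath it.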
To convert a path that is already in the old format, reformat its date part:
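```java
import java.text.SimpleDateFormat;

// Convert a bucket ID from the default "yyyy-MM-dd--HH" layout to "yyyy/MM/dd/HH".
// SimpleDateFormat.parse throws ParseException, so the enclosing method must declare or catch it.
String input = "2022-01-31--10";
String output = new SimpleDateFormat("yyyy/MM/dd/HH'/part-xxx.json'").format(
        new SimpleDateFormat("yyyy-MM-dd--HH").parse(input));
// output is "2022/01/31/10/part-xxx.json"
```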
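The same conversion can be done with `java.time`. The sketch below is an alternative, not part of the original answer, and the class and method names are hypothetical. `DateTimeFormatter` is immutable and thread-safe, so the two formatters can be shared constants. `SimpleDateFormat` is not thread-safe.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical java.time variant of the bucket-ID conversion.
public class BucketPathConverter {

    private static final DateTimeFormatter OLD_LAYOUT = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH");
    private static final DateTimeFormatter NEW_LAYOUT = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH");

    // Converts e.g. "2022-01-31--10" into "2022/01/31/10"; the missing minute defaults to 0.
    // Throws java.time.format.DateTimeParseException if the input does not match OLD_LAYOUT.
    static String convert(String oldBucketId) {
        return LocalDateTime.parse(oldBucketId, OLD_LAYOUT).format(NEW_LAYOUT);
    }

    public static void main(String[] args) {
        System.out.println(convert("2022-01-31--10") + "/part-xxx.json"); // 2022/01/31/10/part-xxx.json
    }
}
```

`LocalDateTime` carries no time zone, so unlike the `SimpleDateFormat` round trip, this conversion cannot shift the hour because of the JVM's default zone.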