I'm using PyFlink and the Streaming API to sync data into the file system. The output file paths look like this:
-2023-01-28--01
|-part-xxx-0.json
-2023-01-28--03
|-part-xxx-0.json
It seems the output file path format is {year}-{month}-{day}--{hour}/part-xxx-{commit}.json.
How can I change the path format to something like {year}/{month}/{day}/{hour}/part-xxx-{commit}.json?
Write a custom class that extends DateTimeBucketAssigner and override the path-generation logic in its getBucketId method.
Here's an example that prefixes the path with the POJO class name:
public class DateTimeWithClassPrefixBucketAssigner<IN> extends DateTimeBucketAssigner<IN> {
    ....
    @Override
    public String getBucketId(IN element, Context context) {
        // Create the formatter lazily on first use.
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        // Prefix the date-time bucket with the element's class name.
        String prefix = element.getClass().getSimpleName();
        return prefix + "/" + dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
    }
}
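For the layout asked about in the question, the part that matters is the format pattern. The snippet above builds its formatter with DateTimeFormatter.ofPattern(formatString).withZone(zoneId), so a pattern that uses slashes instead of dashes should produce nested directories. Below is a minimal standalone sketch of that idea using only java.time, with no Flink dependency; the class name BucketPathDemo, the helper bucketId, and the fixed timestamp are made up for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BucketPathDemo {
    // Formats a timestamp the same way the getBucketId snippet does.
    // Hypothetical helper for illustration; not part of Flink.
    public static String bucketId(long epochMillis, String formatString, ZoneId zoneId) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2023-01-28T01:15:00Z").toEpochMilli();
        ZoneId utc = ZoneId.of("UTC");
        // Pattern matching the layout shown in the question.
        System.out.println(bucketId(ts, "yyyy-MM-dd--HH", utc)); // 2023-01-28--01
        // Slash-separated pattern: each component becomes a directory level.
        System.out.println(bucketId(ts, "yyyy/MM/dd/HH", utc));  // 2023/01/28/01
    }
}
```

As far as I know, DateTimeBucketAssigner also accepts a format string in its constructor, so passing a pattern such as "yyyy/MM/dd/HH" there may be enough without subclassing. Treat that as an assumption and check it against your Flink version.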
And convert the date format:
import java.text.SimpleDateFormat;
...
String input = "2022-01-31--10";
String output = new SimpleDateFormat("yyyy/MM/dd/HH/'part-xxx.json'").format(
new SimpleDateFormat("yyyy-MM-dd--HH").parse(input)