I am new to working with Parquet files. I want to develop a MapReduce job that reads many input Parquet files with the following schema:
{
optional int96 dropoff_datetime;
optional float dropoff_latitude;
optional float dropoff_longitude;
optional int32 dropoff_taxizone_id;
optional float ehail_fee;
optional float extra;
optional float fare_amount;
optional float improvement_surcharge;
optional float mta_tax;
optional int32 passenger_count;
optional binary payment_type (UTF8);
optional int96 pickup_datetime;
optional float pickup_latitude;
optional float pickup_longitude;
optional int32 pickup_taxizone_id;
optional int32 rate_code_id;
optional binary store_and_fwd_flag (UTF8);
optional float tip_amount;
optional float tolls_amount;
optional float total_amount;
optional float trip_distance;
optional binary trip_type (UTF8);
optional binary vendor_id (UTF8);
required int64 trip_id;
}
The aim of my job is to compute the average trip speed for each hour of the day, so I need to extract all trip distances and pickup and dropoff times to compute each trip's duration and then its speed. However, I get errors when the field trip_distance does not exist. Here is part of the stack trace:
18/02/28 03:19:01 INFO mapreduce.Job: map 2% reduce 0%
18/02/28 03:19:10 INFO mapreduce.Job: Task Id : attempt_1519722054260_0016_m_000011_2, Status : FAILED
Error: java.lang.RuntimeException: not found 20(trip_distance) element number 0 in group:
dropoff_datetime: Int96Value{Binary{12 constant bytes, [0, 0, 0, 0, 0, 0, 0, 0, -116, 61, 37, 0]}}
payment_type: ""
pickup_datetime: Int96Value{Binary{12 constant bytes, [0, 120, 66, 9, 78, 72, 0, 0, 3, 125, 37, 0]}}
pickup_latitude: 40.7565
pickup_longitude: -73.9781
pickup_taxizone_id: 161
store_and_fwd_flag: ""
trip_type: "uber"
vendor_id: ""
trip_id: 4776003633207
at org.apache.parquet.example.data.simple.SimpleGroup.getValue(SimpleGroup.java:97)
at org.apache.parquet.example.data.simple.SimpleGroup.getValueToString(SimpleGroup.java:119)
at ParquetAssignmentSpeedAverageHours$ParquetMap.map(ParquetAssignmentSpeedAverageHours.java:48)
at ParquetAssignmentSpeedAverageHours$ParquetMap.map(ParquetAssignmentSpeedAverageHours.java:37)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
And this is my mapper class:
public static class ParquetMap extends Mapper<Text, Group, IntWritable, DoubleWritable> {
    private DoubleWritable one = new DoubleWritable(1);
    private IntWritable time = new IntWritable();
    private DoubleWritable result = new DoubleWritable();

    @Override
    public void map(Text key, Group value, Context context) throws IOException, InterruptedException {
        double duration;
        double distance;
        double speed;
        Binary pickupTimestamp = value.getInt96("pickup_datetime", 0);
        Binary dropoffTimestamp = value.getInt96("dropoff_datetime", 0);
        if (value.getValueToString(20, 0) != null) { // the trip_distance field
            distance = value.getFloat("trip_distance", 0);
        } else {
            distance = 0;
        }
        try {
            if (!pickupTimestamp.equals(dropoffTimestamp)) {
                duration = ((double) (getTimestampMillis(dropoffTimestamp) - getTimestampMillis(pickupTimestamp)) / 3600000);
                speed = (float) (distance / duration);
                result.set((speed));
                Calendar cal = Calendar.getInstance();
                cal.setTimeInMillis(getTimestampMillis(pickupTimestamp));
                time.set(cal.get(Calendar.HOUR_OF_DAY));
                context.write(time, result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Can anyone help, please? Thanks,
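The mapper calls a getTimestampMillis(...) helper that isn't shown. Here is a minimal sketch of such a conversion, assuming the common INT96 timestamp layout (bytes 0-7: nanoseconds within the day, bytes 8-11: Julian day number, both little-endian) and working on the raw 12 bytes, for example as returned by Binary.getBytes(). The class name Int96Timestamps is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class Int96Timestamps {
    // Julian day number of 1970-01-01, the Unix epoch.
    private static final long JULIAN_DAY_OF_EPOCH = 2_440_588L;
    private static final long MILLIS_PER_DAY = 86_400_000L;
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private Int96Timestamps() {}

    /**
     * Converts a 12-byte INT96 value to milliseconds since the Unix epoch (UTC).
     * Assumed layout: bytes 0-7 = nanoseconds of day, bytes 8-11 = Julian day,
     * both little-endian.
     */
    static long toEpochMillis(byte[] int96) {
        if (int96.length != 12) {
            throw new IllegalArgumentException("INT96 value must be 12 bytes, got " + int96.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong();                       // reads bytes 0-7
        long julianDay = Integer.toUnsignedLong(buf.getInt()); // reads bytes 8-11
        return (julianDay - JULIAN_DAY_OF_EPOCH) * MILLIS_PER_DAY + nanosOfDay / NANOS_PER_MILLI;
    }
}
```

Inside the mapper it could be called as Int96Timestamps.toEpochMillis(pickupTimestamp.getBytes()). Decoding the bytes printed in the stack trace gives 2014-06-26T22:05:00Z for pickup_datetime and exactly 0 (1970-01-01T00:00:00Z) for dropoff_datetime, so that record would produce a negative duration even though the two timestamps are not equal.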
It's a java.lang.RuntimeException, which basically indicates a bug in the code.
The public String getValueToString(int fieldIndex, int index) method internally calls the getValue(int fieldIndex, int index) method, whose implementation is as follows:
private Object getValue(int fieldIndex, int index) {
    List<Object> list;
    try {
        list = data[fieldIndex];
    } catch (IndexOutOfBoundsException e) {
        throw new RuntimeException("not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex) + ") in group:\n" + this);
    }
    try {
        return list.get(index);
    } catch (IndexOutOfBoundsException e) {
        throw new RuntimeException("not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex) + ") element number " + index + " in group:\n" + this);
    }
}
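Here, if either the fieldIndex or the index does not exist, an IndexOutOfBoundsException is thrown and re-thrown as a RuntimeException. Your message, "not found 20(trip_distance) element number 0", comes from the second catch: field 20 exists in the schema, but in that record it holds no values, so element 0 cannot be read.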
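To make the two failure paths concrete, here is a hypothetical, stripped-down model (MiniGroup, not the real Parquet class) that keeps one list of values per schema field, as the getValue(...) code above does. An absent optional field still has an index, but its list is empty, so its repetition count is 0. Parquet's example Group class exposes the same information through getFieldRepetitionCount(String field), which could serve as an existence check instead of catching the exception.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a group's storage: one value list per schema field,
 * left empty when an optional field is absent. NOT the real SimpleGroup.
 */
final class MiniGroup {
    private final String[] fieldNames;
    private final List<Object>[] data;

    @SuppressWarnings("unchecked")
    MiniGroup(String... fieldNames) {
        this.fieldNames = fieldNames;
        this.data = new List[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            data[i] = new ArrayList<>(); // every field gets a list, possibly empty
        }
    }

    void add(int fieldIndex, Object value) {
        data[fieldIndex].add(value);
    }

    // Mirrors the two-step lookup of getValue(...) quoted above.
    Object getValue(int fieldIndex, int index) {
        List<Object> list;
        try {
            list = data[fieldIndex]; // fails only if the field index is outside the schema
        } catch (IndexOutOfBoundsException e) {
            throw new RuntimeException("not found " + fieldIndex + " in group");
        }
        try {
            return list.get(index); // fails when the optional field has no value
        } catch (IndexOutOfBoundsException e) {
            throw new RuntimeException("not found " + fieldIndex + "(" + fieldNames[fieldIndex]
                    + ") element number " + index + " in group");
        }
    }

    // Analogue of a repetition-count check: how many values the field holds.
    int getFieldRepetitionCount(int fieldIndex) {
        return data[fieldIndex].size();
    }
}
```

Checking the count first (for example, getFieldRepetitionCount(1) > 0 in this model) avoids using exceptions for a situation that is expected whenever optional fields are involved.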
Rather than calling getValueToString(...) directly and assuming the value is there, you need to handle the case where the field is absent. Since all the fields in your dataset are optional, looking one up by a fixed fieldIndex is not reliable. One simple approach is to read the field by name, let the call fail when it is absent, and catch the exception to set a default value:
try {
    distance = value.getFloat("trip_distance", 0);
} catch (RuntimeException e) {
    distance = 0;
}
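Whichever way absence is detected, defaulting distance to 0 makes that trip count as a 0-speed trip in the hourly average. Below is a minimal sketch of the per-trip and per-hour arithmetic in plain Java (HourlySpeed and its methods are hypothetical names). It skips trips with a missing distance or a non-positive duration, and buckets by the pickup hour in UTC; UTC is an assumption, since the mapper above uses the JVM's default time zone via Calendar.getInstance().

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.OptionalDouble;

final class HourlySpeed {
    private static final double MILLIS_PER_HOUR = 3_600_000.0;

    private HourlySpeed() {}

    // Speed in distance units per hour, or empty if the trip cannot be used.
    static OptionalDouble speed(Float distance, long pickupMillis, long dropoffMillis) {
        if (distance == null) {
            return OptionalDouble.empty(); // trip_distance absent in this record
        }
        long durationMillis = dropoffMillis - pickupMillis;
        if (durationMillis <= 0) {
            return OptionalDouble.empty(); // zero or negative duration (bad timestamps)
        }
        return OptionalDouble.of(distance / (durationMillis / MILLIS_PER_HOUR));
    }

    // Hour of day (0-23) of the pickup, interpreted in UTC.
    static int pickupHourUtc(long pickupMillis) {
        return Instant.ofEpochMilli(pickupMillis).atZone(ZoneOffset.UTC).getHour();
    }

    // Reducer-side step: mean of all speeds received for one hour key.
    static OptionalDouble average(double... speeds) {
        return Arrays.stream(speeds).average();
    }
}
```

In the mapper, speed(...) would replace the inline computation and pickupHourUtc(...) the Calendar lookup, writing a value only when speed(...) is present; the reducer for each hour key would then average the speeds it receives.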