I have a topic which has data in the format
{
before: {...},
after: {...},
source: {...},
op: 'u'
}
The data was produced by Debezium. I want to send the data to SQL Server db table, so I selected JDBC Sink Connector. I need to process the data before sending it to downstream.
Logic that needs to be applied:
if op = 'u' or op = 'c' or op = 'r' // update or insert or snapshot
select all the fields present in 'after' and perform upsert to downstream.
if op = 'd' // delete
select all the fields present in 'before' + add a field IsActive=false and perform upsert to downstream.
How can I achieve this?
I was able do achieve this using custom transform in sink jdbc connector.
I extracted the after
field and op
field and applied the logic. There is no direct way to update the record i.e. there is no method to setSchema and setValue. So i have used reflection to update schema and value.
The below code snippets:
private final ExtractField<R> afterDelegate = new ExtractField.Value<R>();
private final ExtractField<R> beforeDelegate = new ExtractField.Value<R>();
private final ExtractField<R> operationDelegate = new ExtractField.Value<R>();
public R apply(R record) {
R operationRecord = operationDelegate.apply(record);
String op = String.valueOf(operationRecord.value());
Boolean isDeletedRecord = op.equalsIgnoreCase(Operation.DELETE.getValue())? true: false;
...
finalRecord = afterDelegate.apply(record);
if(isDeletedRecord){
addDeletedFlag(finalRecord);
}
}
private void addDeletedFlag(R finalRecord){
final SchemaBuilder builder = SchemaBuilder.struct();
builder.name(finalRecord.valueSchema().name());
for(Field f: finalRecord.valueSchema().fields()){
builder.field(f.name(),f.schema());
}
builder.field(deleteFlagName,Schema.BOOLEAN_SCHEMA).optional();
Schema newValueSchema = builder.build();
try{
java.lang.reflect.Field s = finalRecord.getClass().getSuperclass().getDeclaredField("valueSchema");
s.setAccessible(true);
s.set(finalRecord,newValueSchema);
}catch (Exception e){
e.printStackTrace();
}
Struct s = (Struct) finalRecord.value();
updateValueSchema(s,finalRecord.valueSchema());
updateValue(finalRecord.value(),true);
}
private void updateValueSchema(Object o,Schema newSchema) {
if(!(o instanceof Struct)){
return;
}
Struct value = (Struct) o;
try{
java.lang.reflect.Field s = value.getClass().getDeclaredField("schema");
s.setAccessible(true);
s.set(value,newSchema);
}catch (Exception e){
e.printStackTrace();
}
}
private void updateValue(Object o, Object newValue){
if(!(o instanceof Struct)){
return;
}
Struct value = (Struct) o;
try{
java.lang.reflect.Field s = value.getClass().getDeclaredField("values");
s.setAccessible(true);
Object[] newValueArray = ((Object[]) s.get(value)).clone();
List<Object> newValueList = new ArrayList<>(Arrays.asList(newValueArray));
newValueList.add(newValue);
s.set(value, newValueList.toArray());
}catch (Exception e){
e.printStackTrace();
}
}