Tags: jdbc, apache-kafka, apache-kafka-connect, confluent-platform, debezium

How to modify/update the data before sending it downstream


I have a topic which has data in the format

{
  "before": {...},
  "after": {...},
  "source": {...},
  "op": "u"
}

The data was produced by Debezium. I want to send the data to a SQL Server table, so I selected the JDBC Sink Connector. I need to process the data before sending it downstream.

Logic that needs to be applied:

  1. if op = 'u' or op = 'c' or op = 'r' // update or insert or snapshot

    select all the fields present in 'after' and perform upsert to downstream.

  2. if op = 'd' // delete

    select all the fields present in 'before' + add a field IsActive=false and perform upsert to downstream.
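For example, for a delete event the intended behavior would look something like this (field names are illustrative):

    // incoming Debezium delete event
    { "before": { "id": 42, "name": "foo" }, "after": null, "op": "d" }

    // row upserted downstream: 'before' fields plus the flag
    { "id": 42, "name": "foo", "IsActive": false }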

How can I achieve this?


Solution

  • I was able to achieve this using a custom transform in the JDBC sink connector. I extracted the after field and the op field and applied the logic. There is no direct way to update a record in place, i.e. there is no setSchema or setValue method, so I used reflection to update the schema and value.

    The below code snippets:

    private final ExtractField<R> afterDelegate = new ExtractField.Value<>();
    private final ExtractField<R> beforeDelegate = new ExtractField.Value<>();
    private final ExtractField<R> operationDelegate = new ExtractField.Value<>();

    @Override
    public void configure(Map<String, ?> configs) {
        // Each ExtractField delegate must be told which envelope field to pull out
        afterDelegate.configure(Collections.singletonMap("field", "after"));
        beforeDelegate.configure(Collections.singletonMap("field", "before"));
        operationDelegate.configure(Collections.singletonMap("field", "op"));
    }

    @Override
    public R apply(R record) {
        R operationRecord = operationDelegate.apply(record);
        String op = String.valueOf(operationRecord.value());
        boolean isDeletedRecord = op.equalsIgnoreCase(Operation.DELETE.getValue());
        ...
        R finalRecord = afterDelegate.apply(record);
        if (isDeletedRecord) {
            addDeletedFlag(finalRecord);
        }
        ...
        return finalRecord;
    }
    private void addDeletedFlag(R finalRecord) {
        // Rebuild the value schema with one extra optional boolean field.
        final SchemaBuilder builder = SchemaBuilder.struct();
        builder.name(finalRecord.valueSchema().name());
        for (Field f : finalRecord.valueSchema().fields()) {
            builder.field(f.name(), f.schema());
        }
        // Note: chaining .optional() after field() would mark the whole struct
        // optional, so the flag field itself is declared optional instead.
        builder.field(deleteFlagName, Schema.OPTIONAL_BOOLEAN_SCHEMA);
        Schema newValueSchema = builder.build();
        try {
            // Records are immutable, so overwrite the private valueSchema field
            java.lang.reflect.Field s = finalRecord.getClass().getSuperclass().getDeclaredField("valueSchema");
            s.setAccessible(true);
            s.set(finalRecord, newValueSchema);
        } catch (Exception e) {
            e.printStackTrace();
        }

        updateValueSchema(finalRecord.value(), finalRecord.valueSchema());
        updateValue(finalRecord.value(), true);
    }
    private void updateValueSchema(Object o, Schema newSchema) {
        if (!(o instanceof Struct)) {
            return;
        }
        Struct value = (Struct) o;
        try {
            // Struct keeps its schema in a private field; swap it via reflection
            java.lang.reflect.Field s = value.getClass().getDeclaredField("schema");
            s.setAccessible(true);
            s.set(value, newSchema);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private void updateValue(Object o, Object newValue) {
        if (!(o instanceof Struct)) {
            return;
        }
        Struct value = (Struct) o;
        try {
            // Append the new value to the Struct's private values array,
            // matching the field appended to the schema above.
            java.lang.reflect.Field s = value.getClass().getDeclaredField("values");
            s.setAccessible(true);
            Object[] newValueArray = ((Object[]) s.get(value)).clone();
            List<Object> newValueList = new ArrayList<>(Arrays.asList(newValueArray));
            newValueList.add(newValue);
            s.set(value, newValueList.toArray());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
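    To wire the custom transform into the sink, it would be registered in the connector configuration roughly as below. The class name com.example.FlattenDebeziumEnvelope and the connector name are illustrative placeholders; the JDBC sink properties (insert.mode, pk.mode, etc.) must match your table and key setup.

        name=sqlserver-sink
        connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
        topics=my-topic
        connection.url=jdbc:sqlserver://...
        insert.mode=upsert
        pk.mode=record_key
        transforms=flattenDebezium
        transforms.flattenDebezium.type=com.example.FlattenDebeziumEnvelope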