
How to handle file inputs with changing schemas in Talend


Question: How do I continue processing files that differ substantially from a base schema and therefore trigger tSchemaComplianceCheck errors?

Background

Suppose I have a folder of customer xls files named file1, file2, ..., file1000. I have imported the file schema into the Talend repository as 6Columns, and the Talend job is configured to iterate through the files and process each one:

1-tFileInput ->2-tSchemaCompliance-6Columns -> 3-tMap ->4-FurtherProcessing
  1. Read each excel file
  2. Compare it to the schema 6Columns
  3. Format the output (rename columns)
  4. Take the collection of Customer data and process it more

While processing, I noticed that the schema compliance check generates errors (errorCode 16) pointing to a number of files (200) with a different schema, 13Columns, but there is no way to identify these files in advance to filter them into a subjob.

How do I amend my processing to correctly integrate the files with the 13Columns schema into the process (what is the recommended way of handling this), and how should I design the job in case other schema changes occur?

1-tFileInput ->2-tSchemaCompliance-6Columns -> 3-tMap ->4-FurtherProcessing
                   |
                   |Reject Flow (ErrorCode 16)
                   |Schema-13Columns
                   |
                   |-> ??

Current thinking, when ErrorCode 16 is detected:

Option 1: Parallel. Take the file path of the current file and process it against 13Columns using a new FileInput, before merging the two flows back into one.

Option 2: Serial. Collect the list of files that triggered the error and process them after I've finished with the compliant files.
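To make Option 2 concrete, here is a minimal plain-Java sketch of the control flow only, not a Talend job. The class name `SerialRetrySketch` and the `fitsBaseSchema` predicate are hypothetical stand-ins for the job and the compliance check; the real check would be done by tSchemaComplianceCheck.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Sketch of the serial option: compliant files first, rejected files afterwards. */
final class SerialRetrySketch {

    /** Which files went through which pass. */
    static final class Result {
        final List<String> firstPass = new ArrayList<>();
        final List<String> secondPass = new ArrayList<>();
    }

    /**
     * @param files          paths to process
     * @param fitsBaseSchema hypothetical stand-in for the 6Columns compliance check
     */
    static Result run(List<String> files, Predicate<String> fitsBaseSchema) {
        Result result = new Result();
        List<String> rejected = new ArrayList<>();
        for (String file : files) {
            if (fitsBaseSchema.test(file)) {
                result.firstPass.add(file);  // would be read with 6Columns
            } else {
                rejected.add(file);          // remember the path for the second pass
            }
        }
        // The second pass starts only after every compliant file has been handled.
        for (String file : rejected) {
            result.secondPass.add(file);     // would be read with 13Columns
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = run(Arrays.asList("file1.xls", "file2.xls", "file3.xls"),
                       f -> !f.equals("file2.xls"));
        System.out.println("6Columns pass:  " + r.firstPass);
        System.out.println("13Columns pass: " + r.secondPass);
    }
}
```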


Solution

  • You could try something like the following:


    tFileList: reads your input folder.
    tFileInput "schema6" -> tSchemaComplianceCheck: reads each file with the 6-column schema.
    tMap_1: further processing.

    In the reject part :

    tMap after the reject link: add a new column containing the path of the file that was rejected.
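As a rough illustration of how that column could be filled: tFileList publishes the path of the file it is currently iterating over in `globalMap` under `<componentName>_CURRENT_FILEPATH`. The sketch below assumes the component is named `tFileList_1` (an assumption; use the name shown in your job) and simulates `globalMap` with a plain `HashMap` so it can run outside Talend.

```java
import java.util.HashMap;
import java.util.Map;

/** Simulates the lookup a tMap expression would perform for the rejected file's path. */
final class RejectPathSketch {

    /**
     * Mirrors the expression you would type into the new tMap output column:
     * ((String) globalMap.get("tFileList_1_CURRENT_FILEPATH"))
     * The component name tFileList_1 is assumed.
     */
    static String currentFilePath(Map<String, Object> globalMap) {
        return (String) globalMap.get("tFileList_1_CURRENT_FILEPATH");
    }

    public static void main(String[] args) {
        // Stand-in for the map Talend maintains at runtime.
        Map<String, Object> globalMap = new HashMap<>();
        globalMap.put("tFileList_1_CURRENT_FILEPATH", "/input/file7.xls");
        System.out.println(currentFilePath(globalMap));
    }
}
```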

    tFlowToIterate: used to get an iterate link, which is an acceptable input for the tFileInputDelimited that follows.
    tFileInput: reads the data with the 13-column schema. The components that follow are the same as in the first part.
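The hand-off from tFlowToIterate to the second file input can be sketched as follows. With its default key/value setting, tFlowToIterate stores each incoming column in `globalMap` under `<rowName>.<columnName>`, and the file-name field of the next input component can read it back. The row name `rejects` and column name `filepath` are assumptions for illustration, and `globalMap` is again simulated with a `HashMap`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simulates tFlowToIterate feeding a file path to the next file input, one row at a time. */
final class FlowToIterateSketch {

    // Assumed names: a reject row called "rejects" with a column called "filepath".
    static final String KEY = "rejects.filepath";

    /** Returns the paths the second file input would open, in iteration order. */
    static List<String> pathsOpened(List<String> rejectedPaths) {
        Map<String, Object> globalMap = new HashMap<>();
        List<String> opened = new ArrayList<>();
        for (String path : rejectedPaths) {
            // What tFlowToIterate does for each incoming row (default key naming).
            globalMap.put(KEY, path);
            // What the file-name field would evaluate:
            // ((String) globalMap.get("rejects.filepath"))
            String fileName = (String) globalMap.get(KEY);
            opened.add(fileName);  // this file would be read with the 13-column schema
        }
        return opened;
    }

    public static void main(String[] args) {
        System.out.println(pathsOpened(Arrays.asList("/input/file7.xls", "/input/file42.xls")));
    }
}
```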

    After that, you can push the data to tHashOutput so that it can be read later in another subjob.