WSO2 Enterprise Integrator 6.3.0: how to merge multiple files into one?


I'm trying to build the following integration scenario with WSO2 EI 6.3.0 and cannot get it working:

  • I have a set of CSV files holding related entities (just two to start with, customers and orders) in folder "IN", written by system "A"
  • In the end I want to create one XML file with the CSV contents nested (e.g. orders as children of their customers) in folder "OUT", to be read by system "B"

I created a proxy that reads the CSV files and passes them to the inSequence. What I tried:

  • using a clone mediator to send the file to transformation sequences (one per CSV file type), each of which uses a filter and then smooks to create an XML payload
  • collecting the messages in an aggregate mediator and finally writing them to a file

The flow reaches the aggregate step but never gets past it.

What am I doing wrong? Or is this use case simply not feasible with WSO2 EI?

Find below my code. Thank you in advance for your support guys.

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="FileReaderProxy" startOnLoad="true" transports="vfs" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="AggregatedResponses" scope="default">
                <metasyst/>
            </property>
            <property description="Get File Name" expression="$trp:FILE_NAME" name="fileName" scope="default" type="STRING"/>
            <log description="Enter In Sequence" level="full">
                <property name="Step" value="Enter In Sequence"/>
                <property expression="get-property('fileName')" name="Filename : "/>
            </log>
            <clone continueParent="true" id="csvFileSet">
                <target>
                    <sequence>
                        <property description="Get File Name" expression="$trp:FILE_NAME" name="fileName" scope="default" type="STRING"/>
                        <log description="Enter Order Clone Branch" level="full">
                            <property name="Step" value="Enter Order Clone Branch"/>
                            <property expression="get-property('fileName')" name="Reading File : "/>
                        </log>
                        <filter xpath="get-property('fileName')='auftraege.csv'">
                            <then>
                                <smooks config-key="conf:myresources/order-smooks-config.xml">
                                    <input type="text"/>
                                    <output type="xml"/>
                                </smooks>
                                <log description="Exit Order Clone Branch" level="full">
                                    <property name="Step" value="Exit Order Clone Branch"/>
                                </log>
                            </then>
                            <else>
                                <log description="Drop Order Clone Branch" level="full">
                                    <property name="Step" value="Drop Order Clone Branch"/>
                                </log>
                                <drop/>
                            </else>
                        </filter>
                    </sequence>
                </target>
                <target>
                    <sequence>
                        <property description="Get File Name" expression="$trp:FILE_NAME" name="fileName" scope="default" type="STRING"/>
                        <log description="Enter Customer Clone Branch" level="full">
                            <property name="Step" value="Enter Customer Clone Branch"/>
                            <property expression="get-property('fileName')" name="Reading File : "/>
                        </log>
                        <filter xpath="get-property('fileName')='kunden.csv'">
                            <then>
                                <smooks config-key="conf:myresources/customer-smooks-config.xml">
                                    <input type="text"/>
                                    <output type="xml"/>
                                </smooks>
                                <log description="Exit Customer Clone Branch" level="full">
                                    <property name="Step" value="Exit Customer Clone Branch"/>
                                </log>
                            </then>
                            <else>
                                <log description="Drop Customer Clone Branch" level="full">
                                    <property name="Step" value="Drop Customer Clone Branch"/>
                                </log>
                                <drop/>
                            </else>
                        </filter>
                    </sequence>
                </target>
            </clone>
            <log description="Enter aggregation" level="full">
                <property name="Step" value="Enter aggregation"/>
            </log>
            <property name="AggregatedResponses" scope="default">
                <metasyst/>
            </property>
            <aggregate id="csvFileSet">
                <completeCondition>
                    <messageCount max="-1" min="2"/>
                </completeCondition>
                <onComplete enclosingElementProperty="AggregatedResponses" expression="$body/*[1]" xmlns:ns="http://org.apache.synapse/xsd">
                    <log description="Start Output File Writing" level="full">
                        <property name="Step" value="Start Output File Writing"/>
                    </log>
                    <property description="OUT_ONLY" name="OUT_ONLY" scope="default" type="STRING" value="true"/>
                    <send>
                        <endpoint>
                            <address uri="vfs:file:///D:/Programme/WSO2-EI/Samples/files/4-out"/>
                        </endpoint>
                    </send>
                </onComplete>
            </aggregate>
        </inSequence>
        <outSequence/>
        <faultSequence>
            <drop/>
        </faultSequence>
    </target>
    <parameter name="transport.PollInterval">1</parameter>
    <parameter name="transport.vfs.FileURI">file:///D:/Programme/WSO2-EI/Samples/files/1-in</parameter>
    <parameter name="transport.vfs.ContentType">text/plain</parameter>
    <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
    <parameter name="transport.vfs.MoveAfterFailure">file:///D:/Programme/WSO2-EI/Samples/files/3-error</parameter>
    <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
    <parameter name="transport.vfs.FileNamePattern">.*\.csv</parameter>
    <parameter name="transport.vfs.MoveAfterProcess">file:///D:/Programme/WSO2-EI/Samples/files/2-success</parameter>
</proxy>

Solution

  • With VFS you can only process one file at a time: every file picked up from the location the proxy is listening on starts a new thread, so each file arrives as a separate message to your proxy. An aggregate mediator in that proxy therefore never sees both files in the same message flow.

    Instead, you could use the file connector. Depending on your requirements and on how you identify a set of files, you could still use VFS to trigger the process and then use the file connector to collect the other files. Alternatively, have a scheduled task trigger your service at a given interval and let the service use the file connector to look for a set of files.

    For a more detailed answer I would need to know more about the set of files you are trying to process: the file name patterns, whether it is a fixed number of files every time, and so on.

    Hope this helps
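
To make the first option more concrete, here is a rough, untested sketch of a proxy that is triggered only by kunden.csv and then fetches auftraege.csv with the file connector. Several things here are assumptions rather than anything confirmed in the question: the proxy name `CustomerOrderMergeProxy`, the property name `customers`, the output file name `merged.xml`, and the `fileconnector.read` operation with its `source` and `contentType` parameters, which is how I remember file connector version 2 and should be checked against the connector version you install. The connector also has to be added to the server and enabled first.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Sketch only: VFS triggers on kunden.csv, the file connector fetches auftraege.csv. -->
<proxy name="CustomerOrderMergeProxy" startOnLoad="true" transports="vfs" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- 1. kunden.csv is the message body; convert it to XML. -->
            <smooks config-key="conf:myresources/customer-smooks-config.xml">
                <input type="text"/>
                <output type="xml"/>
            </smooks>
            <!-- Keep the customer XML, because step 2 overwrites the body. -->
            <property name="customers" expression="$body/*[1]" scope="default" type="OM"/>

            <!-- 2. Read the companion file into the body.
                 Operation and parameter names are assumed (file connector v2). -->
            <fileconnector.read>
                <source>file:///D:/Programme/WSO2-EI/Samples/files/1-in/auftraege.csv</source>
                <contentType>text/plain</contentType>
            </fileconnector.read>
            <smooks config-key="conf:myresources/order-smooks-config.xml">
                <input type="text"/>
                <output type="xml"/>
            </smooks>

            <!-- 3. Wrap both results in a single root element. -->
            <payloadFactory media-type="xml">
                <format>
                    <metasyst xmlns="">$1$2</metasyst>
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:customers"/>
                    <arg evaluator="xml" expression="$body/*[1]"/>
                </args>
            </payloadFactory>

            <!-- 4. Write the merged document to the output folder. -->
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property name="transport.vfs.ReplyFileName" scope="transport" value="merged.xml"/>
            <send>
                <endpoint>
                    <address uri="vfs:file:///D:/Programme/WSO2-EI/Samples/files/4-out"/>
                </endpoint>
            </send>
        </inSequence>
        <outSequence/>
        <faultSequence>
            <drop/>
        </faultSequence>
    </target>
    <parameter name="transport.PollInterval">1</parameter>
    <parameter name="transport.vfs.FileURI">file:///D:/Programme/WSO2-EI/Samples/files/1-in</parameter>
    <parameter name="transport.vfs.ContentType">text/plain</parameter>
    <!-- Only the customer file triggers the flow. -->
    <parameter name="transport.vfs.FileNamePattern">kunden\.csv</parameter>
    <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
    <parameter name="transport.vfs.MoveAfterProcess">file:///D:/Programme/WSO2-EI/Samples/files/2-success</parameter>
    <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
    <parameter name="transport.vfs.MoveAfterFailure">file:///D:/Programme/WSO2-EI/Samples/files/3-error</parameter>
</proxy>
```

This only places the customer and order XML side by side under one root; nesting orders under their customers would still need an extra transformation step (an XSLT mediator, for example). The sketch also assumes auftraege.csv is already complete when kunden.csv is picked up, and it leaves auftraege.csv in the input folder, so you would have to move or delete it yourself afterwards. With the scheduled-task variant, the same mediators could live in a sequence that the task invokes, with a second read replacing the VFS trigger.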