java spring spring-cloud-stream spring-cloud-dataflow

Spring Dataflow Stream File Sink - Need to write to separate files


I have been working with Spring Cloud Dataflow. I have written a custom Cloud Stream Processor application that takes in a specific type of XML document and splits it into smaller XML documents.

I expected the stream definition below to write out multiple files. Instead, when I test with the same input file, it sporadically writes several of my smaller XML documents to one file, and sometimes it spreads them across two (I suspect this is due to the fixed-delay value in the definition below).

I was wondering how I can get my stream to write each XML document to its own file. When I wrote the processor I specifically used this.processor.output().send(message); instead of @SendTo(Processor.OUTPUT), thinking that would avoid this exact problem.

As always, any help is greatly appreciated. Thank you.

Dataflow Stream Definition:

    xmlSplit=fileIn: file --directory=/root/file_in --filename-regex=redactApplication.xml --fixed-delay=30 --markers-json=false --mode=contents | custom-xml-splitter | fileOut: file --directory=/root/file_out --name-expression="'test' + new java.text.SimpleDateFormat('yyyyMMddHHmmss').format(new java.util.Date()) + '.txt'"

Code for my custom processor:

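    import java.io.StringReader;
    import java.io.StringWriter;

    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import javax.xml.parsers.ParserConfigurationException;
    import javax.xml.transform.OutputKeys;
    import javax.xml.transform.Transformer;
    import javax.xml.transform.TransformerException;
    import javax.xml.transform.TransformerFactory;
    import javax.xml.transform.dom.DOMSource;
    import javax.xml.transform.stream.StreamResult;
    import javax.xml.xpath.XPath;
    import javax.xml.xpath.XPathConstants;
    import javax.xml.xpath.XPathExpression;
    import javax.xml.xpath.XPathExpressionException;
    import javax.xml.xpath.XPathFactory;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.GenericMessage;
    import org.springframework.stereotype.Component;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.Node;
    import org.w3c.dom.NodeList;
    import org.xml.sax.InputSource;

    @Component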
    @EnableBinding(Processor.class)
    public class REDACTXMLSplitter {

        @Autowired
        private Processor processor;

        @SuppressWarnings("unchecked")
        @StreamListener(Processor.INPUT)
        public void parseForREDACTApplications(byte[] redactXMLByteArray) {
            String redactXML = new String(redactXMLByteArray);
            InputSource doc = new InputSource(new StringReader(redactXML));
            try
             {

                    // Builder used to create one new document per split-out element
                    DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
                    DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();

                    // Select every <Application> element directly under a <REDACT> element
                    XPathFactory xfactory = XPathFactory.newInstance();
                    XPath xpath = xfactory.newXPath();
                    XPathExpression query = xpath.compile("//REDACT/Application");
                    NodeList productNodesFiltered = (NodeList) query.evaluate(doc, XPathConstants.NODESET);

                    for (int i=0; i<productNodesFiltered.getLength(); ++i)
                    {

                        Document suppXml = dBuilder.newDocument();

                        // Recreate the <REDACT> root element for the new document
                        Element root = suppXml.createElement("REDACT");

                        Node productNode = productNodesFiltered.item(i);

                        // Clone the <Application> node and move the clone into the new document
                        Node clonedNode = productNode.cloneNode(true);
                        suppXml.adoptNode(clonedNode);
                        root.appendChild(clonedNode);

                        suppXml.appendChild(root);


                        // Serialize the new document to a single-line string without the XML declaration

                        TransformerFactory tf = TransformerFactory.newInstance();
                        Transformer transformer = tf.newTransformer();
                        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
                        StringWriter writer = new StringWriter();
                        transformer.transform(new DOMSource(suppXml), new StreamResult(writer));
                        String output = writer.getBuffer().toString().replaceAll("\n|\r", "");

                        System.out.println(output);

                        Message<String> message = new GenericMessage<>(output);
                        this.processor.output().send(message);
                    }

                }
             catch (XPathExpressionException | ParserConfigurationException | TransformerException e) {
                e.printStackTrace();
            }
        }
    }

Solution

  • The split documents are probably arriving in bursts. The sink names each file from the current date and time with one-second resolution (yyyyMMddHHmmss), so any XML documents that arrive during the same second get the same filename and end up in the same file.
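
The effect described above can be reproduced outside the stream with a small sketch. It assumes nothing about the sink beyond the name-expression in the question; the class FileNameCollisionDemo and the helpers timestampName and uniqueName are hypothetical names introduced only for illustration, and the UUID-based name is one possible way to make names unique, not something the question or the sink prescribes.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

public class FileNameCollisionDemo {

    // Mirrors the name-expression from the stream definition:
    // the timestamp pattern has one-second resolution.
    static String timestampName(Date now) {
        return "test" + new SimpleDateFormat("yyyyMMddHHmmss").format(now) + ".txt";
    }

    // Hypothetical alternative: a random UUID per message instead of a timestamp.
    static String uniqueName() {
        return "test" + UUID.randomUUID() + ".xml";
    }

    public static void main(String[] args) {
        // Two messages that arrive 200 ms apart, within the same second.
        Date first = new Date(1_000_000_000_000L);
        Date second = new Date(first.getTime() + 200);

        // Both messages resolve to the same file name.
        System.out.println(timestampName(first));
        System.out.println(timestampName(second));

        // Each call yields a different file name.
        System.out.println(uniqueName());
        System.out.println(uniqueName());
    }
}
```

If this is indeed the cause, the same idea could be tried directly in the stream definition with a SpEL expression such as --name-expression="'test' + T(java.util.UUID).randomUUID() + '.xml'". That expression is an untested assumption here, so check it against the file sink documentation for your version.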