Spring Dataflow Stream File Sink - Need to Write to Separate Files

Question

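I have been working with Spring Cloud Dataflow. I wrote a custom Cloud Stream processor application that takes in an XML document of a particular type and splits it into smaller XML documents.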

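I expected the stream definition below to write out multiple files. Instead, when testing with the same input file, it sometimes writes several of the smaller XML documents into one file and sometimes spreads them across two files (I think this is caused by the fixed-delay value in the definition below).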

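How can I make my stream write each XML document to its own file? When I wrote the processor I deliberately used this.processor.output().send(message); instead of @SendTo(Processor.OUTPUT), because I thought that would avoid exactly this problem.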

As always, any help is greatly appreciated. Thank you.

Dataflow stream definition:

    xmlSplit=fileIn: file --directory=/root/file_in --filename-regex=redactApplication.xml --fixed-delay=30 --markers-json=false --mode=contents | custom-xml-splitter | fileOut: file --directory=/root/file_out --name-expression="'test' + new java.text.SimpleDateFormat('yyyyMMddHHmmss').format(new java.util.Date()) + '.txt'"

Code for my custom processor:

    import java.io.StringReader;
    import java.io.StringWriter;
    import java.nio.charset.StandardCharsets;

    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import javax.xml.parsers.ParserConfigurationException;
    import javax.xml.transform.OutputKeys;
    import javax.xml.transform.Transformer;
    import javax.xml.transform.TransformerException;
    import javax.xml.transform.TransformerFactory;
    import javax.xml.transform.dom.DOMSource;
    import javax.xml.transform.stream.StreamResult;
    import javax.xml.xpath.XPath;
    import javax.xml.xpath.XPathConstants;
    import javax.xml.xpath.XPathExpression;
    import javax.xml.xpath.XPathExpressionException;
    import javax.xml.xpath.XPathFactory;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.GenericMessage;
    import org.springframework.stereotype.Component;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.NodeList;
    import org.xml.sax.InputSource;

    @Component
    @EnableBinding(Processor.class)
    public class REDACTXMLSplitter {

        @Autowired
        private Processor processor;

        @StreamListener(Processor.INPUT)
        public void parseForREDACTApplications(byte[] redactXmlBytes) {
            // Decode explicitly instead of relying on the platform default charset
            String redactXml = new String(redactXmlBytes, StandardCharsets.UTF_8);
            InputSource doc = new InputSource(new StringReader(redactXml));
            try {
                DocumentBuilder dBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();

                // Select every <Application> element under <REDACT>
                XPath xpath = XPathFactory.newInstance().newXPath();
                XPathExpression query = xpath.compile("//REDACT/Application");
                NodeList productNodesFiltered = (NodeList) query.evaluate(doc, XPathConstants.NODESET);

                // One Transformer is reused for every part; omit the <?xml ...?> declaration
                Transformer transformer = TransformerFactory.newInstance().newTransformer();
                transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");

                for (int i = 0; i < productNodesFiltered.getLength(); ++i) {
                    Document suppXml = dBuilder.newDocument();

                    // Recreate the <REDACT> root node and deep-copy one <Application> into it
                    Element root = suppXml.createElement("REDACT");
                    root.appendChild(suppXml.importNode(productNodesFiltered.item(i), true));
                    suppXml.appendChild(root);

                    // Serialize the new document to a single-line string
                    StringWriter writer = new StringWriter();
                    transformer.transform(new DOMSource(suppXml), new StreamResult(writer));
                    String output = writer.toString().replaceAll("\n|\r", "");

                    System.out.println(output);

                    // Each split document is sent as its own message on the output channel
                    Message<String> message = new GenericMessage<>(output);
                    this.processor.output().send(message);
                }
            } catch (XPathExpressionException | ParserConfigurationException | TransformerException e) {
                // TODO: proper error handling
                e.printStackTrace();
            }
        }
    }
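To check the splitting step in isolation, here is a minimal plain-JDK sketch of the same loop without Spring. The class name XmlSplitDemo and the sample input are made up for illustration. It returns the split documents as strings instead of sending them, which makes it easy to confirm that the processor produces one message per <Application> element; if that holds, the merging into fewer files is more likely happening downstream in the sink.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper: the processor's split loop, minus Spring, returning strings
public class XmlSplitDemo {

    // Wraps each //REDACT/Application element in its own <REDACT> document
    public static List<String> split(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            NodeList apps = (NodeList) XPathFactory.newInstance().newXPath()
                    .evaluate("//REDACT/Application", new InputSource(new StringReader(xml)),
                              XPathConstants.NODESET);

            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");

            List<String> parts = new ArrayList<>();
            for (int i = 0; i < apps.getLength(); i++) {
                Document part = builder.newDocument();
                Element root = part.createElement("REDACT");
                root.appendChild(part.importNode(apps.item(i), true)); // deep copy into the new document
                part.appendChild(root);

                StringWriter out = new StringWriter();
                transformer.transform(new DOMSource(part), new StreamResult(out));
                parts.add(out.toString().replaceAll("\n|\r", ""));
            }
            return parts;
        } catch (ParserConfigurationException | XPathExpressionException | TransformerException e) {
            throw new IllegalArgumentException("Could not split XML", e);
        }
    }

    public static void main(String[] args) {
        String input = "<REDACT><Application>a</Application><Application>b</Application></REDACT>";
        // Each printed line corresponds to one message the processor would send
        split(input).forEach(System.out::println);
    }
}
```

If this prints two separate <REDACT> documents for a two-application input, the processor side behaves as intended.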

Tags: java, spring, spring-cloud-stream, spring-cloud-dataflow

Answer


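Could the messages be arriving in a burst? The sink's name expression builds the file name from the current date and time with only one-second resolution (yyyyMMddHHmmss), so if several split XML documents reach the sink within the same second, they all end up in the same file. A burst that happens to straddle a second boundary would produce two files, which would explain why you sometimes see one file and sometimes two.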
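A quick way to test this theory outside the stream is the plain-Java sketch below. FileNameCollisionDemo and its methods are illustrative names, not part of the file sink. It applies the same yyyyMMddHHmmss pattern as the sink's name expression to simulated arrival times and counts how many distinct file names result, then compares that with a hypothetical name that adds a random UUID per document.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;

public class FileNameCollisionDemo {

    // Mirrors the sink's --name-expression: 'test' + yyyyMMddHHmmss + '.txt' (one-second resolution)
    public static String timestampName(Date arrival) {
        return "test" + new SimpleDateFormat("yyyyMMddHHmmss").format(arrival) + ".txt";
    }

    // Hypothetical fix: add a per-document unique suffix so no two documents share a name
    public static String uniqueName(Date arrival) {
        return "test" + new SimpleDateFormat("yyyyMMddHHmmss").format(arrival)
                + "-" + UUID.randomUUID() + ".txt";
    }

    // Counts how many distinct file names a burst of `count` documents, `gapMillis` apart, maps to
    public static int countDistinct(long firstArrivalMillis, long gapMillis, int count,
                                    Function<Date, String> namer) {
        Set<String> names = new HashSet<>();
        for (int i = 0; i < count; i++) {
            names.add(namer.apply(new Date(firstArrivalMillis + i * gapMillis)));
        }
        return names.size();
    }

    public static void main(String[] args) {
        long second = 1_600_000_000_000L; // an instant exactly on a second boundary
        // Three documents 100 ms apart inside one second: one shared file name
        System.out.println(countDistinct(second, 100, 3, FileNameCollisionDemo::timestampName)); // 1
        // The same burst starting 900 ms later straddles a boundary: two file names
        System.out.println(countDistinct(second + 900, 100, 3, FileNameCollisionDemo::timestampName)); // 2
        // With a unique suffix every document gets its own file name
        System.out.println(countDistinct(second, 100, 3, FileNameCollisionDemo::uniqueName)); // 3
    }
}
```

If this is the cause, the same idea could be applied in the sink's SpEL expression, for example --name-expression="'test-' + headers['id'] + '.txt'", which relies on the id header that Spring messages normally carry. Treat that expression as an untested suggestion rather than a confirmed fix.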

