首页 > 解决方案 > Mule 4:流:如何检查负载是否是 Mule 4 中的流?

问题描述

我是 Mule 4 的新手,并且对流有以下理解/怀疑。我将非常感谢您的帮助。

  1. mule 4 组件何时会生成流?例如,数据库选择可以返回对象数组和单个值。那么在这两种情况下,返回的有效负载都会是流式的吗?
  2. 如何检查 mule 4 组件返回的有效负载是否是流,如果是,如何分析流意味着在可重复文件存储流的情况下创建的文件在哪里以及已作为流消耗了多少有效负载?

例如,我创建了下面的 Mule 4 应用程序,它读取具有 100 万条记录的 CSV 文件并执行以下操作:

p('destination.dir')  ++ "Output_" ++ vars.counter ++ ".txt"

生成 100 个文件,每个文件包含 10 k 条记录。选中后,每个文件的大小为 913 KB。

预期:对于每个将处理 n 条记录,其中 n 条记录的大小为 512 KB,并在下一次迭代中处理 (batchsize-n) 实际:对于每个批次中处理的 10000 条记录。这是怎么发生的?

骡流代码:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
    xmlns:file="http://www.mulesoft.org/schema/mule/file"
    xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd 
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <file:config name="File_Config_1" doc:name="File Config" doc:id="565a1655-beba-4048-942f-ae68887e9b96" />
    <file:config name="File_Config" doc:name="File Config" doc:id="5182b61a-dddf-41c1-8d3c-6dbd895b5db7" />
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="f5bdb2ca-dc6f-4b6b-8407-8543bf7522e2" >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>
    <flow name="dw-streamingFlow">
        <http:listener doc:name="Listener" doc:id="78635a2a-959d-41e9-a294-3cc7bb22d36f" config-ref="HTTP_Listener_config" path="/app/fileStream"/>
        <file:read path="C:\Users\bbazazx\Documents\TestFolder\InputDirectory\input.csv"
            config-ref="File_Config"
            outputMimeType="application/csv; streaming=true; header=true" />
        <foreach doc:name="For Each" doc:id="eacd49dc-c49f-437a-927e-976add7e57fc" batchSize="500" collection="payload">
            <ee:transform doc:name="Transform Message" doc:id="bec02dd1-4c15-47c0-81cf-a7d0bfd64b39">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload map(item,index) -> {
    "Country" : item."Country",
    "FoodItems" : item."Item Type"
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
            <file:write doc:name="Write" config-ref="File_Config_1" path='#["C:\\Users\\bbazazx\\Documents\\TestFolder\\OutputDirectory\\output" ++ vars.counter ++ ".json"]' />
            <logger level="INFO" doc:name="Logger" message="#[payload]" />
        </foreach>

</flow>

</mule>

标签: mule4

解决方案


这取决于每个连接器。Mule 4 中的大多数连接器都允许流式传输,并且许多连接器允许配置流式传输策略。有关详细信息,请参阅https://docs.mulesoft.com/mule-runtime/4.3/streaming-about

数据库连接器可能会返回一个包含在流中的数组。Mule 会理解它并将其作为数组透明地处理。

您可以在调试器中看到有效负载的类是一种流、可迭代或托管游标。这似乎是流媒体的迹象。

提到的 512 KB 是缓冲区的大小。对于文件存储的可重复流策略,上面的文档链接解释了它:

此策略最初使用 512 KB 的内存缓冲区大小。对于较大的流,该策略会在磁盘上创建一个临时文件来存储内容,而不会溢出内存。


推荐阅读