mule4 - Mule 4:流:如何检查负载是否是 Mule 4 中的流?
问题描述
我是 Mule 4 的新手,并且对流有以下理解/怀疑。我将非常感谢您的帮助。
- mule 4 组件何时会生成流?例如,数据库选择可以返回对象数组和单个值。那么在这两种情况下,返回的有效负载都会是流式的吗?
- 如何检查 mule 4 组件返回的有效负载是否是流,如果是,如何分析流意味着在可重复文件存储流的情况下创建的文件在哪里以及已作为流消耗了多少有效负载?
例如,我创建了下面的 Mule 4 应用程序,它读取具有 100 万条记录的 CSV 文件并执行以下操作:
- 读取 CSV 文件 [ Streaming Strategy : Repeatable file store stream, in memory size: 512 KB ]
- 使用批量大小为 10k 的每个循环
- 在每个内部,一个将 csv 行转换为 Json 的转换消息和一个文件写入操作,该操作将根据 dw 代码创建具有名称的文件:
p('destination.dir') ++ "Output_" ++ vars.counter ++ ".txt"
生成 100 个文件,每个文件包含 10 k 条记录。选中后,每个文件的大小为 913 KB。
预期:对于每个将处理 n 条记录,其中 n 条记录的大小为 512 KB,并在下一次迭代中处理 (batchsize-n) 实际:对于每个批次中处理的 10000 条记录。这是怎么发生的?
骡流代码:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<file:config name="File_Config_1" doc:name="File Config" doc:id="565a1655-beba-4048-942f-ae68887e9b96" />
<file:config name="File_Config" doc:name="File Config" doc:id="5182b61a-dddf-41c1-8d3c-6dbd895b5db7" />
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="f5bdb2ca-dc6f-4b6b-8407-8543bf7522e2" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<flow name="dw-streamingFlow">
<http:listener doc:name="Listener" doc:id="78635a2a-959d-41e9-a294-3cc7bb22d36f" config-ref="HTTP_Listener_config" path="/app/fileStream"/>
<file:read path="C:\Users\bbazazx\Documents\TestFolder\InputDirectory\input.csv"
config-ref="File_Config"
outputMimeType="application/csv; streaming=true; header=true" />
<foreach doc:name="For Each" doc:id="eacd49dc-c49f-437a-927e-976add7e57fc" batchSize="500" collection="payload">
<ee:transform doc:name="Transform Message" doc:id="bec02dd1-4c15-47c0-81cf-a7d0bfd64b39">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload map(item,index) -> {
"Country" : item."Country",
"FoodItems" : item."Item Type"
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<file:write doc:name="Write" config-ref="File_Config_1" path='#["C:\\Users\\bbazazx\\Documents\\TestFolder\\OutputDirectory\\output" ++ vars.counter ++ ".json"]' />
<logger level="INFO" doc:name="Logger" message="#[payload]" />
</foreach>
</flow>
</mule>
解决方案
这取决于每个连接器。Mule 4 中的大多数连接器都允许流式传输,并且许多连接器允许配置流式传输策略。有关详细信息,请参阅https://docs.mulesoft.com/mule-runtime/4.3/streaming-about。
数据库连接器可能会返回一个包含在流中的数组。Mule 会理解它并将其作为数组透明地处理。
您可以在调试器中看到有效负载的类是一种流、可迭代或托管游标。这似乎是流媒体的迹象。
提到的 512 KB 是缓冲区的大小。对于文件存储的可重复流策略,上面的文档链接解释了它:
此策略最初使用 512 KB 的内存缓冲区大小。对于较大的流,该策略会在磁盘上创建一个临时文件来存储内容,而不会溢出内存。
推荐阅读
- modbus - 使用 pymodbus 从 Epever uPower 充电器/逆变器读取 modbus 寄存器
- azure-devops - 如何删除 Azure Devops 中损坏的 Wiki 条目?
- r - 如何为我的整个表集向前滚动最大值?
- selenium - Selenium Driver 是大型 json 数据下载后 ajax 调用后面的一页
- python - 在 2D 中绘制对象的 3D 边界框(处理中)
- cluster-analysis - RNAseq 生物复制在 PCA 图中不聚集
- powershell - PowerShell 单线查找 AD 中的非活动计算机
- php - 如何在公共磁盘上存储\Illuminate\Http\UploadedFile?
- javascript - 具有不可编辑子项的 contentEditable div
- c# - c# 套接字服务器在 Ubuntu(linux) 中无法正常工作