node-red - 如何在 Node-RED 中合并来自多个有效负载的数据?
问题描述
我想合并来自 3 个不同来源的数据(来自 HTTP msg.payload)。
但是,这些 HTTP 请求可能会被多次调用,因此可以多次接收来自同一源的数据。
[{"id":"7ed13b41.131b14","type":"join","z":"246eac57.42ec74","name":"","mode":"auto","build":"string","property":"payload","propertyType":"msg","key":"index","joiner":"","joinerType":"str","accumulate":false,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1370,"y":1160,"wires":[["d941ca6e.0e1aa8"]]},{"id":"d941ca6e.0e1aa8","type":"debug","z":"246eac57.42ec74","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":1490,"y":1120,"wires":[]},{"id":"b04a312.d6c40d","type":"function","z":"246eac57.42ec74","name":"part 1","func":"msg.parts = {};\nmsg.parts.id = 12345;\nmsg.parts.index = 0;\nmsg.parts.count = 3;\nreturn msg;","outputs":1,"noerr":0,"x":1210,"y":1120,"wires":[["7ed13b41.131b14"]]},{"id":"30cec12.e2fc13e","type":"function","z":"246eac57.42ec74","name":"part 2","func":"msg.parts = {};\nmsg.parts.id = 12345;\nmsg.parts.index = 1;\nmsg.parts.count = 3;\nreturn msg;","outputs":1,"noerr":0,"x":1210,"y":1160,"wires":[["7ed13b41.131b14"]]},{"id":"8902f2d5.ea688","type":"function","z":"246eac57.42ec74","name":"part 3","func":"msg.parts = {};\nmsg.parts.id = 12345;\nmsg.parts.index = 2;\nmsg.parts.count = 3;\nreturn msg;","outputs":1,"noerr":0,"x":1210,"y":1200,"wires":[["7ed13b41.131b14"]]},{"id":"814f25b6.dd3958","type":"http in","z":"246eac57.42ec74","name":"source 1","url":"/source1","method":"get","upload":false,"swaggerDoc":"","x":1060,"y":1120,"wires":[["b04a312.d6c40d"]]},{"id":"cab634ac.5d9df8","type":"http in","z":"246eac57.42ec74","name":"source 2","url":"/source 2","method":"get","upload":false,"swaggerDoc":"","x":1060,"y":1160,"wires":[["30cec12.e2fc13e"]]},{"id":"98f89b04.9b5bb8","type":"http in","z":"246eac57.42ec74","name":"source 3","url":"/source3","method":"get","upload":false,"swaggerDoc":"","x":1060,"y":1200,"wires":[["8902f2d5.ea688"]]}]
此流程中发生的情况是,当 Join Node 从源 1 接收到 3 条消息时,它认为msg.parts
完整。我想要实现的行为是,只有收到来自 3 个来源的数据时,流程才会继续。如果从同一个源接收数据,它只会覆盖以前的数据。
有没有办法在 Node-RED 中实现这一点?
解决方案
一种方法是为每个源使用不同msg.topic
的源,并设置您的join
节点以手动将所有输入组合到一个以主题为键的对象中:
[{"id":"7f9b1fbb.58ff","type":"join","z":"f9a2eec9.c2e26","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"","joinerType":"str","accumulate":true,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":770,"y":2040,"wires":[["42ac1f9e.eac62"]]},{"id":"42ac1f9e.eac62","type":"debug","z":"f9a2eec9.c2e26","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":930,"y":2040,"wires":[]},{"id":"37be742c.72a96c","type":"http in","z":"f9a2eec9.c2e26","name":"source 1","url":"/source1/:value","method":"get","upload":false,"swaggerDoc":"","x":180,"y":2020,"wires":[["8ca164f9.828898"]]},{"id":"51684d03.091ea4","type":"http in","z":"f9a2eec9.c2e26","name":"source 2","url":"/source2/:value","method":"get","upload":false,"swaggerDoc":"","x":180,"y":2060,"wires":[["2bb81dc1.351562"]]},{"id":"1a7ad806.dbb598","type":"http in","z":"f9a2eec9.c2e26","name":"source 3","url":"/source3/:value","method":"get","upload":false,"swaggerDoc":"","x":180,"y":2100,"wires":[["c7da5f3a.95b71"]]},{"id":"9a85c694.2a37a8","type":"http response","z":"f9a2eec9.c2e26","name":"","statusCode":"204","headers":{},"x":780,"y":2080,"wires":[]},{"id":"2a0579a6.612c66","type":"change","z":"f9a2eec9.c2e26","name":"set payload value","rules":[{"t":"set","p":"payload","pt":"msg","to":"req.params.value","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":570,"y":2060,"wires":[["9a85c694.2a37a8","7f9b1fbb.58ff"]]},{"id":"8ca164f9.828898","type":"change","z":"f9a2eec9.c2e26","name":"set topic 1","rules":[{"t":"set","p":"topic","pt":"msg","to":"source1","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":2020,"wires":[["2a0579a6.612c66"]]},{"id":"2bb81dc1.351562","type":"change","z":"f9a2eec9.c2e26","name":"set topic 2","rules":[{"t":"set","p":"topic","pt":"msg","to":"source2","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":2060,"wires":[["2a0579a6.612c66"]]},{"id":"c7da5f3a.95b71","type":"change","z":"f9a2eec9.c2e26","name":"set topic 3","rules":[{"t":"set","p":"topic","pt":"msg","to":"source3","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":2100,"wires":[["2a0579a6.612c66"]]}]
正如评论中提到的,您需要从所有以节点开头的流程中返回一些http in
内容......通常,我喜欢使用 http POST 端点(用于接收输入值)并返回状态代码 204(无内容)让调用者知道该值已被接受。由于您的原始流程使用 GET 节点,因此我参数化了 URL(例如/source1/:value
)并使用change
节点将值复制到有效负载。你的旅费可能会改变...
请注意,我将“消息部分数”设置为 3,这意味着您将不会从join
节点获得任何输出,直到它接收到 3 个不同的主题值。之后,每个输入的 msg 都会输出最新的组合 msg 对象(因为“每个后续消息”复选框)。通过调整这两个选项,您可以获得所需的任何行为。
推荐阅读
- c# - WPF 组合框似乎导致 InvalidCastException
- mysql - SQL日期间隔不适用于AND子句
- c# - 如果 int 值等于 x 替换为字符串
- android - Putting ImageView in front of other elements < 21API
- reactjs - How to correctly validate an masked-input using material-ui, formik, yup, and react-input-mask?
- java - 编译错误将实例方法声明为静态上下文(Java 6)
- jenkins - Jenkins: C:Program No such file or directory
- python-3.x - output.write(bytearray(image[y][x])) TypeError: write() 参数必须是 str,而不是 bytearray
- apache-spark - Apache Spark 无法反序列化集群上的“TopicPartition”
- iis - 在 IIS 下服务的 Vue.js 无法找到资产文件