首页 > 解决方案 > Siddhi 流式传输无模式数据

问题描述

我有一个来自 ActiveMQ 的源数据,我遇到的问题是这些数据没有固定的结构,因此,当我定义流时它会引发不兼容的数据类型错误,有没有办法通过某些条件来调节源流?

提前致谢。

/*
* Origin of data.
*/
@source(type='jms',
        @map(type='csv', delimiter=',', fail.on.unknown.attribute='false'),
        factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory',
        provider.url='tcp://127.0.0.1:61616',
        destination='simulatedData',
        connection.factory.type='queue',
        connection.factory.jndi.name='QueueConnectionFactory',
        transport.jms.SubscriptionDurable='true',
        transport.jms.DurableSubscriberClientID='wso2SPclient1')


define stream FileSourceProductionStream(type string, time long, studentId string, fileId string, totalAccesses float); /* totalAccesses : float Incompatible DataType*/
define stream TaskSourceProductionStream(type string, time long, studentId string, taskId string, deadline long); /*deadline: long Incompatible DataType*/

标签: streamwso2complex-event-processingsiddhiwso2sp

解决方案


在 siddhi 中,源流应该具有输入数据的模式。因此,我们无法将无模式数据接收到已定义的流中。

针对您的场景的一种可能解决方案是定义具有所有可能输入属性的流,并将输入预格式化为 siddhi-map 扩展支持的 JSON[1]、XML[2] 或 TEXT[3] 格式,使用属性名称和值。对于缺少的属性,json/xml/text 输入负载中不会有键或标签。

然后在源配置中使用 @map(fail.on.missing.attributes='false) 配置。

然后对于输入有效载荷中所有缺失的属性,NULL 将分配给输入流的相应属性。

[1] https://wso2-extensions.github.io/siddhi-map-json/api/4.0.20/

[2] https://wso2-extensions.github.io/siddhi-map-xml/api/4.0.12/

[3] https://wso2-extensions.github.io/siddhi-map-text/api/1.0.16/


推荐阅读