首页 > 解决方案 > Cygnus:无法传递事件。事务打开时调用 close() - 您必须先提交或回滚

问题描述

我已经安装了 Fiware Orion (v 1.13.0)、Fiware Cygnus (2.8.0) 和 Kafka,以便通过 Cygnus 将数据从 Orion 发送到 Kafka。

我使用了 cygnus 文档中建议的 conf 文件

cygnus-ngsi.sources =http-source
cygnus-ngsi.sinks =kafka-sink
cygnus-ngsi.channels =kafka-channel

cygnus-ngsi.sources.http-source.channels = kafka-channel
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = def_serv
cygnus-ngsi.sources.http-source.handler.default_service_path = /def_servpath
cygnus-ngsi.sources.http-source.handler.events_ttl = 2
cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf


cygnus-ngsi.channels.kafka-channel.type = memory
cygnus-ngsi.channels.kafka-channel.capacity = 1000
cygnus-ngsi.channels.kafka-channel.trasactionCapacity = 100

cygnus-ngsi.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.NGSIKafkaSink
cygnus-ngsi.sinks.kafka-sink.channel = kafka-channel
cygnus-ngsi.sinks.kafka-sink.enable_grouping = false
cygnus-ngsi.sinks.kafka-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.kafka-sink.broker_list = 192.168.1.142:9092
cygnus-ngsi.sinks.kafka-sink.zookeeper_endpoint = 192.168.1.142:2181
cygnus-ngsi.sinks.kafka-sink.batch_size = 1
cygnus-ngsi.sinks.kafka-sink.batch_timeout = 10

一切都在使用 Docker swarm

ID             NAME              MODE         REPLICAS   IMAGE                       PORTS
c195zwpxparu   cygnus_cygnus     replicated   1/1        fiware/cygnus-ngsi:latest   *:5050->5050/tcp, *:5055->5055/tcp, *:5080->5080/tcp
pzjvud4q4ibc   kafka_kafka       replicated   1/1        bitnami/kafka:2             *:9092->9092/tcp
bpyaz4jphuhh   kafka_zookeeper   replicated   1/1        bitnami/zookeeper:3         *:2181->2181/tcp
jk4po3ofs3bm   orion_mongodb     replicated   1/1        mongo:3.6.5
mlknk6j7y5kd   orion_orion       replicated   1/1        fiware/orion:1.13.0         *:1026->1026/tcp

这是我关于 cygnus 的 docker-compose.yml

version: "3"
services:

  cygnus:
    image: fiware/cygnus-ngsi
    ports:
      - "5050:5050"
      - "5055:5055"
      - "5080:5080"
    volumes:
      - ./conf/agent.conf:/opt/apache-flume/conf/agent.conf
    environment:
      - CYGNUS_SKIP_CONF_GENERATION=true
      - CYGNUS_MULTIAGENT=false
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s
  
networks:
  default:
    driver: overlay
    driver_opts:
      com.docker.network.driver.mtu: 1400

当通知从 Orion 到达 localhost:5050/notify 时,在 Cygnus 控制台上会出现此错误


cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    | time=2021-04-16T10:12:13.774Z | lvl=ERROR | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-6
7ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows.
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at java.lang.Thread.run(Thread.java:748)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    | time=2021-04-16T10:12:18.778Z | lvl=INFO | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67
ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[643] : Rollback transaction by Exception  (begin() calle
d when transaction is OPEN!)

有什么建议吗?

标签: apache-kafkafiwarefiware-cygnus

解决方案


当前的 cygnus版本确实是 2.8.0。但是 Orion 1.13.0 已经有好几年了,不太可能与最新的 Cygnus 版本保持一致。

当前的 Orion版本是 3.0.0。尝试使用它。

可以在NGSI - v2 分步教程中找到最新 Orion 和最新 Cygnus 的基本工作示例


推荐阅读