Clustered event bus consumer does not receive messages

Problem description

I have two services (an HttpServer and a consumer) running on two different servers. I am using Hazelcast for clustering.

Here is my code:

import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;

public class SampleRestService extends StartVerticle {

    @Override
    public void start() {
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.post("/subscribe").handler(this::handleSubscription);
        vertx.createHttpServer().requestHandler(router::accept).listen(9001);
    }

    // Sends the request path over the event bus and answers the HTTP call with the consumer's reply.
    public void handleSubscription(RoutingContext routingContext) {
        System.out.println("sending to event bus");
        vertx.eventBus().send("myserviceadd", routingContext.request().path(), ar -> {
            if (ar.succeeded()) {
                //req.response().setStatusCode(200).write(result.result().body()).end();
                System.out.println("received response from event bus");
                routingContext.response().setStatusCode(200).end("myhttp-response" + ar.result().body() + " ** " + routingContext.request().path());
            } else if (ar.failed()) {
                System.out.println(" response from event bus is failed");
                ar.cause().printStackTrace();
                routingContext.response().setStatusCode(500).end("failed to subscribe");
            }
        });
    }

}

Consumer code:

import io.vertx.core.json.JsonObject;

public class SampleRestComsumer extends SampleConsumerParent {

    @Override
    public void start() {
        // Register a handler on the same address the HTTP service sends to.
        vertx.eventBus().consumer("myserviceadd", message -> {
            System.out.println("Received message: " + message.body());
            message.reply(new JsonObject().put("responseCode", "OK").put("message", "This is your response to your event"));
        });
    }

}

Here is my cluster.xml:

<?xml version="1.0" encoding="UTF-8"?>
     <hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config 
     hazelcast-config-3.2.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <properties>
        <property name="hazelcast.wait.seconds.before.join">0</property>
        <property name="hazelcast.logging.type">jdk</property>
    </properties>

    <group>
        <name>dev-test</name>
        <password>dev-pass</password>
    </group>
    <management-center enabled="false">http://localhost:8080/mancenter</management-center>
    <network>
        <port auto-increment="false" port-count="10000">5701</port>
        <outbound-ports>
            <!--
            Allowed port range when connecting to other nodes.
            0 or * means use system provided port.
            -->
            <ports>0</ports>
        </outbound-ports>
        <join>
            <multicast enabled="false">
            <!--<multicast-group>224.2.2.3</multicast-group>-->
            <!--<multicast-port>54327</multicast-port>-->
            </multicast>
            <tcp-ip enabled="true">
                <!-- <interface>127.0.0.1</interface> -->
                <interface>10.27.92.45</interface>
                <interface>10.27.92.47</interface>
            </tcp-ip>
            <aws enabled="false">

            </aws>
        </join>
        <interfaces enabled="true">
            <interface>10.27.92.*</interface>
        </interfaces>        
    </network>
    <partition-group enabled="false"/>
    <executor-service name="default">
        <pool-size>16</pool-size>
        <!--Queue capacity. 0 means Integer.MAX_VALUE.-->
        <queue-capacity>0</queue-capacity>
    </executor-service>
    <map name="__vertx.subs">

        <!--
            Number of backups. If 1 is set as the backup-count for example,
            then all entries of the map will be copied to another JVM for
            fail-safety. 0 means no backup.
        -->
        <backup-count>1</backup-count>

        <time-to-live-seconds>0</time-to-live-seconds>
        <max-idle-seconds>0</max-idle-seconds>
        <!--
            Valid values are:
            NONE (no eviction),
            LRU (Least Recently Used),
            LFU (Least Frequently Used).
            NONE is the default.
        -->
        <eviction-policy>NONE</eviction-policy>
        <!--
            Maximum size of the map. When max size is reached,
            map is evicted based on the policy defined.
            Any integer between 0 and Integer.MAX_VALUE. 0 means
            Integer.MAX_VALUE. Default is 0.
        -->
        <max-size policy="PER_NODE">0</max-size>
        <!--
            When max. size is reached, specified percentage of
            the map will be evicted. Any integer between 0 and 100.
            If 25 is set for example, 25% of the entries will
            get evicted.
        -->
        <eviction-percentage>25</eviction-percentage>
        <merge-policy>com.hazelcast.map.merge.LatestUpdateMapMergePolicy</merge-policy>
    </map>

    <!-- Used internally in Vert.x to implement async locks -->
    <semaphore name="__vertx.*">
        <initial-permits>1</initial-permits>
    </semaphore>

</hazelcast>

When I run both services, they join the cluster and both start, but I get this error on the server:

 response from event bus is failed
 (TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: 
  7dfec6d2-3eaf-4006-90b1-6c2eaddb28bd
        at io.vertx.core.eventbus.impl.HandlerRegistration.sendAsyncResultFailure(HandlerRegistration.java:118)
    at io.vertx.core.eventbus.impl.HandlerRegistration.lambda$new$0(HandlerRegistration.java:65)

Please let me know what I am doing wrong.

Tags: hazelcast, vert.x

Solution
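
A likely cause, assuming Vert.x 3.x (which the `router::accept` call suggests): both nodes join the Hazelcast cluster, but the clustered event bus opens its own TCP server, and its host defaults to `localhost` unless a cluster host is set. The consumer then advertises an address the HTTP node cannot reach, so `send` times out after 30000 ms. Setting the cluster host on each node to its own 10.27.92.x address, either with the `-cluster-host` command-line option or with `VertxOptions.setClusterHost(...)`, usually resolves this. The sketch below uses only the JDK to pick that address. `ClusterHostPicker`, `matches`, and `pick` are hypothetical names, not part of Vert.x or Hazelcast, and the `10.27.92.*` pattern is taken from the `<interfaces>` element in the cluster.xml above.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;

// Hypothetical helper (not a Vert.x or Hazelcast class): finds a local IPv4
// address on the cluster subnet so it can be used as the event bus cluster host.
public class ClusterHostPicker {

    // True when a dotted IPv4 address matches a pattern such as "10.27.92.*".
    public static boolean matches(String address, String pattern) {
        String[] a = address.split("\\.");
        String[] p = pattern.split("\\.");
        if (a.length != 4 || p.length != 4) {
            return false;
        }
        for (int i = 0; i < 4; i++) {
            if (!p[i].equals("*") && !p[i].equals(a[i])) {
                return false;
            }
        }
        return true;
    }

    // Returns the first non-loopback local IPv4 address matching the pattern, if any.
    public static Optional<String> pick(String pattern) {
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics == null) {
                return Optional.empty();
            }
            for (NetworkInterface nic : Collections.list(nics)) {
                for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                    if (addr instanceof Inet4Address
                            && !addr.isLoopbackAddress()
                            && matches(addr.getHostAddress(), pattern)) {
                        return Optional.of(addr.getHostAddress());
                    }
                }
            }
        } catch (SocketException e) {
            // Interfaces could not be enumerated; fall through to "no match".
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Assumption: the subnet pattern mirrors <interfaces> in cluster.xml.
        Optional<String> host = pick("10.27.92.*");
        if (host.isPresent()) {
            System.out.println("cluster host candidate: " + host.get());
        } else {
            System.out.println("no local address on 10.27.92.*; set the cluster host manually");
        }
    }
}
```

The returned string would then be passed to `new VertxOptions().setClusterHost(host)` before calling `Vertx.clusteredVertx(...)`, or given on the command line as `-cluster -cluster-host <host>`. If a firewall sits between the two servers, the event bus port (random by default, configurable with `-cluster-port`) also has to be reachable.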

