首页 > 解决方案 > 将多条消息发布到同一个 pub 子主题的问题

问题描述

我们正在尝试将消息发布到 google pub 子主题,我正在使用此 git存储库中的示例代码。

这里的问题是,只要从下面的代码中发布一条消息,发布到该主题的重复消息的数量就会呈指数级增长。不知道为什么我会面临这种行为,但无法弄清楚示例代码或已创建的发布子主题是否存在问题。有人可以帮助我了解这里发生了什么以及如何解决这个问题。

public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publisherExample(projectId, topicId);
      }

       public static void publisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "{\r\n" + 
                    "   \"errorCodeFormat\": \"NF-123-ABC000\"\r\n" + 
            "}";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
      }
    }
     }

以下是正在使用的订阅者代码

public static void subscribeAsyncExample(String projectId, String subscriptionId) throws TimeoutException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);
    
    

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

        System.out.println("You are in consumer listener");

        Subscriber subscriber = null;
       //        try {
          subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
          // Start the subscriber.
          subscriber.startAsync().awaitRunning();
          System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
          // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
          subscriber.awaitTerminated(30, TimeUnit.MINUTES);
      //        } catch (TimeoutException timeoutException) {
    //          // Shut down the subscriber after 30s. Stop receiving messages.
    //          subscriber.stopAsync();
    //        System.out.println("Subscriber state: {}"+ subscriber.state());

//  }
  }

标签: javagoogle-cloud-platformgoogle-cloud-pubsub

解决方案


鉴于这些消息具有不同的消息 ID,这表明重复发生在发布端。这可能有三个原因:

  1. 额外的,意想不到的电话被打到publish
  2. 在应用程序级别重试。
  3. 在 Pub/Sub 客户端库中重试。

您显示的代码并没有真正表明前两件事中的任何一个正在发生,但如果您的代码实际上更复杂,例如,在循环中调用发布,那么值得检查以确保这两个都不是是这样的。

对于最后一个,Pub/Sub 客户端库在内部重试由于可重试原因而失败的发布。最典型的原因之一是DEADLINE_EXCEEDED错误,当客户端没有足够快地从服务器接收到响应时会发生错误。这可能会导致重复,因为初始请求和重试请求最终都可能成功,您只能从第二个请求中获取消息 ID。

由于DEADLINE_EXCEEDED许多不同的原因,可能会发生错误。可能是您的 Internet 连接速度较慢,导致无法传输消息并足够快地接收响应。缓慢可能不是连接本身;如果您在通过网络执行许多其他操作的机器上运行,则可能是连接已饱和,因此无法及时处理请求和响应。如果您通过代理运行,那也可能有所帮助。

也可能是机器在 RAM 或 CPU 方面过载。如果需要进行大量的分页或者 CPU 被充分利用,那么需要客户端库处理的回调可能无法及时处理,从而导致DEADLINE_EXCEEDED错误和消息重试。

由于消息最终成功,您可以通过将参数更改为如何重试请求以增加初始超时来解决此问题:

import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;
...

    Publisher publisher = null;
    try {
      // Create a publisher instance with default settings bound to the topic
      // Retry settings control how the publisher handles retry-able failures
      Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
      Duration maxRetryDelay = Duration.ofSeconds(60);
      // This one is the important one to set.
      Duration initialRpcTimeout = Duration.ofSeconds(60);
      double rpcTimeoutMultiplier = 1.0;
      Duration maxRpcTimeout = Duration.ofSeconds(600);seconds
      Duration totalTimeout = Duration.ofSeconds(600);

      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(initialRetryDelay)
              .setRetryDelayMultiplier(retryDelayMultiplier)
              .setMaxRetryDelay(maxRetryDelay)
              .setInitialRpcTimeout(initialRpcTimeout)
              .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
              .setMaxRpcTimeout(maxRpcTimeout)
              .setTotalTimeout(totalTimeout)
              .build();
      publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

使用上述设置,初始 RPC 将在 60 秒内完成。这应该有望减少重复。如果您现在只是在试验,您可能需要在生产环境中调整这些设置,因为可能不需要这么长时间的超时。


推荐阅读