将多条消息发布到同一个 pub 子主题的问题


我们正在尝试将消息发布到 google pub 子主题,我正在使用此 git存储库中的示例代码。


public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publisherExample(projectId, topicId);

       public static void publisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "{\r\n" + 
                    "   \"errorCodeFormat\": \"NF-123-ABC000\"\r\n" + 
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.awaitTermination(1, TimeUnit.MINUTES);


public static void subscribeAsyncExample(String projectId, String subscriptionId) throws TimeoutException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());

        System.out.println("You are in consumer listener");

        Subscriber subscriber = null;
       //        try {
          subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
          // Start the subscriber.
          System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
          // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
          subscriber.awaitTerminated(30, TimeUnit.MINUTES);
      //        } catch (TimeoutException timeoutException) {
    //          // Shut down the subscriber after 30s. Stop receiving messages.
    //          subscriber.stopAsync();
    //        System.out.println("Subscriber state: {}"+ subscriber.state());

//  }

鉴于这些消息具有不同的消息 ID,这表明重复发生在发布端。这可能有三个原因:

  1. 额外的,意想不到的电话被打到publish
  2. 在应用程序级别重试。
  3. 在 Pub/Sub 客户端库中重试。


对于最后一个,Pub/Sub 客户端库在内部重试由于可重试原因而失败的发布。最典型的原因之一是DEADLINE_EXCEEDED错误,当客户端没有足够快地从服务器接收到响应时会发生错误。这可能会导致重复,因为初始请求和重试请求最终都可能成功,您只能从第二个请求中获取消息 ID。

由于DEADLINE_EXCEEDED许多不同的原因,可能会发生错误。可能是您的 Internet 连接速度较慢,导致无法传输消息并足够快地接收响应。缓慢可能不是连接本身;如果您在通过网络执行许多其他操作的机器上运行,则可能是连接已饱和,因此无法及时处理请求和响应。如果您通过代理运行,那也可能有所帮助。

也可能是机器在 RAM 或 CPU 方面过载。如果需要进行大量的分页或者 CPU 被充分利用,那么需要客户端库处理的回调可能无法及时处理,从而导致DEADLINE_EXCEEDED错误和消息重试。


import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;

    Publisher publisher = null;
    try {
      // Create a publisher instance with default settings bound to the topic
      // Retry settings control how the publisher handles retry-able failures
      Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
      Duration maxRetryDelay = Duration.ofSeconds(60);
      // This one is the important one to set.
      Duration initialRpcTimeout = Duration.ofSeconds(60);
      double rpcTimeoutMultiplier = 1.0;
      Duration maxRpcTimeout = Duration.ofSeconds(600);seconds
      Duration totalTimeout = Duration.ofSeconds(600);

      RetrySettings retrySettings =
      publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

使用上述设置,初始 RPC 将在 60 秒内完成。这应该有望减少重复。如果您现在只是在试验,您可能需要在生产环境中调整这些设置,因为可能不需要这么长时间的超时。
