Error when attempting a synchronous pull from Google Pub/Sub in Java

Problem description

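I am trying to perform a synchronous pull from a Google Pub/Sub subscription in Java, but I get an error when I follow the documented procedure (https://cloud.google.com/pubsub/docs/pull#synchronous_pull).

I was able to do the same thing in Python on the same server, and it returned the correct messages, so I know the machine itself is fine. The problem appears to be in my Java process.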

  public void readBucket() {
    try {
      List<ReceivedMessage> myMessage = createSubscriberWithSyncPull("my-project", "my-subscription", 1);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  static List<ReceivedMessage> createSubscriberWithSyncPull(
      String projectId, String subscriptionId, int numOfMessages) throws Exception {
    // [START pubsub_subscriber_sync_pull]
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setReturnImmediately(true) // return immediately if messages are not available
              .setSubscription(subscriptionName)
              .build();

      // use pullCallable().futureCall to asynchronously perform this operation
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        // handle received message
        // ...
        ackIds.add(message.getAckId());
      }
      // acknowledge received messages
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();
      // use acknowledgeCallable().futureCall to asynchronously perform this operation
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      return pullResponse.getReceivedMessagesList();
    }
    // [END pubsub_subscriber_sync_pull]
  }

I'm fairly sure this is the line causing the problem:

SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)

I get the following error message:

2019-09-04 15:01:00,727 - ERROR [MapReduceRunner-phase-1:i.c.c.i.a.r.ProgramControllerServiceAdapter$1@97] - MapReduce Program 'phase-1' failed.
java.lang.Exception: com.google.pubsub.v1.StreamingPullRequest.emptyIntList()Lcom/google/protobuf/Internal$IntList;
        at io.cdap.cdap.internal.app.runtime.AbstractContext.lambda$initializeProgram$1(AbstractContext.java:645) ~[na:na]
        at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:600) ~[na:na]
        at io.cdap.cdap.internal.app.runtime.AbstractContext.initializeProgram(AbstractContext.java:637) ~[na:na]
        at io.cdap.cdap.internal.app.runtime.batch.MapReduceRuntimeService.beforeSubmit(MapReduceRuntimeService.java:547) ~[na:na]
        at io.cdap.cdap.internal.app.runtime.batch.MapReduceRuntimeService.startUp(MapReduceRuntimeService.java:226) ~[na:na]
        at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:47) ~[com.google.guava.guava-13.0.1.jar:na]
        at io.cdap.cdap.internal.app.runtime.batch.MapReduceRuntimeService$2$1.run(MapReduceRuntimeService.java:450) [na:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222]
Caused by: java.lang.NoSuchMethodError: com.google.pubsub.v1.StreamingPullRequest.emptyIntList()Lcom/google/protobuf/Internal$IntList;
        at com.google.pubsub.v1.StreamingPullRequest.<init>(StreamingPullRequest.java:30) ~[na:na]
        at com.google.pubsub.v1.StreamingPullRequest.<clinit>(StreamingPullRequest.java:1594) ~[na:na]
        at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.<clinit>(GrpcSubscriberStub.java:149) ~[na:na]

Any help is welcome!

Tags: java, google-cloud-pubsub

Solution


The error message resembles the one in another question: "The method emptyIntList() is undefined".

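If that is the case and you are using Maven, adding the protobuf-java dependency should fix it. A NoSuchMethodError like this one usually means that the protobuf-java version loaded at runtime is different from (typically older than) the version the generated com.google.pubsub.v1 classes were compiled against. The stack trace shows the code running inside a CDAP MapReduce program, so the platform's own classpath may be supplying a different protobuf-java than the one your build declares.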
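To check whether a version conflict is really the cause, it can help to see which jar the protobuf classes are loaded from at runtime. The following is a minimal diagnostic sketch, not part of the Pub/Sub client library: `ClasspathProbe` and its methods are hypothetical names, and the default class name `com.google.protobuf.GeneratedMessageV3` is an assumption about where `emptyIntList()` is inherited from. It only gives a meaningful answer when run with the same classpath as the failing program.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Hypothetical helper for diagnosing classpath conflicts; not part of any Google library.
final class ClasspathProbe {

    // Returns the jar or directory a class was loaded from,
    // or a placeholder when the JVM does not expose one (e.g. bootstrap classes).
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return (source == null || source.getLocation() == null)
                ? "<bootstrap or unknown>"
                : source.getLocation().toString();
    }

    // Returns true if the class or any of its superclasses declares a method
    // with the given name (any visibility, any parameters).
    static boolean hasMethod(Class<?> type, String methodName) {
        for (Class<?> k = type; k != null; k = k.getSuperclass()) {
            for (Method m : k.getDeclaredMethods()) {
                if (m.getName().equals(methodName)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumption: emptyIntList() is inherited from this protobuf base class.
        String className = args.length > 0 ? args[0] : "com.google.protobuf.GeneratedMessageV3";
        String methodName = args.length > 1 ? args[1] : "emptyIntList";
        try {
            // Load without running static initializers, so a broken class does not fail here.
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            System.out.println("loaded from: " + locationOf(type));
            System.out.println("declares " + methodName + ": " + hasMethod(type, methodName));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```

If the reported jar is an older protobuf-java than the one your build declares, or the method is reported as missing, the runtime classpath is the likely culprit. With Maven, `mvn dependency:tree -Dincludes=com.google.protobuf` lists which protobuf artifacts your build resolves, which you can compare against what the probe reports.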

