首页 > 解决方案 > 延迟确认 GCloud Pub/Sub 消息

问题描述

我正在尝试实现一个类似于 AWS 或 Azure 队列的 PubSub 客户端,但是,我遇到了 gcloud cpp sdk 的问题。

更新:删除了不必要的细节。

首先,提供的示例不能开箱即用 - 我必须在之前休眠session.cancel(),否则消息不会被确认。是否有可靠的方法等到 ack() 操作完成并检查其状态?至少我想确保服务器收到我的请求。

此外,c++ API 似乎只提供了一个不适合我的用例的异步方法。

我需要实现以下接口,该接口通过依赖注入插入到更大的系统中。该系统在其他云上的生产中工作,因此我无法更改架构。只需要实现接口。

template<typename TItem>
class Queue{
public:
    /*!
    * Dequeues message from the queue 
    * Returns true on success
    */
    virtual bool Dequeue( TItem & item) = 0;

    /*!
    * Discards(deletes) the item with from the cloud queue.
    */
    virtual void Discard(const TReceipt & receipt) = 0;
};

队列的实际实现将提供一个序列化器,该序列化器将 TItem 序列化为 JSON 并返回。

AWS 和 Azure SDK 为每个出队的消息提供了一个收据,以便我以后可以丢弃它。pubsub SDK 的收据是绑定到会话的 AckHandler 对象。

一个明显错误的解决方案是保持会话打开并在 lambda 中等待另一个 condition_variable,直到下一次调用 Dequeue 方法。但是,这看起来像是一个快速而肮脏的解决方案。使用 Pub/Sub 实现此功能的正确方法是什么?

标签: c++11google-cloud-platformgoogle-cloud-pubsub

解决方案


是否有可靠的方法等到 ack() 操作完成并检查其状态?

不是真的,因为ack()操作是尽力而为的。即使您等待ack()到达服务器,也不能保证消息不会被重新发送。是的,服务可能会在成功后重新发送消息ack()

请注意,库会自动延长您的消息的租约,以避免在您明确地ack()或之前重新发送nack(),因此延迟ack()不会影响正确性,除非您在之后不久关闭应用程序ack()

AWS 和 Azure SDK 为每个出队的消息提供了一个收据,以便我以后可以丢弃它,但我不知道如何使用 pub/sub 来做到这一点。

您可以调用nack()丢弃AckHandler一条消息,这意味着该消息将被重新发送到另一个实例。这就是你所说的“丢弃”吗?

使用 Pub/Sub 实现此功能的正确方法是什么?

嗯,我不确定我是否遵循。我可以猜测 and 的语义Dequeue()Discard()如果我猜错了,请道歉。无论如何,尚不清楚这是否真的适用于任何类型?喜欢TReceipt == int

需要注意的是这里有很多猜测,您可以执行以下操作:

class PubsubBuffer {
 private:
  std::mutex mu_;
  std::dequeue<std::pair<pubsub::Message, AckHandlerWrapper>> queue_;

 public:
  virtual bool Dequeue(pubsub::Message& item, AckHandlerWrapper& receipt) {
    std::unique_lock<std::mutex> lk(mu_);
    if (queue_.empty()) return false;
    auto& f = queue_.front();
    item = std::move(f.first);
    receipt = std::move(.second);
    queue_.pop_front();
    return true;
  }
  virtual void Discard(AckHandlerWrapper const& receipt) {
    std::move(receipt.ack_handler).ack();
  }
  void Push(pubsub::Message m, pubsub::AckHandler h) {
    std::unique_lock<std::mutex> lk(mu_);
    queue_.push_back(
      std::make_pair(std::move(m), AckHandlerWrapper(std::move(h)));
  }
};


std::shared_ptr<PubsubBuffer> F(pubsub::Subscriber s) {
  auto buffer = std::make_shared<PubsubBuffer>();
  auto handler = [buffer](pubsub::Message m, pubsbu::AckHandler h) {
    buffer.Push(std::move(m), std::move(h));
  };
  return buffer;
}

推荐阅读