c++11 - 延迟确认 GCloud Pub/Sub 消息
问题描述
我正在尝试实现一个类似于 AWS 或 Azure 队列的 PubSub 客户端,但是,我遇到了 gcloud cpp sdk 的问题。
更新:删除了不必要的细节。
首先,提供的示例不能开箱即用 - 我必须在之前休眠session.cancel()
,否则消息不会被确认。是否有可靠的方法等到 ack() 操作完成并检查其状态?至少我想确保服务器收到我的请求。
此外,c++ API 似乎只提供了一个不适合我的用例的异步方法。
我需要实现以下接口,该接口通过依赖注入插入到更大的系统中。该系统在其他云上的生产中工作,因此我无法更改架构。只需要实现接口。
template<typename TItem>
class Queue{
public:
/*!
* Dequeues message from the queue
* Returns true on success
*/
virtual bool Dequeue( TItem & item) = 0;
/*!
* Discards(deletes) the item with from the cloud queue.
*/
virtual void Discard(const TReceipt & receipt) = 0;
};
队列的实际实现将提供一个序列化器,该序列化器将 TItem 序列化为 JSON 并返回。
AWS 和 Azure SDK 为每个出队的消息提供了一个收据,以便我以后可以丢弃它。pubsub SDK 的收据是绑定到会话的 AckHandler 对象。
一个明显错误的解决方案是保持会话打开并在 lambda 中等待另一个 condition_variable,直到下一次调用 Dequeue 方法。但是,这看起来像是一个快速而肮脏的解决方案。使用 Pub/Sub 实现此功能的正确方法是什么?
解决方案
是否有可靠的方法等到 ack() 操作完成并检查其状态?
不是真的,因为ack()
操作是尽力而为的。即使您等待ack()
到达服务器,也不能保证消息不会被重新发送。是的,服务可能会在成功后重新发送消息ack()
。
请注意,库会自动延长您的消息的租约,以避免在您明确地ack()
或之前重新发送nack()
,因此延迟ack()
不会影响正确性,除非您在之后不久关闭应用程序ack()
。
AWS 和 Azure SDK 为每个出队的消息提供了一个收据,以便我以后可以丢弃它,但我不知道如何使用 pub/sub 来做到这一点。
您可以调用nack()
丢弃AckHandler
一条消息,这意味着该消息将被重新发送到另一个实例。这就是你所说的“丢弃”吗?
使用 Pub/Sub 实现此功能的正确方法是什么?
嗯,我不确定我是否遵循。我可以猜测 and 的语义Dequeue()
,Discard()
如果我猜错了,请道歉。无论如何,尚不清楚这是否真的适用于任何类型?喜欢TReceipt == int
?
需要注意的是这里有很多猜测,您可以执行以下操作:
class PubsubBuffer {
private:
std::mutex mu_;
std::dequeue<std::pair<pubsub::Message, AckHandlerWrapper>> queue_;
public:
virtual bool Dequeue(pubsub::Message& item, AckHandlerWrapper& receipt) {
std::unique_lock<std::mutex> lk(mu_);
if (queue_.empty()) return false;
auto& f = queue_.front();
item = std::move(f.first);
receipt = std::move(.second);
queue_.pop_front();
return true;
}
virtual void Discard(AckHandlerWrapper const& receipt) {
std::move(receipt.ack_handler).ack();
}
void Push(pubsub::Message m, pubsub::AckHandler h) {
std::unique_lock<std::mutex> lk(mu_);
queue_.push_back(
std::make_pair(std::move(m), AckHandlerWrapper(std::move(h)));
}
};
std::shared_ptr<PubsubBuffer> F(pubsub::Subscriber s) {
auto buffer = std::make_shared<PubsubBuffer>();
auto handler = [buffer](pubsub::Message m, pubsbu::AckHandler h) {
buffer.Push(std::move(m), std::move(h));
};
return buffer;
}
推荐阅读
- aws-sdk - 使用 ID 从 cognito 中检索用户信息
- jenkins - 在詹金斯作业中执行 groovy 脚本
- amazon-dynamodb - dynamo db FilterExpression,使用值作为键在 json 对象中查找
- python - 刷新数据写入数字文件句柄?
- javascript - 如何将缓动放入滚动绑定功能?
- python - 如何在熊猫中转播数据框
- c# - AngularJS Post to C# Function 因大字符串而失败(立即出现错误 500)
- angular - Angular:检查 Observable 的响应类型
- django - 在 django 视图中访问表单元素
- angular - Firebase 动态链接仅在 iOS 中的移动 Safari 和 Chrome 上修改重定向 url。我使用角形辅助插座