java - GCP PubSub:创建配置类以发布消息
问题描述
我正在尝试创建一个单独的类来连接到 GCPPUB。我有以下类成功连接并将消息推送到 gcp。但问题是每次它使用 GCP 凭据验证来自我的应用程序的连接时。我想在连续发布消息后获得一次连接?有没有办法,我可以分离我的代码,以便只有在现有连接为空时才获得连接,然后在那里发布消息?它是只获得一个连接的正确方法吗?
@Slf4j
public class GCPMessagePublisher {
private static final String PROJECT_ID = "myprojectId";
Publisher publisher = null;
public void putMessageOnGCP(String message) throws Exception
{
log.info("The outgoing message to GCP PUBSUB is : "+message);
// topic id, eg. "my-topic"
String topicId = "topic_name";
int messageCount = 10;
ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
List<ApiFuture<String>> futures = new ArrayList<>();
try {
GoogleCredentials credentials = GoogleCredentials.fromStream(
new FileInputStream("gcp credential here ........"));
publisher = publisher.newBuilder(topicName).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
for (int i = 0; i < messageCount; i++) {
// convert message to bytes
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
.setData(data)
.build();
// Schedule a message to be published. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
} finally {
// Wait on any pending requests
List<String> messageIds = ApiFutures.allAsList(futures).get();
for (String messageId : messageIds) {
System.out.println(messageId);
}
if (publisher != null) {
// When finished with the partypublisher, shutdown to free up resources.
publisher.shutdown();
}
}
}
}
解决方案
最后,我发布了解决方案,用于将上述类分离为两种单独的方法,一种用于纯 GCP Config,另一种仅用于将消息发布到 GCPPUB。
**注意:**我使用 SpringBoot 开发我的应用程序,所以我们可能会看到一些 Spring 注释
@Service
public class GCPMessagePublisher
{
@Value("${gcp.proj.id}")
private static String projectId;
@Value(("${gcp.pubsub.topic.name}"))
private static String topicId;
private static Publisher publisher = null;
public static void buildGCPConfiguration() throws Exception
{
ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
try
{
GoogleCredentials credentials = GoogleCredentials.fromStream(
new FileInputStream("gcp credential here use the file path to key.json which you downloaded from google console........"));
publisher = publisher.newBuilder(topicName).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
}catch (Exception e)
{
log.error("Exception Thrown while connecting gcp"+e);
}
}
public void putMessageOnGCP (String message) throws Exception
{
log.info("Sending Message to GCP PUBSUB......"+message)
List<ApiFuture<String>> futures = new ArrayList<>();
if (publisher == null) {
buildGCPConfiguration();
}
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Schedule a message to be published. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
}
推荐阅读
- c# - SSRS 报表执行服务渲染为图像高 DPI 和图像渲染小
- javascript - 从多个页面访问时模块变量未“保存”
- r - 如何将字符串连接到满足特定条件的列中的值?
- javascript - 放大 (JavaScript) - SAML 登录
- ios - SwiftUI 中的插入/移除动画
- reactjs - 如何不在某些特定页面上显示按钮?
- node.js - 创建具有许多深度关联的 Sequelize 对象
- c - 如何在 Windows/Ubuntu 中安装 cinterop 工具
- java - 在 Eclipse 中添加 SAP HANA 服务器 - 连接到 SAP 失败
- continuous-integration - Bamboo 的 Springboot 部署问题