首页 > 解决方案 > GCP PubSub:创建配置类以发布消息

问题描述

我正在尝试创建一个单独的类来连接到 GCPPUB。我有以下类成功连接并将消息推送到 gcp。但问题是每次它使用 GCP 凭据验证来自我的应用程序的连接时。我想在连续发布消息后获得一次连接?有没有办法,我可以分离我的代码,以便只有在现有连接为空时才获得连接,然后在那里发布消息?它是只获得一个连接的正确方法吗?

@Slf4j
public class GCPMessagePublisher {

    private static final String PROJECT_ID = "myprojectId";
    Publisher publisher = null;

    public void putMessageOnGCP(String message) throws Exception
    {
        log.info("The outgoing message to GCP PUBSUB is : "+message);
        // topic id, eg. "my-topic"
        String topicId = "topic_name";
        int messageCount = 10;
        ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);

        List<ApiFuture<String>> futures = new ArrayList<>();

        try {
            GoogleCredentials credentials = GoogleCredentials.fromStream(
                    new FileInputStream("gcp credential here ........"));

            publisher = publisher.newBuilder(topicName).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();

            for (int i = 0; i < messageCount; i++) {

                // convert message to bytes
                ByteString data = ByteString.copyFromUtf8(message);
                PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                        .setData(data)
                        .build();

                // Schedule a message to be published. Messages are automatically batched.
                ApiFuture<String> future = publisher.publish(pubsubMessage);
                futures.add(future);
            }
        } finally {
            // Wait on any pending requests
            List<String> messageIds = ApiFutures.allAsList(futures).get();

            for (String messageId : messageIds) {
                System.out.println(messageId);
            }

            if (publisher != null) {
                // When finished with the partypublisher, shutdown to free up resources.
                publisher.shutdown();
            }
        }
    }
}

标签: javagoogle-cloud-platformpublish-subscribegoogle-cloud-pubsub

解决方案


最后,我发布了解决方案,用于将上述类分离为两种单独的方法,一种用于纯 GCP Config,另一种仅用于将消息发布到 GCPPUB。

**注意:**我使用 SpringBoot 开发我的应用程序,所以我们可能会看到一些 Spring 注释

@Service
public class GCPMessagePublisher
{

   @Value("${gcp.proj.id}")
    private static String projectId;

    @Value(("${gcp.pubsub.topic.name}"))
    private static  String topicId;


    private static  Publisher publisher = null;


    public static void buildGCPConfiguration() throws Exception
    {

        ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
        try
        {
            GoogleCredentials credentials = GoogleCredentials.fromStream(
                    new FileInputStream("gcp credential here use the file path to key.json which you downloaded from google console........"));

            publisher = publisher.newBuilder(topicName).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();

        }catch (Exception e)
        {
            log.error("Exception Thrown while connecting gcp"+e);
        }
    }

    public void putMessageOnGCP (String message) throws Exception
    {
        log.info("Sending Message to GCP PUBSUB......"+message)
        List<ApiFuture<String>> futures = new ArrayList<>();
        if (publisher == null) {
            buildGCPConfiguration();
        }

        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
        // Schedule a message to be published. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);

    }


}

推荐阅读