首页 > 解决方案 > 使用 Google pub/sub 更新单例 HashMap

问题描述

我有一个用例,我初始化一个包含一组查找数据(有关物联网设备的物理位置等信息)的 HashMap。此查找数据用作第二个数据集(即 PCollection)的参考数据。此 PCollection 是一个数据流,可提供 IoT 设备记录的数据。来自 IoT 设备的数据流使用 Apache Beam 管道,该管道作为使用 Google Cloud pub/sub 的 Google Dataflow 运行。

当我处理 PCollection(设备数据)时,我将 Google Cloud 发布/订阅数据链接到 HashMap 中的相关查找条目。

我需要基于将更改推送到其数据的第二个发布/订阅来更新 HashMap。到目前为止,这是我获得 PCollection 并使用 HashMap 进行查找的方式:

HashMap -> 包含预加载的查找数据(有关 IoT 设备的信息)

PCollection -> 包含来自管道数据流的数据(物联网设备记录的数据)

我正在为 IoT 设备查找数据生成一个 HashMap 作为单例:

public class MyData {

    private static final MyData instance = new MyData ();

    private MyData () {     
            HashMap myDataMap = new HashMap<String, String>();          
               ... logic to populate the map

            this.referenceData = myDataMap;

    }

    public HashMap<Integer, DeviceReference> referenceData;

    public static DeviceData getInstance(){
        return instance;
    }
}

然后,我在另一个类中使用 HashMap,在该类中我订阅了数据的更新(例如,这些消息为我提供了与已存储在 HashMap 中的实体相关的新数据)。我正在使用带有 Apache Beam 的 Google pub/sub 订阅更改:

HashMap<String, String> referenceData = MyData.getInstance().referenceData;

Pipeline pipeLine = Pipeline.create(options);           

// subscribe to changes in data

org.apache.beam.sdk.values.PCollection myDataUpdates;

myDataUpdates = pipeLine.begin()
    .apply(String.format("Subscribe to data updates"),
        PubsubIO.readStrings().fromTopic(
                String.format("myPubSubPath")));

我想要做的是有效地将数据更新应用到单例 HashMap(即根据我的数据订阅操作 HashMap)。我怎样才能做到这一点?

我对 Apache Beam 的了解有限,我只知道如何对管道数据进行转换以创建另一个单独PCollection的 . 我认为这是 Beam 的重点,它用于将大型数据集转换为不同的形式。有没有一种方法可以使用 Apache Beam 来实现我所需要的(更新基于 pub/sub 订阅的数据集),还是有另一种方法可以使用 pub/sub 更新 HashMap?(我无法轮询数据,因为它会产生过多的延迟和成本,我需要使用订阅更新 HashMap)。


Google 云文档显示了一种直接订阅未链接到 Apache Beam 管道的 Google Cloud pub/sub 的方法。这很有希望作为一个潜在的解决方案,并且依赖于以下 Maven 依赖项:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.53.0</version>
</dependency>

不过,我遇到了冲突,这与 Apache Beam 的以下 Maven 依赖项冲突:

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>2.5.0</version>
</dependency>

该问题记录在此处的一个单独问题中 - Java 应用程序中的 Maven 冲突与 google-cloud-core-grpc 依赖项。从我所看到的看来,google-cloud-pubsub我使用哪个版本的 Maven 工件似乎并不重要,因为据我所知,v.2.5.0 及以下的梁依赖项似乎总是与当前的任何冲突谷歌依赖的版本。

(我已在 Beam Jira 中将此问题作为一个问题提出 - https://issues.apache.org/jira/browse/BEAM-6118


我目前正在调查侧面输入,并combine作为实现 HashMap 更新的一种方式:

https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine

示例 10 显示了一种.getSideInputsMap()可应用于payload. 我想知道是否可以以某种方式将其应用于我对查找数据更改的订阅。如果我得到PCollection这样的,我不能直接链接.getSideInputsMap()PCollection

deviceReferenceDataUpdates = pipeLine.begin()
    .apply("Get changes to the IoT device lookup data"),
         PubsubIO.readMessages().fromTopic("IoT device lookup data")).

我专门问了一个单独的问题,关于我如何能够使用.getSideInputsMap()- Apache Beam - 我如何将 .getSideInputsMap 应用到对 Google pub/sub 的订阅?

标签: javagoogle-cloud-platformbigdatapublish-subscribeapache-beam

解决方案


我在 Apache Beam 框架中找到了一种方法,如下所示(未完全测试)。

注意- 考虑到@Serg M Ten 对 OP 的评论,更好的方法可能是稍后合并数据,而不是尝试将查找数据作为转换处理的一部分加入。


单例哈希映射

在这里查看我的答案 -从不同的类访问 HashMap


管道(在单线程上,在 中实现main

// initialise singleton HashMap containing lookup data on bootstrap:
LookupData lookupData = LookupData.getInstance();

org.apache.beam.sdk.values.PCollection lookupDataUpdateMessage;

lookupDataUpdateMessage = pipeLine.begin()
                              .apply("Extract lookup update data", PubsubIO.readStrings().fromTopic("myLookupUpdatePubSubTopic"))
                              .apply("Transform lookup update data",
                                 ParDo.of(new TransformLookupData.TransformFn()));

                     org.apache.beam.sdk.values.PCollection lookupDataMessage;

转换

import java.io.Serializable;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.json.JSONObject;

import myLookupSingletonClass;
import myLookupUpObjectClass;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Strings;


public class TransformDeviceMeta

    public static class TransformFn extends DoFn<String, MyLookupData> {

        @ProcessElement
        public void processElement(ProcessContext c) 
        {   
            LookupData lookupData = LookupData.getInstance();

            MyLookupData myLookupDataUpdate = new MyLookupData();

            try 
            {           
                byte[] payload = c.element().getBytes();
                String myLookUpDataJson = new JSONObject(new String(payload)).toString();

                ObjectMapper mapper = new ObjectMapper();
                myLookUpDataUpdate = mapper.readValue(myLookUpDataJson , MyLookupData.class);

                String updatedLookupDataId = updatedLookupDataId.id;

                // logic for HashMap updating e.g:

                    lookupData.myHashMap.remove(updatedDeviceId);
                }
                else {
                    lookupData.myHashMap.put(updatedDeviceId, deviceMetaUpdate);    
                }
            } 
            catch (Exception ex) {
                Log.error(ex.getMessage());
                System.out.println("Error " + ex.getMessage());
            }
        }       
    }   
}

MyLookupData= 构成查找数据模型的类


推荐阅读