sql - 在火花流中加入大数据
问题描述
我们有一个包含 700 万条记录的大型客户表,我们正在尝试处理来自 kafka 流的一些事务数据(每批 50 万条消息)。
在处理过程中,我们需要将交易数据与客户数据连接起来。目前这需要我们大约 10 秒,而要求是将其降低到 5 秒。由于客户表太大,我们不能使用广播连接。我们还能做其他优化吗?
== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
:- Subquery custProfile
: +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
: +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
: +- Subquery jz_view_sub_cust_profile
: +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
: +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
+- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166
== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
:- Subquery custProfile
: +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
: +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
: +- Subquery jz_view_sub_cust_profile
: +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
: +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
+- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166
== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Project
+- Join Inner, Some((custId#110 = rowkey#0))
:- Project [rowkey#0]
: +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
: +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
+- Project [custId#110]
+- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#119L])
+- TungstenExchange SinglePartition, None
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#122L])
+- Project
+- SortMergeJoin [rowkey#0], [custId#110]
:- Sort [rowkey#0 ASC], false, 0
: +- TungstenExchange hashpartitioning(rowkey#0,200), None
: +- Project [rowkey#0]
: +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
: +- HiveTableScan [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4], MetastoreRelation db_localhost, ext_sub_cust_profile, None
+- Sort [custId#110 ASC], false, 0
+- TungstenExchange hashpartitioning(custId#110,200), None
+- Project [custId#110]
+- Scan ExistingRDD[key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118]
解决方案
- 假设客户数据在小批量中是恒定的,使用散列分区器在 customerId 上分区此客户数据并将其缓存在 RDD/DF 中。
- 由于事务数据来自 Kafka,因此在发布到 Kafka https://www.javaworld.com/article/3066873/big-data/big-data-messaging-with-时,也可以使用哈希分区器将这些数据分区到相同的键上kafka-part-2.html
这应该减少加入两个数据集的时间,但唯一的条件是两个数据集(交易数据和客户数据)中的分区键应该相同。
推荐阅读
- python - 如何使用 OpenCV 裁剪圆形图像?
- java - Imebra 库显示传输语法 1.2.840.10008.1.2.1 的完全灰色图像
- javascript - 我需要帮助修复列表数组中的错误
- echarts - 如何为同一图表中的不同系列设置不同的图例图标
- data-structures - LinkedList 返回空
- javascript - 为另一个模型设置新值重新分配旧模型(Angular)
- python - 从 Pandas Dataframe 中的最后一个有效行填充无效的 x,y 位置数据
- vue.js - vue i18n中的复数
- php - PHP:复制文件,同时写入
- xamarin - 如何在我的 Xamarin.Forms 应用程序中实现相机?