Joining large data in Spark Streaming

Problem Description

We have a large customer table with 7 million records, and we are trying to process transaction data coming from a Kafka stream (500K messages per batch).

During processing, we need to join the transaction data with the customer data. Currently this takes about 10 seconds, and the requirement is to bring it down to 5 seconds. Since the customer table is too large, we cannot use a broadcast join. Are there any other optimizations we can make?

== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
   :- Subquery custProfile
   :  +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
   :     +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
   :        +- Subquery jz_view_sub_cust_profile
   :           +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
   :              +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
   +- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166

== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Join Inner, Some((custId#110 = rowkey#0))
   :- Subquery custProfile
   :  +- Project [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4]
   :     +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
   :        +- Subquery jz_view_sub_cust_profile
   :           +- Project [rowkey#0,thrd_party_ads_opto_flag#4,no_mkt_opto_flag#5]
   :              +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
   +- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166

== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#119L]
+- Project
   +- Join Inner, Some((custId#110 = rowkey#0))
      :- Project [rowkey#0]
      :  +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
      :     +- MetastoreRelation db_localhost, ext_sub_cust_profile, None
      +- Project [custId#110]
         +- LogicalRDD [key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118], MapPartitionsRDD[190] at rddToDataFrameHolder at custStream.scala:166

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#119L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#122L])
      +- Project
         +- SortMergeJoin [rowkey#0], [custId#110]
            :- Sort [rowkey#0 ASC], false, 0
            :  +- TungstenExchange hashpartitioning(rowkey#0,200), None
            :     +- Project [rowkey#0]
            :        +- Filter ((no_mkt_opto_flag#5 = N) && (thrd_party_ads_opto_flag#4 = N))
            :           +- HiveTableScan [rowkey#0,no_mkt_opto_flag#5,thrd_party_ads_opto_flag#4], MetastoreRelation db_localhost, ext_sub_cust_profile, None
            +- Sort [custId#110 ASC], false, 0
               +- TungstenExchange hashpartitioning(custId#110,200), None
                  +- Project [custId#110]
                     +- Scan ExistingRDD[key#109,custId#110,mktOptOutFlag#117,thirdPartyOptOutFlag#118]
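The plan dump above is what Spark prints for the join query; note that both join inputs feed a TungstenExchange (a hash-partitioned shuffle over 200 partitions), meaning the large customer table is reshuffled on every micro-batch. A minimal sketch of how to print such a plan, assuming `joined` is the DataFrame produced by the stream-to-table join (the name is hypothetical):

```scala
// Print all four plan stages (parsed, analyzed, optimized, physical)
// for the join; `joined` stands in for the actual DataFrame in custStream.scala.
joined.explain(true)
```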

Tags: sql, apache-spark, join, apache-spark-sql

Solution


  1. Assuming the customer data is constant across micro-batches, partition it on customerId with a hash partitioner and cache it as an RDD/DataFrame.
  2. Since the transaction data comes from Kafka, it can also be partitioned on the same key with a hash partitioner when it is published to Kafka: https://www.javaworld.com/article/3066873/big-data/big-data-messaging-with-kafka-part-2.html

This should reduce the time to join the two datasets, with the one condition that the partitioning key must be the same in both datasets (the transaction data and the customer data).
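The two steps above can be sketched with Spark's RDD API (matching the Spark 1.6-era plan output). The record types and RDD names here are assumptions, not from the original code; the key point is that both sides use the same `HashPartitioner` instance:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hypothetical record shapes; field names are assumptions for illustration.
case class Customer(rowkey: String, mktOptOutFlag: String, thirdPartyOptOutFlag: String)
case class Txn(custId: String, amount: Double)

val numPartitions = 200 // match spark.sql.shuffle.partitions seen in the plan
val partitioner = new HashPartitioner(numPartitions)

// Step 1: partition the (static) customer data once by customer id and
// cache it, so this side is never reshuffled again on later batches.
def prepareCustomers(customers: RDD[Customer]): RDD[(String, Customer)] =
  customers
    .map(c => (c.rowkey, c))
    .partitionBy(partitioner)
    .cache()

// Step 2: key each micro-batch of transactions the same way and use the
// SAME partitioner, so the join sees two co-partitioned RDDs.
def joinBatch(custById: RDD[(String, Customer)],
              txns: RDD[Txn]): RDD[(String, (Txn, Customer))] =
  txns
    .map(t => (t.custId, t))
    .partitionBy(partitioner) // co-partitioned with custById
    .join(custById)           // narrow dependency: no shuffle of either side
```

Because both RDDs share the same partitioner, `join` becomes a narrow dependency: each partition is joined locally, only the incoming transaction batch is moved, and the cached customer side stays in place across batches.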
