首页 > 解决方案 > Databricks 引发了将结构化流写入大量接收器的最佳实践?

问题描述

我正在使用databricks spark 3.x,并且正在读取大量流(100+),每个流都有自己的合同,需要写出到自己的delta/parquet/sql/whatever表中. 虽然这是很多流,但每个流的活动量很低 - 有些流每天可能只看到数百条记录。我确实想要流式传输,因为我的目标是采用相当低延迟的方法。

这就是我所说的(为简单起见,代码缩写;我正在正确使用检查点、输出模式等)。假设一个schemas变量包含每个主题的架构。我已经尝试过这种方法,我在其中创建了大量单独的流,但它需要大量计算并且大部分都被浪费了:

def batchprocessor(topic, schema):
  def F(df, batchId):
    sql = f'''
  MERGE INTO SOME TABLE
  USING SOME MERGE TABLE ON SOME CONDITION
  WHEN MATCHED
  UPDATE SET *
  WHEN NOT MATCHED
  INSERT *
  '''
    df.createOrReplaceTempView(f"SOME MERGE TABLE")
    df._jdf.sparkSession().sql(sql)
  return F
for topic in topics:
  query = (spark
    .readStream
    .format("delta")
    .load(f"/my-stream-one-table-per-topic/{topic}")
    .withColumn('json', from_json(col('value'),schemas[topic]))
    .select(col('json.*'))
    .writeStream
    .format("delta")
    .foreachBatch(batchProcessor(topic, schema))
    .start())

我还尝试只创建一个进行大量过滤的流,但即使在我将单个消息推送到单个主题的测试环境中,性能也非常糟糕:

def batchprocessor(df, batchId):
  df.cache()
  for topic in topics:
    filteredDf = (df.filter(f"topic == '{topic}'")
      .withColumn('json', from_json(col('value'),schemas[topic]))
      .select(col('json.*')))
    sql = f'''
  MERGE INTO SOME TABLE
  USING SOME MERGE TABLE ON SOME CONDITION
  WHEN MATCHED
  UPDATE SET *
  WHEN NOT MATCHED
  INSERT *
  '''
    filteredDf.createOrReplaceTempView(f"SOME MERGE TABLE")
    filteredDf._jdf.sparkSession().sql(sql)
  df.unpersist()

query = (spark
.readStream
.format("delta")
.load(f"/my-stream-all-topics-in-one-but-partitioned")
.writeStream
.format("delta")
.foreachBatch(batchProcessor)
.start())

有什么好的方法可以从本质上解复用这样的流吗?它已经分区了,所以我假设查询计划器没有做太多冗余工作,但似乎仍然存在大量开销。

标签: apache-sparkpysparkapache-kafkadatabricksspark-structured-streaming

解决方案


我运行了一堆基准测试,选项 2 更有效。我还不完全知道为什么。

最终,性能仍然不是我想要的——每个主题按顺序运行,无论大小如何,因此每个主题上的单个记录会导致 FIFO 调度程序将许多非常低效的小型操作排队。我使用并行化解决了这个问题:

import threading
def writeTable(table, df, poolId, sc):
  sc.setLocalProperty("spark.scheduler.pool", poolId)
  df.write.mode('append').format('delta').saveAsTable(table)
  sc.setLocalProperty("spark.scheduler.pool", None)
def processBatch(df, batchId):
  df.cache()
  dfsToWrite = {}
  for row in df.select('table').distinct().collect():
    table = row.table
    filteredDf = df.filter(f"table = '{table}'")
    dfsToWrite[table] = filteredDf
  threads = []
  for table,df in dfsToWrite.items():
    threads.append(threading.Thread(target=writeTable,args=(table, df,table,spark.sparkContext)))
  for t in threads:
    t.start()
  for t in threads:
    t.join()
  df.unpersist()

推荐阅读