apache-spark - Databricks 引发了将结构化流写入大量接收器的最佳实践?
问题描述
我正在使用databricks spark 3.x,并且正在读取大量流(100+),每个流都有自己的合同,需要写出到自己的delta/parquet/sql/whatever表中. 虽然这是很多流,但每个流的活动量很低 - 有些流每天可能只看到数百条记录。我确实想要流式传输,因为我的目标是采用相当低延迟的方法。
这就是我所说的(为简单起见,代码缩写;我正在正确使用检查点、输出模式等)。假设一个schemas
变量包含每个主题的架构。我已经尝试过这种方法,我在其中创建了大量单独的流,但它需要大量计算并且大部分都被浪费了:
def batchprocessor(topic, schema):
def F(df, batchId):
sql = f'''
MERGE INTO SOME TABLE
USING SOME MERGE TABLE ON SOME CONDITION
WHEN MATCHED
UPDATE SET *
WHEN NOT MATCHED
INSERT *
'''
df.createOrReplaceTempView(f"SOME MERGE TABLE")
df._jdf.sparkSession().sql(sql)
return F
for topic in topics:
query = (spark
.readStream
.format("delta")
.load(f"/my-stream-one-table-per-topic/{topic}")
.withColumn('json', from_json(col('value'),schemas[topic]))
.select(col('json.*'))
.writeStream
.format("delta")
.foreachBatch(batchProcessor(topic, schema))
.start())
我还尝试只创建一个进行大量过滤的流,但即使在我将单个消息推送到单个主题的测试环境中,性能也非常糟糕:
def batchprocessor(df, batchId):
df.cache()
for topic in topics:
filteredDf = (df.filter(f"topic == '{topic}'")
.withColumn('json', from_json(col('value'),schemas[topic]))
.select(col('json.*')))
sql = f'''
MERGE INTO SOME TABLE
USING SOME MERGE TABLE ON SOME CONDITION
WHEN MATCHED
UPDATE SET *
WHEN NOT MATCHED
INSERT *
'''
filteredDf.createOrReplaceTempView(f"SOME MERGE TABLE")
filteredDf._jdf.sparkSession().sql(sql)
df.unpersist()
query = (spark
.readStream
.format("delta")
.load(f"/my-stream-all-topics-in-one-but-partitioned")
.writeStream
.format("delta")
.foreachBatch(batchProcessor)
.start())
有什么好的方法可以从本质上解复用这样的流吗?它已经分区了,所以我假设查询计划器没有做太多冗余工作,但似乎仍然存在大量开销。
解决方案
我运行了一堆基准测试,选项 2 更有效。我还不完全知道为什么。
最终,性能仍然不是我想要的——每个主题按顺序运行,无论大小如何,因此每个主题上的单个记录会导致 FIFO 调度程序将许多非常低效的小型操作排队。我使用并行化解决了这个问题:
import threading
def writeTable(table, df, poolId, sc):
sc.setLocalProperty("spark.scheduler.pool", poolId)
df.write.mode('append').format('delta').saveAsTable(table)
sc.setLocalProperty("spark.scheduler.pool", None)
def processBatch(df, batchId):
df.cache()
dfsToWrite = {}
for row in df.select('table').distinct().collect():
table = row.table
filteredDf = df.filter(f"table = '{table}'")
dfsToWrite[table] = filteredDf
threads = []
for table,df in dfsToWrite.items():
threads.append(threading.Thread(target=writeTable,args=(table, df,table,spark.sparkContext)))
for t in threads:
t.start()
for t in threads:
t.join()
df.unpersist()
推荐阅读
- amazon-s3 - 带有 S3 后端的 Terraform 如何发现所有工作区?
- reactjs - 带有打字稿的rjsf,我如何在onSubmit处理程序中访问formData和事件
- docker - 是否可以通过 Docker Desktop 应用程序从 Docker Volume 下载文件?
- java - 将文件上的 ArrayList 转换为 Xml
- django - 同一域名下的一台IIS服务器上的2个Django项目
- r - Web抓取带有R中项目列表的页面
- vue.js - 如何在 Vue 3 项目中显示使用 date-fns 格式的 Firestore 时间戳字段?
- kubernetes - 在位于同一 VPC 的多个 eks 集群之间建立 tcp 通信
- python - Python:从 Binance API 获取加密货币对价格,从文件中循环对
- javascript - 即使在将数据类型设为 int 之后,为什么 javascript 将其作为字符串