apache-spark - How to use Prefect's resource manager with a spark cluster
Question
I have been experimenting with Prefect for workflow management, but got stuck on setting up and tearing down a Spark session within Prefect's resource manager.
I browsed Prefect's docs, and an example with Dask is available:
from prefect import Flow, Parameter, resource_manager
from dask.distributed import Client


@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        client.close()


# some_task and some_other_task are Prefect tasks defined elsewhere.
with Flow("example") as flow:
    n_workers = Parameter("n_workers")
    with DaskCluster(n_workers=n_workers) as client:
        some_task(client)
        some_other_task(client)
However, I couldn't work out how to do the same with a Spark session.
Answer
The simplest way is to use Spark in local mode:
from prefect import task, Flow, resource_manager
from pyspark import SparkConf
from pyspark.sql import SparkSession


@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        # The returned session is what tasks inside the `with` block receive.
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        # Receives the same session that setup() returned.
        spark.stop()


@task
def get_data(spark: SparkSession):
    return spark.createDataFrame(
        [('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)],
        ['word'],
    )


@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()


with Flow("spark_flow") as flow:
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    flow.run()
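Your setup() method returns the resource being managed, and the cleanup() method accepts the same resource that setup() returned. In this case, we create and return a Spark session, and then stop it. You don't need spark-submit or anything like it (though I find it a bit harder to manage dependencies this way).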
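To make the setup()/cleanup() contract concrete without needing Spark or Prefect installed, here is a minimal plain-Python sketch of the same lifecycle. It is only an analogy, not Prefect's implementation: `FakeSession`, `FakeCluster`, and `managed` are hypothetical names standing in for a `SparkSession`, the `SparkCluster` class, and whatever `@resource_manager` arranges behind the scenes.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a SparkSession; it only tracks whether it is open."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


class FakeCluster:
    """Same shape as SparkCluster: setup() creates the resource, cleanup() releases it."""

    def setup(self):
        return FakeSession()

    def cleanup(self, session):
        session.stop()


@contextmanager
def managed(manager):
    # Assumption: this mimics the lifecycle only. The value returned by setup()
    # is handed to the body and then passed to cleanup(), even if the body raises.
    resource = manager.setup()
    try:
        yield resource
    finally:
        manager.cleanup(resource)


with managed(FakeCluster()) as session:
    assert not session.stopped  # the resource is live while the "tasks" run

assert session.stopped  # cleanup() received the object that setup() returned
```

The point of the sketch is that the body never calls stop() itself; releasing the resource is entirely the manager's job.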
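Scaling up gets more difficult, and it is something I'm still working on. For example, Prefect doesn't know how to serialize Spark DataFrames for output caching or for persisting results. You also have to be careful when using a Dask executor with Spark sessions: they can't be pickled, so you have to configure the executor to use scheduler='threads'.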
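The pickling constraint can be seen with the standard library alone. In this sketch `Session` is a hypothetical stand-in for a `SparkSession`: it holds a `threading.Lock`, which, like a live Spark session, cannot be pickled. Threads share the object in memory, so nothing has to be serialized, which is presumably why a thread-based scheduler sidesteps the problem.

```python
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor


class Session:
    """Hypothetical stand-in for a SparkSession that holds an unpicklable handle."""

    def __init__(self):
        self._lock = threading.Lock()  # lock objects cannot be pickled

    def count_words(self, words):
        with self._lock:
            counts = {}
            for word in words:
                counts[word] = counts.get(word, 0) + 1
            return counts


session = Session()

# Sending the session to another process would require pickling it, which fails.
try:
    pickle.dumps(session)
    picklable = True
except (TypeError, pickle.PicklingError):
    picklable = False

# Threads share the same object in memory, so no pickling is involved.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(session.count_words, ["look", "spark", "spark"])
    result = future.result()
```

In Prefect 1.x the corresponding setting would, to my knowledge, be something like `flow.run(executor=LocalDaskExecutor(scheduler="threads"))`, with `LocalDaskExecutor` imported from `prefect.executors`; check this against the version you have installed.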