首页 > 解决方案 > 如何在 PySpark 中注册本机类或函数?

问题描述

我最近一直在做 PySpark,读完这篇博文后,我想到了一个问题

https://mungingdata.com/apache-spark/registerfunction-injectfunction/

有没有办法在 Python 中做到这一点?例如,我想有一堂课,比如

MyClass(??):
    """
    A PySpark Class
    """

    return self.read.load(path/to/file)

<somecode here to register it>

然后,从我的火花会话中,我想做类似的事情

myDf = spark.MyClass()

这在 PySpark 中可能吗?

主要动机是以某种方式围绕 PySpark 进行包装,以提供特定的增强功能。

标签: python-3.xapache-sparkpyspark

解决方案


你在 python 中工作,所以我不太明白是什么阻碍了你。

这是一个代码示例,用于以某种方式执行您想做的事情:

from pyspark.sql import SparkSession


class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class Spark(metaclass=Singleton):
    def __init__(self):
        self._spark = (
            SparkSession.builder.master("yarn").appName(app_name).getOrCreate()
        )

    def __getattr__(self, attr):
        return getattr(self._spark, attr)

    # Add any other method you'd like

推荐阅读