首页 > 解决方案 > 如何添加自定义方法以不仅在插件中以 dask 格式返回数据源,而且还以几种不同的自定义格式返回数据源?

问题描述

我正在开发一个允许从 Github 读取特定 JSON 文件的摄入插件。这些 JSON 文件包含有关我们要使用不同模拟软件模拟的系统的基本信息,每个软件都有自己的输入格式。我们有从 JSON 到每种可用格式的转换器。我现在想在我的插件中添加一个类似于“to_dask”方法的方法“to_format”,但我不断收到“RemoteSequenceSource 对象没有属性“to_format”。有没有办法做到这一点?


from latticejson.convert import to_elegant, to_madx

class RemoteLatticejson(RemoteSource):
    """
    A lattice json source on the server
    """

    name      = 'remote-latticejson'
    container = 'python'
    partition_access = False

    def __init__(self,org, repo, filename, parameters= None, metadata=None, **kwargs):
        # super().__init__(org, repo, filename, parameters, metadata=metadata, **kwargs)
        self._schema = None
        self.org = org
        self.repo = repo
        self.filename = filename
        self.metadata = metadata

        self._dict = None

    def _load(self):
        self._dict = read_remote_file(self.org, self.repo, self.filename)

    def _get_schema(self):
        if self._dict is None:
            self._load()

        self._dtypes = {
                'version': 'str',
                'title': 'str',
                'root': 'str',
                'elements': 'dict',
                'lattice': 'dict'
                }
        return base.Schema(
                datashape=None,
                dtype=self._dtypes,
                shape=(None, len(self._dtypes)),
                npartitions=1,
                extra_metadata={}
                )


    def _get_partition(self, i):
        if self._dict is None:
            self._load_metadata()
        data = [self.read()]
        return [self._dict]


    def read(self):
        if self._dict is None:
            self._load()

        self.metadata = {
                'version': self._dict.get('version'),
                'title': self._dict.get('title'),
                'root': self._dict.get('root')
                }

        return self._dict

    def to_madx(self):
        self._get_schema()
        return to_madx(self._dict)

    def _close(self):
        pass
`

标签: intake

解决方案


这里有两个概念:

  • 一个新的驱动程序,它可以自由地将方法添加到其实现(to_X)中并将它们公开给用户。这是允许的,并且在某些情况下实现了这一点,以传递特定格式或允许访问基础对象(如此)。请注意,通过添加方法,您会使源上已经很长的方法列表变得更长,因此我们不鼓励这样做。
  • 远程源,仅在客户端无法直接访问数据的情况下使用(因为它在本地没有路由、权限或正确的驱动程序)。这种情况更受限制,数据的传输是由“容器”源调解的。如果您想在通过服务器传输数据时为您的源提供新的自定义行为,您需要编写自己的容器以及原始驱动程序(驱动程序将具有container = "mycustom"并且您将使用 注册容器intake.container.register_container)。

从中可以看出,Intake 并不是真正为处理或写入数据而设计的,而是以最简单的方式为您提供可识别形式的数据集。通过限制范围,我们希望保持代码的简单和灵活。


推荐阅读