Dill error when triggering a Dataflow job from a Cloud Function

Question

I'm writing a GCP Cloud Function that takes an input id from a Pub/Sub message, processes it, and outputs a table to BigQuery.

The code is as follows:

from __future__ import absolute_import
import base64
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from scrapinghub import ScrapinghubClient
import os


def processing_data_function(element):
    # do stuff and return desired data
    pass

def create_data_from_id(job_id):
    # take scrapinghub's job id and extract the data through api
    pass

def run(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    # Take pubsub message and also Scrapinghub job's input id 
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')  

    argv = ['--project=project-name', 
            '--region=us-central1', 
            '--runner=DataflowRunner', 
            '--temp_location=gs://temp/location/', 
            '--staging_location=gs://staging/location/']
    p = beam.Pipeline(options=PipelineOptions(argv))
    (p
        | 'Read from Scrapinghub' >> beam.Create(create_data_from_id(pubsub_message))
        | 'Trim b string' >> beam.FlatMap(processing_data_function)
        | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
                'table_name',
                schema=schema,  # schema definition omitted here
                # Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    run()

Note that the two functions `create_data_from_id` and `processing_data_function`, which fetch and process data from Scrapinghub (a scraping service for Scrapy spiders), are quite verbose, so I'm not including them here. They are also unrelated to the error, because this code works if I run it from Cloud Shell and pass the arguments with `argparse.ArgumentParser()` instead.
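For reference, the working Cloud Shell variant presumably used argparse's known-args pattern, where unrecognized flags fall through to `PipelineOptions`. A minimal sketch (the `--input_id` flag and its value are invented for illustration; only the pipeline flags come from the question):

```python
import argparse

# Hedged sketch of the known-args pattern: argparse consumes the flags it
# knows about, and everything else is handed to Beam's PipelineOptions.
parser = argparse.ArgumentParser()
parser.add_argument('--input_id', help='Scrapinghub job id to process (hypothetical flag)')
known_args, pipeline_args = parser.parse_known_args(
    ['--input_id=123/4/5',
     '--project=project-name',
     '--region=us-central1',
     '--runner=DataflowRunner'])

print(known_args.input_id)   # the flag argparse recognized
print(pipeline_args)         # leftover flags, passed on to PipelineOptions
```

Run this way, the pipeline code itself is a plain script rather than a Cloud Functions entry point, which turns out to matter for the error below.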

As for the error I'm getting: although the code deploys without problems and the Pub/Sub message successfully triggers the function, the Dataflow job fails with this error:

"Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 279, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'main'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 616, in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 283, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'main'

What I've tried

Given that I can run the same pipeline from Cloud Shell, just using an argument parser instead of specifying the options directly, I suspected the way the options are specified was the problem. So I tried different combinations of options, with and without `--save_main_session`, `--staging_location`, `--requirement_file=requirements.txt`, `--setup_file=setup.py` ... They all reported more or less the same problem: dill doesn't know which module to pick up. With `save_main_session` specified, it fails to pick up the main session. With `requirement_file` and `setup_file` specified, the job isn't even created successfully, so I'll spare you those errors. My main problem is that I have no idea where this comes from, since I've never used dill before. Why is running the pipeline from the shell so different from running it from a Cloud Function? Does anyone have a clue?
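The `No module named 'main'` error can be reproduced without Beam at all: pickle (and dill) serializes top-level functions by reference, i.e. by module name plus qualified name, and a Cloud Function's code lives in a module named `main` that does not exist on the Dataflow workers. A stdlib-only illustration of the mechanism (the module name `fake_main` and the function are made up for the demo, standing in for the Cloud Function's `main.py`):

```python
import pickle
import sys
import types

# Build a throwaway module, mimicking the Cloud Functions entry-point
# file, and define a function inside it.
mod = types.ModuleType('fake_main')
exec('def handler(x):\n    return x * 2', mod.__dict__)
sys.modules['fake_main'] = mod

# The pickle stores a *reference* to 'fake_main.handler', not the code.
payload = pickle.dumps(mod.handler)

# A worker process has no module by that name, so unpickling fails --
# this is the same failure mode as the Dataflow traceback above.
del sys.modules['fake_main']
try:
    pickle.loads(payload)
except ModuleNotFoundError as err:
    print(err)  # No module named 'fake_main'
```

Running from Cloud Shell works because there the script is executed directly and Beam can stage the session or the file itself, whereas the deployed function only exists as `main` inside the Cloud Functions runtime.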

Thanks

Tags: python, google-cloud-functions, google-cloud-dataflow, google-cloud-pubsub, dill

Solution


You could also try modifying the last part and testing whether the following works:

if __name__ == "__main__":
    ...

Also, make sure you execute the script from the correct folder, as the problem may be related to the naming or location of the files in your directory.
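One common way to make the helper functions importable on the workers is to move them out of `main.py` into a small package and ship it with `--setup_file`. A hedged sketch (the package name and layout below are assumptions for illustration, not from the question):

```python
# setup.py -- hypothetical minimal package definition. Run the pipeline
# with --setup_file=./setup.py so Dataflow workers install this package;
# the helpers then live in an importable module instead of the
# non-existent 'main'.
import setuptools

setuptools.setup(
    name='scrapinghub-pipeline',   # invented name
    version='0.0.1',
    packages=setuptools.find_packages(),
)
```

With this in place, `create_data_from_id` and `processing_data_function` would be defined in, say, a `helpers/` package and imported into `main.py`, so dill pickles them by a module reference the workers can resolve.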

Please consider the following sources, which may help you: Source 1, Source 2

I hope this information helps.

