pip - 运行 GCP Dataflow 时出现 Python 包错误
问题描述
这个错误是突然发生的。数据流上没有任何变化。我们看到错误“NameError: global name 'firestore' is not defined [while running 'generatedPtransform-12478']”看起来在工作节点上安装软件包时出现问题
我在“DirectRunner”上本地尝试了相同的管道,效果很好。我们参考了链接上的“NameErrors”文档“ https://cloud.google.com/dataflow/docs/resources/faq#how-can-i-tell-what-version-of-the-cloud-dataflow- sdk-is-installedrunning-in-my-environment ”并尝试了以下几件事
1.'save_main_session':真正的管道参数
2.将所有包“导入”命令从全局移动到函数范围
我们在 requirements.txt 中有以下包,
阿帕奇光束[gcp]
谷歌云火库
- python-dateutil
import datetime
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import firestore
import yaml
from functools import reduce
from dateutil.parser import parse
class PubSubToDict(beam.DoFn):
<...to process elements>
class WriteToFS(beam.DoFn):
<...to write data to firestore>
pipeline_options = {
'project': PROJECT,
'staging_location': 'gs://' + BUCKET + '/staging',
'temp_location': 'gs://' + BUCKET + '/temp',
'runner': 'DataflowRunner',
'job_name': JOB_NAME,
'disk_size_gb': 100,
'save_main_session': True,
'region': 'europe-west1',
'requirements_file': 'requirements.txt',
'streaming': True
}
with beam.Pipeline(options=options) as p:
lines = (p | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Transformation" >> beam.ParDo(PubSubToDict()))
FSWrite = (lines | 'Write To Firestore' >> beam.ParDo(WriteToFS()))```
解决方案
推荐阅读
- python - UDP pinger 分配
- android - 广播接收器没有被触发
- mysql - 如何在不使用索引检查 mysql 表中的 100000 条记录等所有行的情况下获得一行?
- r - 使用整体数据中的比率进行插值
- mysql - WordPress 和 MySQL - 服务器请求客户端未知的身份验证方法
- r - R函数内的多重采样
- sql - 为什么浮点值只能在 where 条件下作为文本找到
- html - 如何在angular4和html中拆分otp的输入框?
- excel-formula - Excel - 重叠数据 - 数据透视表
- r - 如何将 IP 地址存储在 R 中,然后对其进行处理以提供 IP 位置?