google-cloud-platform - 无法通过 Airflow 的 BeamRunPythonPipelineOperator 运行 Python 管道
问题描述
我无法通过 Airflow 的 BeamRunPythonPipelineOperator 运行 Python 管道。
以下是我的完整代码:
DAG文件
import os
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
# Arguments shared by every task that belongs to the DAG.
default_args = {
    "owner": "<...>",
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": "<...>",
    },
}

# Tasks created inside the ``with`` block are attached to ``dag``.
with DAG(
    dag_id="word_count",
    default_args=default_args,
    schedule_interval="@once",
) as dag:
    # Launches word_count.py (stored on GCS) with the Dataflow runner.
    start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
        task_id="start_python_pipeline_dataflow_runner",
        runner="DataflowRunner",
        py_file="gs://<...>/word_count.py",
        # Options handed to the Beam pipeline on its command line.
        pipeline_options={
            "input": "gs://<...>/kinglear.txt",
            "output": "gs://<...>/output.txt",
            "temp_location": "gs://<...>/temp/",
            "staging_location": "gs://<...>/temp/",
        },
        py_options=[],
        # Packages requested for the interpreter that runs the pipeline file.
        py_requirements=["apache-beam[gcp]==2.26.0"],
        py_interpreter="python3",
        py_system_site_packages=False,
        dataflow_config=DataflowConfiguration(
            job_name="{{task.task_id}}",
            project_id="<...>",
            location="us-central1",
        ),
    )
Python 文件 (word_count.py)
"""A word-counting workflow."""
# pytype: skip-file
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class WordExtractingDoFn(beam.DoFn):
    """DoFn that splits one line of text into its individual words."""

    def process(self, element):
        """Extract the words contained in ``element``.

        A word is a maximal run of word characters and apostrophes.

        Args:
          element: the line of text being processed.

        Returns:
          A list of the words found, in order of appearance; the list is
          empty when the line contains no word.
        """
        word_pattern = r'[\w\']+'
        words = re.findall(word_pattern, element, re.UNICODE)
        return words
def run(argv=None, save_main_session=True):
    """Build the word-count pipeline and run it.

    NOTE(review): the ``argv`` received from the caller is never read. It is
    rebound below to a hard-coded option list that contains neither
    ``--input`` nor ``--output``, so both always take their defaults.

    Args:
      argv: command-line arguments (overwritten, see the note above).
      save_main_session: value stored in ``SetupOptions.save_main_session``.
    """
    def format_result(word, count):
        # Render one (word, count) pair as a line of output text.
        return '%s: %d' % (word, count)

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--input',
        dest='input',
        default='gs://<...>/kinglear.txt',
        help='Input file to process.')
    arg_parser.add_argument(
        '--output',
        dest='output',
        default='gs://<...>/output.txt',
        help='Output file to write results to.')

    # Rebinding ``argv`` here discards whatever the caller passed in.
    argv = [
        '--project=<...>',
        '--region=us-central1',
        '--runner=DataflowRunner',
        '--staging_location=gs://<...>/temp/',
        '--temp_location=gs://<...>/temp/',
        '--template_location=gs://<...>/templates/word_count_template'
    ]
    word_count_args, beam_args = arg_parser.parse_known_args(argv)

    # The main session is saved because the DoFn uses a module imported at
    # module level (``re``).
    options = PipelineOptions(beam_args)
    setup_view = options.view_as(SetupOptions)
    setup_view.save_main_session = save_main_session

    # Leaving the ``with`` block is what runs the pipeline.
    with beam.Pipeline(argv=argv, options=options) as pipeline:
        # Read the text file[pattern] into a PCollection of lines.
        text_lines = pipeline | 'Read' >> ReadFromText(word_count_args.input)

        split_words = beam.ParDo(WordExtractingDoFn()).with_output_types(str)
        word_counts = (
            text_lines
            | 'Split' >> split_words
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # Turn each (word, count) pair into a printable string.
        formatted = word_counts | 'Format' >> beam.MapTuple(format_result)

        # The "Write" transform is applied for its side effect only.
        # pylint: disable=expression-not-assigned
        formatted | 'Write' >> WriteToText(word_count_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
下面是 Cloud Composer 的截图:
我无法在控制台中看到 Dataflow 作业,也无法在存储桶中看到计数结果。有人能告诉我正确的做法,或者给我一些建议吗?
解决方案
你的 DAG 没有问题,问题出在 Beam 的 Python 文件上:你在 run() 中用硬编码的列表覆盖了 argv。
更好的做法是对 pipeline_args 调用 extend 来追加所需的选项。
另外,作业之所以没有被提交,是因为你把 argv 传给了 beam.Pipeline。
以下是修正后的代码:
word_count.py :
"""A word-counting workflow."""
# pytype: skip-file
import argparse
import logging
import re
import os
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class WordExtractingDoFn(beam.DoFn):
    """DoFn that splits one line of text into its individual words."""

    def process(self, element):
        """Extract the words contained in ``element``.

        A word is a maximal run of word characters and apostrophes.

        Args:
          element: the line of text being processed.

        Returns:
          A list of the words found, in order of appearance; the list is
          empty when the line contains no word.
        """
        word_pattern = r'[\w\']+'
        words = re.findall(word_pattern, element, re.UNICODE)
        return words
# Fallback Dataflow options, applied only when the caller did not pass the
# corresponding flag (e.g. when the file is launched by hand).
_DEFAULT_PIPELINE_OPTIONS = (
    ('--runner', 'DataflowRunner'),
    ('--project', '<project-name>'),
    ('--region', '<region>'),
    ('--staging_location', 'gs://<bucket>/'),
    ('--temp_location', 'gs://<bucket>/temp'),
    ('--job_name', 'your-wordcount-job'),
)


def _with_default_options(pipeline_args, defaults=_DEFAULT_PIPELINE_OPTIONS):
    """Return a copy of ``pipeline_args`` completed with the missing defaults.

    Args:
      pipeline_args: list of command-line tokens meant for the pipeline.
      defaults: iterable of ``(flag, value)`` pairs; a pair is appended as
        ``flag=value`` only if ``flag`` does not already occur in
        ``pipeline_args`` (either as ``flag=value`` or as a bare ``flag``).

    Returns:
      A new list; ``pipeline_args`` itself is left unmodified.
    """
    supplied_flags = {
        token.split('=', 1)[0]
        for token in pipeline_args
        if token.startswith('--')
    }
    missing = [
        f'{flag}={value}'
        for flag, value in defaults
        if flag not in supplied_flags
    ]
    return list(pipeline_args) + missing


def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline.

    Options given in ``argv`` take precedence: the built-in Dataflow
    defaults are only added for flags that ``argv`` does not contain, so
    values supplied by a launcher are not overridden.

    Args:
      argv: list of command-line arguments; ``None`` makes argparse read
        them from ``sys.argv``.
      save_main_session: value stored in ``SetupOptions.save_main_session``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        default='gs://<bucket>/newoutput',
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Complete, rather than overwrite, what the caller passed in.
    pipeline_args = _with_default_options(pipeline_args)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module
    # level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = (
        save_main_session)

    # The pipeline will be run on exiting the with block.
    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection.
        lines = p | 'Read' >> ReadFromText(known_args.input)

        counts = (
            lines
            | 'Split' >> (
                beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # Format the counts into a PCollection of strings.
        def format_result(word, count):
            return '%s: %d' % (word, count)

        output = counts | 'Format' >> beam.MapTuple(format_result)

        # Write the output using a "Write" transform that has side effects.
        # pylint: disable=expression-not-assigned
        output | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
推荐阅读
- javascript - Axios CORS Vultr S3“无‘访问控制允许来源’”
- oracle - Oracle Apex 日期选择器行为异常
- swift - 如何检测应用程序何时安装并首先启动?
- flutter - Flutter:List.sort()降序中的错误索引删除
- typescript - TypeScript 类中第一个定义的方法未正确键入(但之后的所有内容)使用键入 this 的函数
- c++ - Ofstream 不写入文件?
- c++ - 在代码块中调试数组
- java - java扩大最终变量
- php - substr_compare() 在 php 中返回错误的值
- node.js - 通过数据库和表格中的参数比较模型