python - What is considered a very large dataset for the Python Beam SDK?
问题描述
I'm setting up a Google Dataflow job using version 2.31 of the Beam Python SDK. It's a pretty straightforward task that reads from a BigQuery table, performs some logic to append a few additional columns to the data, and then writes the results back to a separate BigQuery table.
This has run successfully for 48 billion rows of data, but fails when run for around 80 billion rows. The issue appears to be with the step that writes to BigQuery, as I can see errors in the diagnostics page mentioning steps in the "Write result to BQ" phase being blocked or taking a long time. I can also see runtime errors about an instruction id not being registered in that same phase.
The SDK documentation notes there may be issues writing very large datasets with Python. https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations. Based on that, I did change the code around a bit to write to 5 different tables instead of a single one, but ran into similar issues.
My assumption is that limitation is what I'm running into here. I'm unclear on how to find where that limitation is or if there's a decent workaround for it. I just found some info about streaming inserts, so I'm going to do some testing with that to see if that works better than using file loads.
For reference, this is a slightly redacted version of the code for the non-partitioned attempt:
from __future__ import division
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText,WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, \
WorkerOptions
import pandas as pd
import logging
class JoiningDoFn2(beam.DoFn):
    """Annotate each GPS record with values looked up via a pickled ball tree.

    The pickled lookup structures are loaded in ``setup`` instead of
    ``__init__`` so that the DoFn instance serialized for the workers stays
    small; loading them eagerly triggers the "A large DoFn instance that is
    serialized for transmission to remote workers." warning.
    """

    # Mean Earth radius in kilometres.
    # NOTE(review): scaling the query distance by this presumes the pickled
    # tree was built with the haversine metric on radians -- confirm.
    _EARTH_RADIUS_KM = 6371

    def __init__(self):
        super().__init__()
        # Populated in setup(); nothing heavy is kept on the instance while it
        # is being serialized.
        self._pickle_file = None
        self._balltree = None
        self._density_mapping = None
        self._nn_lat_mapping = None
        self._nn_lon_mapping = None
        self._np = None

    def setup(self):
        """Download the pickle from Cloud Storage and unpack its four parts.

        Raises:
            FileNotFoundError: if the pickle blob does not exist in the bucket.
        """
        # Imported here (not at construction time) so no module objects are
        # stored on the instance that gets serialized.
        import pickle as pkl

        import numpy as np
        from google.cloud import storage

        bucket = storage.Client().get_bucket("GCS BUCKET NAME")
        blob = bucket.get_blob('ubi/dataflow/density_balltree2.pkl')
        if blob is None:
            # get_blob() returns None for a missing object; fail with a
            # readable message instead of an AttributeError on None.
            raise FileNotFoundError(
                "ubi/dataflow/density_balltree2.pkl not found in bucket"
            )

        # SECURITY: pickle.loads executes arbitrary code from the payload;
        # this is only safe while the bucket contents are fully trusted.
        payload = pkl.loads(blob.download_as_bytes())
        self._pickle_file = payload
        # Unpack once here rather than on every element.
        self._balltree = payload[0]
        self._density_mapping = payload[1]
        self._nn_lat_mapping = payload[2]
        self._nn_lon_mapping = payload[3]
        self._np = np

    def process(self, gps_element):
        """Yield a copy of ``gps_element`` with the nearest-neighbour columns added.

        Args:
            gps_element: mapping holding at least ``GPSLatitude`` and
                ``GPSLongitude`` (converted to radians below, so presumably
                degrees -- verify against the source query).

        Yields:
            A new dict with ``traffic_density``, ``distance_to_nn``,
            ``nn_lat`` and ``nn_lon`` added.
        """
        np = self._np
        # Get coordinates in radians, shaped as a single-row 2-D query.
        query_point = np.asarray((
            gps_element['GPSLatitude'] * np.pi / 180,
            gps_element['GPSLongitude'] * np.pi / 180,
        )).reshape(1, -1)
        distances, closest_indices = self._balltree.query(query_point, k=1)
        nearest = closest_indices[0][0]

        # Beam requires that input elements are not mutated, so enrich a
        # shallow copy instead of the element that was passed in.
        enriched = dict(gps_element)
        # Use mapping to get traffic density from closest index.
        enriched['traffic_density'] = self._density_mapping[nearest]
        enriched["distance_to_nn"] = self._EARTH_RADIUS_KM * distances[0][0]
        enriched['nn_lat'] = self._nn_lat_mapping[nearest]
        enriched['nn_lon'] = self._nn_lon_mapping[nearest]
        yield enriched
# Keyword arguments unpacked into WriteToBigQuery for the single-table run.
DESTINATION_TABLE_CONFIGS = dict(
    table="TARGET TABLE NAME",
    dataset="TARGET DATASET NAME",
    project="PROJECT NAME",
    # Comma-separated "name:TYPE" pairs; adjacent literals are concatenated
    # into one schema string.
    schema=(
        "driverid:STRING, tripid:STRING, TimeStamp:NUMERIC,"
        "GPSLatitude:FLOAT64, GPSLongitude:FLOAT64, "
        "traffic_density:FLOAT64, distance_to_nn:FLOAT64,"
        "nn_lat:FLOAT64, nn_lon:FLOAT64"
    ),
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
)
# Dataflow job settings handed to beam.Pipeline in DataFlowPipeline.run().
pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='PROJECT NAME',
    # job_name=job_name,
    region='us-east4',
    subnetwork='SUBNETWORK INFO',
    use_public_ips=False,
    staging_location='gs://GCS BUCKET NAME/ubi/dataflow/staging/',
    temp_location='gs://GCS BUCKET NAME/ubi/dataflow/temp/',
    machine_type="n1-standard-1",
    max_num_workers=600,
    # num_workers=500,
    autoscaling_algorithm='THROUGHPUT_BASED',  # or None
    # disk_size_gb=30,
)
class DataFlowPipeline:
    """Builds and runs the Beam job."""

    def run(self):
        """Assemble the read -> enrich -> write pipeline and execute it.

        The pipeline is used as a context manager, so it runs when the
        ``with`` block is left.
        """
        query_sql = "\nSQL QUERY TEXT\n"

        with beam.Pipeline(options=pipeline_options) as pipeline:
            source_rows = pipeline | 'Read records from BQ' >> beam.io.ReadFromBigQuery(
                query=query_sql, use_standard_sql=True
            )
            enriched_rows = source_rows | 'Add traffic density' >> beam.ParDo(JoiningDoFn2())
            enriched_rows | 'Write result to BQ' >> WriteToBigQuery(**DESTINATION_TABLE_CONFIGS)
if __name__ == "__main__":
    # Raise the root logger to INFO before the job is launched.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    print('setting up config for runner...')
    DataFlowPipeline().run()
    print('The runner is done!')
And this is a slightly redacted version of the attempt outputting to 5 different tables:
from __future__ import division
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText,WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, \
WorkerOptions
import pandas as pd
import logging
class JoiningDoFn2(beam.DoFn):
    """Annotate each GPS record with values looked up via a pickled ball tree.

    The pickled lookup structures are loaded in ``setup`` instead of
    ``__init__`` so that the DoFn instance serialized for the workers stays
    small; loading them eagerly triggers the "A large DoFn instance that is
    serialized for transmission to remote workers." warning.
    """

    # Mean Earth radius in kilometres.
    # NOTE(review): scaling the query distance by this presumes the pickled
    # tree was built with the haversine metric on radians -- confirm.
    _EARTH_RADIUS_KM = 6371

    def __init__(self):
        super().__init__()
        # Populated in setup(); nothing heavy is kept on the instance while it
        # is being serialized.
        self._pickle_file = None
        self._balltree = None
        self._density_mapping = None
        self._nn_lat_mapping = None
        self._nn_lon_mapping = None
        self._np = None

    def setup(self):
        """Download the pickle from Cloud Storage and unpack its four parts.

        Raises:
            FileNotFoundError: if the pickle blob does not exist in the bucket.
        """
        # Imported here (not at construction time) so no module objects are
        # stored on the instance that gets serialized.
        import pickle as pkl

        import numpy as np
        from google.cloud import storage

        bucket = storage.Client().get_bucket('GCS BUCKET NAME')
        blob = bucket.get_blob('ubi/dataflow/density_balltree2.pkl')
        if blob is None:
            # get_blob() returns None for a missing object; fail with a
            # readable message instead of an AttributeError on None.
            raise FileNotFoundError(
                "ubi/dataflow/density_balltree2.pkl not found in bucket"
            )

        # SECURITY: pickle.loads executes arbitrary code from the payload;
        # this is only safe while the bucket contents are fully trusted.
        payload = pkl.loads(blob.download_as_bytes())
        self._pickle_file = payload
        # Unpack once here rather than on every element.
        self._balltree = payload[0]
        self._density_mapping = payload[1]
        self._nn_lat_mapping = payload[2]
        self._nn_lon_mapping = payload[3]
        self._np = np

    def process(self, gps_element):
        """Yield a copy of ``gps_element`` with the nearest-neighbour columns added.

        Args:
            gps_element: mapping holding at least ``GPSLatitude`` and
                ``GPSLongitude`` (converted to radians below, so presumably
                degrees -- verify against the source query).

        Yields:
            A new dict with ``traffic_density``, ``distance_to_nn``,
            ``nn_lat`` and ``nn_lon`` added.
        """
        np = self._np
        # Get coordinates in radians, shaped as a single-row 2-D query.
        query_point = np.asarray((
            gps_element['GPSLatitude'] * np.pi / 180,
            gps_element['GPSLongitude'] * np.pi / 180,
        )).reshape(1, -1)
        distances, closest_indices = self._balltree.query(query_point, k=1)
        nearest = closest_indices[0][0]

        # Beam requires that input elements are not mutated, so enrich a
        # shallow copy instead of the element that was passed in.
        enriched = dict(gps_element)
        # Use mapping to get traffic density from closest index.
        enriched['traffic_density'] = self._density_mapping[nearest]
        enriched["distance_to_nn"] = self._EARTH_RADIUS_KM * distances[0][0]
        enriched['nn_lat'] = self._nn_lat_mapping[nearest]
        enriched['nn_lon'] = self._nn_lon_mapping[nearest]
        yield enriched
# Keyword arguments shared by every per-partition WriteToBigQuery; the table
# name is supplied separately at each call site.
DESTINATION_TABLE_CONFIGS = dict(
    dataset="TARGET DATASET",
    project="PROJECT NAME",
    # Comma-separated "name:TYPE" pairs; adjacent literals are concatenated
    # into one schema string.
    schema=(
        "driverid:STRING, tripid:STRING, TimeStamp:NUMERIC,"
        "GPSLatitude:FLOAT64, GPSLongitude:FLOAT64, "
        "traffic_density:FLOAT64, distance_to_nn:FLOAT64,"
        "nn_lat:FLOAT64, nn_lon:FLOAT64"
    ),
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
)
# Dataflow job settings handed to beam.Pipeline in DataFlowPipeline.run().
pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='PROJECT NAME',
    # job_name=job_name,
    region='us-east4',
    subnetwork='SUBNETWORK INFO',
    use_public_ips=False,
    staging_location='gs://GCS BUCKET/ubi/dataflow/staging/',
    temp_location='gs://GCS BUCKET/ubi/dataflow/temp/',
    machine_type="n1-standard-1",
    max_num_workers=500,
    # num_workers=50,
    # disk_size_gb=30,
    autoscaling_algorithm='THROUGHPUT_BASED',  # or None
)
class DataFlowPipeline:
    """Builds and runs the partitioned Beam job."""

    def run(self):
        """Assemble the read -> enrich -> split -> write pipeline and execute it.

        Rows are split by ``round(TimeStamp) % shard_count`` and each split is
        written to its own suffixed table. The pipeline is used as a context
        manager, so it runs when the ``with`` block is left.
        """
        # Number of output splits; one write step is created per split below.
        shard_count = 5
        query_sql = "\nSQL QUERY TEXT\n"

        with beam.Pipeline(options=pipeline_options) as pipeline:
            source_rows = pipeline | 'Read records from BQ' >> beam.io.ReadFromBigQuery(
                query=query_sql, use_standard_sql=True
            )
            enriched_rows = source_rows | 'Add traffic density' >> beam.ParDo(JoiningDoFn2())
            shards = enriched_rows | 'Split output' >> beam.Partition(
                lambda element, num_partitions: round(element["TimeStamp"]) % num_partitions,
                shard_count,
            )

            for index in range(shard_count):
                writer = beam.io.WriteToBigQuery(
                    table=f'TARGET TABLE NAME_{index}', **DESTINATION_TABLE_CONFIGS
                )
                shards[index] | f'Write Partition {index}' >> writer
if __name__ == "__main__":
    # Raise the root logger to INFO before the job is launched.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    print('setting up config for runner...')
    DataFlowPipeline().run()
    print('The runner is done!')
解决方案
推荐阅读
- google-apps-script - 通过 Google Apps 脚本安全地调用 Google Cloud 函数
- angular - 如何在 WebStorm / PhpStorm 中启用自定义 Angular 库的自动导入?
- c# - 将 2 个不同 texbox 中的 2 个 txt 文件从小写转换为大写
- java - 从java应用程序(mysql,javafx)插入时出现阿拉伯语数据问题
- batch-file - 将带空格的文件名批量输出到不带引号的文件
- node.js - “字符串”类型的表达式不能用于索引类型“请求”
- spotfire - 如果行中存在特定字符串 + 值,则使用 Case 语句输出值
- android - Android Kotlin 返回 setOnQueryTextListener
- python - 将列字典从不同的数据帧转换为数据帧:pyspark
- flutter - Flutter webview loadUrl方法无法正常工作