What is considered a very large dataset for the Python Beam SDK?

Problem description

I'm setting up a Google Dataflow job using the Apache Beam Python SDK 2.31. It's a pretty straightforward task: it reads from a BigQuery table, runs some logic to append a few additional columns, and writes the results back to a separate BigQuery table.

This has run successfully for 48 billion rows of data, but fails at around 80 billion rows. The problem appears to be in the write to BigQuery step: the diagnostics page shows errors about steps in the "Write Results to BigQuery" phase being blocked or taking a long time, and I also see runtime errors in that same phase about an instruction id not being registered.

The SDK documentation notes that there may be issues writing very large datasets with Python: https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations. Based on that, I changed the code to write to 5 different tables instead of a single one, but I ran into similar issues.

My assumption is that this limitation is what I'm running into. I'm unclear on how to find out where that limit actually is, or whether there's a decent workaround for it. I just found some information about streaming inserts, so I'm going to test whether that works better than using file loads.
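In case it helps frame the question, this is roughly the change I'm planning to test for streaming inserts. It's untested; it just forces the write method on the same WriteToBigQuery call, and everything else (the DESTINATION_TABLE_CONFIGS dict from the code below) stays the same:

# Untested sketch of the streaming-inserts variant: the only change is the
# explicit method argument, which bypasses the file-loads path entirely.
from apache_beam.io import WriteToBigQuery

write_to_bq = WriteToBigQuery(
    method=WriteToBigQuery.Method.STREAMING_INSERTS,
    **DESTINATION_TABLE_CONFIGS  # same table/schema/disposition settings as below
)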

For reference, this is a slightly redacted version of the code for the non-partitioned attempt:

from __future__ import division
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText,WriteToBigQuery

from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, \
    WorkerOptions
import pandas as pd
import logging

class JoiningDoFn2(beam.DoFn):
    # Do lazy initialization here. Otherwise, error messages pop up about
    # "A large DoFn instance that is serialized for transmission to remote workers."
    def __init__(self):
        #self._pickle_file = None
        from google.cloud import storage
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl

    def setup(self):
        bucket = self._storage.Client().get_bucket("GCS BUCKET NAME")
        blob = bucket.get_blob('ubi/dataflow/density_balltree2.pkl')
        self._pickle_file = self._pkl.loads(blob.download_as_bytes())        

    def process(self, gps_element):
        import numpy as np 

        # Unpack the BallTree and the lookup mappings from the pickled tuple
        balltree = self._pickle_file[0]
        density_mapping = self._pickle_file[1]
        nn_lat_mapping = self._pickle_file[2]
        nn_lon_mapping = self._pickle_file[3]

        # Get the coordinates in radians and query the nearest neighbour
        distances, closest_indices = balltree.query(
            np.asarray((gps_element['GPSLatitude'] * np.pi / 180,
                        gps_element['GPSLongitude'] * np.pi / 180)).reshape(1, -1),
            k=1)

        # Use the mappings to get traffic density from the closest index
        gps_element['traffic_density'] = density_mapping[closest_indices[0][0]]
        gps_element['distance_to_nn'] = 6371 * distances[0][0]
        gps_element['nn_lat'] = nn_lat_mapping[closest_indices[0][0]]
        gps_element['nn_lon'] = nn_lon_mapping[closest_indices[0][0]]

        yield gps_element
        
        
DESTINATION_TABLE_CONFIGS = {
    "table": "TARGET TABLE NAME",
    "dataset": "TARGET DATASET NAME",
    "project": "PROJECT NAME",
    "schema": (
        "driverid:STRING,tripid:STRING,TimeStamp:NUMERIC,"
        "GPSLatitude:FLOAT64,GPSLongitude:FLOAT64,"
        "traffic_density:FLOAT64,distance_to_nn:FLOAT64,"
        "nn_lat:FLOAT64,nn_lon:FLOAT64"
    ),
    "create_disposition": beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    "write_disposition": beam.io.BigQueryDisposition.WRITE_APPEND,
}


pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='PROJECT NAME',
    # job_name=job_name,
    region='us-east4',
    subnetwork='SUBNETWORK INFO',
    use_public_ips=False,
    staging_location='gs://GCS BUCKET NAME/ubi/dataflow/staging/',
    temp_location='gs://GCS BUCKET NAME/ubi/dataflow/temp/',
    machine_type='n1-standard-1',
    max_num_workers=600,
    # num_workers=500,
    autoscaling_algorithm='THROUGHPUT_BASED',
    # disk_size_gb=30,
)

class DataFlowPipeline:
    """THIS IS THE CLASS THAT ACTUALLY RUNS THE JOB"""
    def run(self):
        """This is the job runner it holds the beam pipeline"""

        with beam.Pipeline(options=pipeline_options) as p:

            query_sql =  """
SQL QUERY TEXT
"""            
            
            processed_trips = (
                p
                | 'Read records from BQ' >> beam.io.ReadFromBigQuery(query=query_sql, use_standard_sql=True)
                | 'Add traffic density' >> beam.ParDo(JoiningDoFn2())
                | 'Write result to BQ' >> WriteToBigQuery(**DESTINATION_TABLE_CONFIGS)
            )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    print('setting up config for runner...')
    trainer = DataFlowPipeline()
    trainer.run()
    print('The runner is done!')

And this is a slightly redacted version of the attempt that writes to 5 different tables:

from __future__ import division
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText,WriteToBigQuery

from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, \
    WorkerOptions
import pandas as pd
import logging

class JoiningDoFn2(beam.DoFn):
    # Do lazy initialization here. Otherwise, error messages pop up about
    # "A large DoFn instance that is serialized for transmission to remote workers."
    def __init__(self):
        #self._pickle_file = None
        from google.cloud import storage
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl

    def setup(self):
        bucket = self._storage.Client().get_bucket('GCS BUCKET NAME')
        blob = bucket.get_blob('ubi/dataflow/density_balltree2.pkl')
        self._pickle_file = self._pkl.loads(blob.download_as_bytes())        

    def process(self, gps_element):
        import numpy as np

        # Unpack the BallTree and the lookup mappings from the pickled tuple
        balltree = self._pickle_file[0]
        density_mapping = self._pickle_file[1]
        nn_lat_mapping = self._pickle_file[2]
        nn_lon_mapping = self._pickle_file[3]

        # Get the coordinates in radians and query the nearest neighbour
        distances, closest_indices = balltree.query(
            np.asarray((gps_element['GPSLatitude'] * np.pi / 180,
                        gps_element['GPSLongitude'] * np.pi / 180)).reshape(1, -1),
            k=1)

        # Use the mappings to get traffic density from the closest index
        gps_element['traffic_density'] = density_mapping[closest_indices[0][0]]
        gps_element['distance_to_nn'] = 6371 * distances[0][0]
        gps_element['nn_lat'] = nn_lat_mapping[closest_indices[0][0]]
        gps_element['nn_lon'] = nn_lon_mapping[closest_indices[0][0]]

        yield gps_element
        
        
DESTINATION_TABLE_CONFIGS = {
    "dataset": "TARGET DATASET",
    "project": "PROJECT NAME",
    "schema": (
        "driverid:STRING,tripid:STRING,TimeStamp:NUMERIC,"
        "GPSLatitude:FLOAT64,GPSLongitude:FLOAT64,"
        "traffic_density:FLOAT64,distance_to_nn:FLOAT64,"
        "nn_lat:FLOAT64,nn_lon:FLOAT64"
    ),
    "create_disposition": beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    "write_disposition": beam.io.BigQueryDisposition.WRITE_APPEND,
}


pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='PROJECT NAME',
    # job_name=job_name,
    region='us-east4',
    subnetwork='SUBNETWORK INFO',
    use_public_ips=False,
    staging_location='gs://GCS BUCKET/ubi/dataflow/staging/',
    temp_location='gs://GCS BUCKET/ubi/dataflow/temp/',
    machine_type='n1-standard-1',
    max_num_workers=500,
    # num_workers=50,
    # disk_size_gb=30,
    autoscaling_algorithm='THROUGHPUT_BASED',
)

class DataFlowPipeline:
    """THIS IS THE CLASS THAT ACTUALLY RUNS THE JOB"""        
    
    def run(self):
        """This is the job runner it holds the beam pipeline"""   

        # How many ways to split the output; each partition is written to its
        # own BigQuery table in the loop at the end of the pipeline.
        PARTITIONS = 5

        with beam.Pipeline(options=pipeline_options) as p:

            query_sql =  """
SQL QUERY TEXT
"""            

            partitions = (
                p
                | 'Read records from BQ' >> beam.io.ReadFromBigQuery(query=query_sql, use_standard_sql=True)
                | 'Add traffic density' >> beam.ParDo(JoiningDoFn2())
                | 'Split output' >> beam.Partition(
                    lambda element, num_partitions: round(element["TimeStamp"]) % num_partitions,
                    PARTITIONS)
            )

            for x in range(PARTITIONS):
                partitions[x] | f'Write Partition {x}' >> beam.io.WriteToBigQuery(
                    table=f'TARGET TABLE NAME_{x}', **DESTINATION_TABLE_CONFIGS)

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    print('setting up config for runner...')
    trainer = DataFlowPipeline()
    trainer.run()
    print('The runner is done!')
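As a side note, I'm also wondering whether the manual beam.Partition plus the per-table write loop is even necessary, since WriteToBigQuery accepts a callable for the table argument and can route each row itself. This is only a rough, untested sketch; the table and dataset names are the same redacted placeholders as above:

import apache_beam as beam

# Untested sketch: route each row to one of the 5 tables inside WriteToBigQuery
# itself, instead of splitting the PCollection with beam.Partition first.
def pick_table(element, num_tables=5):
    # Same modulo-on-TimeStamp split as the Partition version above,
    # returned as a "dataset.table" string.
    suffix = round(element["TimeStamp"]) % num_tables
    return f"TARGET DATASET.TARGET TABLE NAME_{suffix}"

fanout_write = beam.io.WriteToBigQuery(
    table=pick_table,
    schema=DESTINATION_TABLE_CONFIGS["schema"],
    create_disposition=DESTINATION_TABLE_CONFIGS["create_disposition"],
    write_disposition=DESTINATION_TABLE_CONFIGS["write_disposition"],
)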

Tags: python, google-cloud-dataflow, apache-beam
