python - Google Dataflow job cost optimization
Problem description
I have run the code below over 522 gzip files totaling 100 GB, which decompress to roughly 320 GB of protobuf-format data, and the output is written to GCS. I used n1-standard machines and made sure input and output are in the same region. The job cost me about $17, and that is for just half an hour of data, so I badly need to do some cost optimization here.
I obtained the cost from the following query:
SELECT l.value AS JobID, ROUND(SUM(cost),3) AS JobCost
FROM `PROJECT.gcp_billing_data.gcp_billing_export_v1_{}` bill,
UNNEST(bill.labels) l
WHERE service.description = 'Cloud Dataflow' and l.key = 'goog-dataflow-job-id' and
extract(date from _PARTITIONTIME) > "2020-12-31"
GROUP BY 1
Full code
import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import RetryStrategy  # needed for insert_retry_strategy below
import csv
import base64
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
import logging
from io import StringIO
from google.cloud import storage
import json

### PROTOBUF CLASS
from otherfiles import processor_pb2


class ConvertToJson(beam.DoFn):
    def process(self, message, *args, **kwargs):
        import base64
        from otherfiles import processor_pb2
        from google.protobuf.json_format import MessageToDict

        if len(message) >= 4:
            b64ProtoData = message[2]
            totalProcessorBids = int(message[3]) if message[3] else 0
            b64ProtoData = b64ProtoData.replace('_', '/')
            b64ProtoData = b64ProtoData.replace('*', '=')
            b64ProtoData = b64ProtoData.replace('-', '+')
            finalbunary = base64.b64decode(b64ProtoData)
            log = processor_pb2.ProcessorLogProto()
            log.ParseFromString(finalbunary)
            # print(log)
            jsonObj = MessageToDict(log, preserving_proto_field_name=True)
            jsonObj["totalProcessorBids"] = totalProcessorBids
            print(jsonObj)
            return [jsonObj]


class ParseFile(beam.DoFn):
    def process(self, element, *args, **kwargs):
        import csv
        for line in csv.reader([element], quotechar='"', delimiter='\t',
                               quoting=csv.QUOTE_ALL, skipinitialspace=True):
            return [line]


def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=False)
    parser.add_argument("--output", dest="output", required=False)
    parser.add_argument("--bucket", dest="bucket", required=True)
    parser.add_argument("--bfilename", dest="bfilename", required=True)
    app_args, pipeline_args = parser.parse_known_args()
    # pipeline_args.extend(['--runner=DirectRunner'])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    bucket_input = app_args.bucket
    bfilename = app_args.bfilename

    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_input)
    blob = bucket.blob(bfilename)
    blob = blob.download_as_string()
    blob = blob.decode('utf-8')
    blob = StringIO(blob)

    pqueue = []
    names = csv.reader(blob)
    for i, filename in enumerate(names):
        if filename and filename[0]:
            pqueue.append(filename[0])

    with beam.Pipeline(options=pipeline_options) as p:
        if len(pqueue) > 0:
            input_list = app_args.input
            output_list = app_args.output
            events = (p | "create PCol from list" >> beam.Create(pqueue)
                        | "read files" >> beam.io.textio.ReadAllFromText()
                        | "Transform" >> beam.ParDo(ParseFile())
                        | "Convert To JSON" >> beam.ParDo(ConvertToJson())
                        | "Write to BQ" >> beam.io.WriteToBigQuery(
                            table='TABLE',
                            dataset='DATASET',
                            project='PROJECT',
                            schema="dataevent:STRING",
                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
                            custom_gcs_temp_location='gs://BUCKET/gcs-temp-to-bq/',
                            method='FILE_LOADS'))
            ## bigquery failed rows NOT WORKING so commented
            # (events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS] | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))
            ## WRITING TO GCS
            # printFileConetent | "Write Text" >> beam.io.WriteToText(output_list + "file_", file_name_suffix=".json", num_shards=1, append_trailing_newlines=True)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
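The ConvertToJson DoFn above first reverses a custom base64 alphabet before handing the bytes to the protobuf parser. Pulled out as a standalone sketch for clarity (only the substitution characters come from the code above; the payload here is made up for illustration):

```python
import base64

def decode_custom_b64(data: str) -> bytes:
    # Undo the pipeline's custom substitutions before decoding:
    # '_' -> '/', '*' -> '=', '-' -> '+'
    data = data.replace('_', '/').replace('*', '=').replace('-', '+')
    return base64.b64decode(data)

# Round-trip with a made-up payload: standard base64, with the
# pipeline's substitutions applied in reverse.
raw = b'hello proto'
custom = (base64.b64encode(raw).decode('ascii')
          .replace('/', '_').replace('=', '*').replace('+', '-'))
assert decode_custom_b64(custom) == raw
```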
The job takes around 49 minutes.
Things I have tried: 1) For Avro, the schema generated from the proto file needs to be in JSON, and I tried the code below to convert each dict into an Avro message, but since the dicts are large it takes time. schema_separated= is an Avro JSON schema, and it works fine:
# additional imports for this variant
from apache_beam.io.avroio import WriteToAvro
from fastavro import parse_schema

with beam.Pipeline(options=pipeline_options) as p:
    if len(pqueue) > 0:
        input_list = app_args.input
        output_list = app_args.output
        p1 = p | "create PCol from list" >> beam.Create(pqueue)
        readListofFiles = p1 | "read files" >> beam.io.textio.ReadAllFromText()
        parsingProtoFile = readListofFiles | "Transform" >> beam.ParDo(ParseFile())
        printFileConetent = parsingProtoFile | "Convert To JSON" >> beam.ParDo(ConvertToJson())

        compressIdc = True
        use_fastavro = True
        printFileConetent | 'write_fastavro' >> WriteToAvro(
            output_list + "file_",
            parse_schema(schema_separated),
            use_fastavro=use_fastavro,
            file_name_suffix='.avro',
            codec=('deflate' if compressIdc else 'null'),
        )
In the main code, I tried inserting each JSON record into the BigQuery table as a string, so that I could use BigQuery's JSON functions to extract the data, but it did not go well and I got the following error:
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.' reason: 'invalid' [while running 'Write to BQ/BigQueryBatchFileLoads/WaitForDestinationLoadJobs']
Inserting the same JSON dicts into a BigQuery table created with the matching JSON schema works fine.
The challenge now is that after deserializing the proto into a JSON dict the size doubles, and Dataflow bills by the amount of data processed.
I am trying things out and reading a lot to make this work; once it does, I can stabilize it for production.
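One plausible cause of the load-job error above: WriteToBigQuery with schema "dataevent:STRING" expects each row to be a dict with a single string field, while ConvertToJson emits the full record dict. A minimal sketch of the row-shaping step (the helper name to_bq_row is mine, not from the original code; this is one likely fix, not a confirmed diagnosis):

```python
import json

def to_bq_row(record: dict) -> dict:
    # With schema "dataevent:STRING", each BigQuery row must be a
    # dict with one string field; serialize the whole record into it.
    return {"dataevent": json.dumps(record)}

row = to_bq_row({"timestamp": "1609286400", "totalProcessorBids": 1})
# row["dataevent"] is now a JSON string matching the one-column schema
```

In the pipeline this would sit as a `beam.Map(to_bq_row)` step between "Convert To JSON" and "Write to BQ".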
Sample JSON record:
{'timestamp': '1609286400', 'bidResponseId': '5febc300000115cd054b9fd6840a5af1', 'aggregatorId': '1', 'userId': '7567d74e-2e43-45f4-a42a-8224798bb0dd', 'uniqueResponseId': '', 'adserverId': '1002418', 'dataVersion': '1609285802', 'geoInfo': {'country': '101', 'region': '122', 'city': '11605', 'timezone': '420'}, 'clientInfo': {'os': '4', 'browser': '1', 'remoteIp': '36.70.64.0'}, 'adRequestInfo': {'requestingPage': 'com.opera.mini.native', 'siteId': '557243954', 'foldPosition': '2', 'adSlotId': '1', 'isTest': False, 'opType': 'TYPE_LEARNING', 'mediaType': 'BANNER'}, 'userSegments': [{'id': '2029660', 'weight': -1.0, 'recency': '1052208'}, {'id': '2034588', 'weight': -1.0, 'recency': '-18101'}, {'id': '2029658', 'weight': -1.0, 'recency': '744251'}, {'id': '2031067', 'weight': -1.0, 'recency': '1162398'}, {'id': '2029659', 'weight': -1.0, 'recency': '862833'}, {'id': '2033498', 'weight': -1.0, 'recency': '802749'}, {'id': '2016729', 'weight': -1.0, 'recency': '1620540'}, {'id': '2034584', 'weight': -1.0, 'recency': '111571'}, {'id': '2028182', 'weight': -1.0, 'recency': '744251'}, {'id': '2016726', 'weight': -1.0, 'recency': '1620540'}, {'id': '2028183', 'weight': -1.0, 'recency': '744251'}, {'id': '2028178', 'weight': -1.0, 'recency': '862833'}, {'id': '2016722', 'weight': -1.0, 'recency': '1675814'}, {'id': '2029587', 'weight': -1.0, 'recency': '38160'}, {'id': '2028177', 'weight': -1.0, 'recency': '862833'}, {'id': '2016719', 'weight': -1.0, 'recency': '1675814'}, {'id': '2027404', 'weight': -1.0, 'recency': '139031'}, {'id': '2028172', 'weight': -1.0, 'recency': '1052208'}, {'id': '2028173', 'weight': -1.0, 'recency': '1052208'}, {'id': '2034058', 'weight': -1.0, 'recency': '1191459'}, {'id': '2016712', 'weight': -1.0, 'recency': '1809526'}, {'id': '2030025', 'weight': -1.0, 'recency': '1162401'}, {'id': '2015235', 'weight': -1.0, 'recency': '139031'}, {'id': '2027712', 'weight': -1.0, 'recency': '139031'}, {'id': '2032447', 'weight': -1.0, 'recency': 
'7313670'}, {'id': '2034815', 'weight': -1.0, 'recency': '586825'}, {'id': '2034811', 'weight': -1.0, 'recency': '659366'}, {'id': '2030004', 'weight': -1.0, 'recency': '139031'}, {'id': '2027316', 'weight': -1.0, 'recency': '1620540'}, {'id': '2033141', 'weight': -1.0, 'recency': '7313670'}, {'id': '2034736', 'weight': -1.0, 'recency': '308252'}, {'id': '2029804', 'weight': -1.0, 'recency': '307938'}, {'id': '2030188', 'weight': -1.0, 'recency': '3591519'}, {'id': '2033449', 'weight': -1.0, 'recency': '1620540'}, {'id': '2029672', 'weight': -1.0, 'recency': '1441083'}, {'id': '2029664', 'weight': -1.0, 'recency': '636630'}], 'perfInfo': {'timeTotal': '2171', 'timeBidInitialize': '0', 'timeProcessDatastore': '0', 'timeGetCandidates': '0', 'timeAdFiltering': '0', 'timeEcpmComputation': '0', 'timeBidComputation': '0', 'timeAdSelection': '0', 'timeBidSubmit': '0', 'timeTFQuery': '0', 'timeVWQuery': '8'}, 'learningPercent': 0.10000000149011612, 'pageLanguageId': '0', 'sspUserId': 'CAESECHFlNeuUm16IYThguoQ8ck_1', 'minEcpm': 0.12999999523162842, 'adSpotId': '1', 'creativeSizes': [{'width': '7', 'height': '7'}], 'pageTypeId': '0', 'numSlots': '0', 'eligibleLIs': [{'type': 'TYPE_OPTIMIZED', 'liIds': [{'id': 44005, 'reason': '12', 'creative_id': 121574, 'bid_amount': 8.403361132251052e-08}, {'id': 46938, 'reason': '12', 'creative_id': 124916, 'bid_amount': 8.403361132251052e-06}, {'id': 54450, 'reason': '12', 'creative_id': 124916, 'bid_amount': 2.0117618771650174e-05}, {'id': 54450, 'reason': '12', 'creative_id': 135726, 'bid_amount': 2.4237295484638312e-05}]}, {'type': 'TYPE_LEARNING'}], 'bidType': 4, 'isSecureRequest': True, 'sourceType': 3, 'deviceBrand': 82, 'deviceModel': 1, 'sellerNetworkId': 12814, 'interstitialRequest': False, 'nativeAdRequest': True, 'native': {'mainImg': [{'w': 0, 'h': 0, 'wmin': 1200, 'hmin': 627}, {'w': 0, 'h': 0, 'wmin': 1200, 'hmin': 627}, {'w': 0, 'h': 0, 'wmin': 1200, 'hmin': 627}, {'w': 0, 'h': 0, 'wmin': 1200, 'hmin': 627}], 'iconImg': 
[{'w': 0, 'h': 0, 'wmin': 0, 'hmin': 0}, {'w': 0, 'h': 0, 'wmin': 100, 'hmin': 100}, {'w': 0, 'h': 0, 'wmin': 0, 'hmin': 0}, {'w': 0, 'h': 0, 'wmin': 100, 'hmin': 100}], 'logoImg': [{'w': 0, 'h': 0, 'wmin': 100, 'hmin': 100}, {'w': 0, 'h': 0, 'wmin': 0, 'hmin': 0}, {'w': 0, 'h': 0, 'wmin': 100, 'hmin': 100}, {'w': 0, 'h': 0, 'wmin': 0, 'hmin': 0}]}, 'throttleWeight': 1, 'isSegmentReceived': False, 'viewability': 46, 'bannerAdRequest': False, 'videoAdRequest': False, 'mraidAdRequest': True, 'jsonModelCallCount': 0, 'totalProcessorBids': 1}
Can anyone help me out here?
Solution
My suggestion is to use Java to perform the transformation.
In Java, you can convert Protobuf to Avro like this: Writing protobuf object in parquet using apache beam
Once that is done, you can use AvroIO to write the data to files.
Java performs far better than Python for this kind of work and will save you compute resources. Since the job is very simple and does not need any special Python libraries, I strongly suggest you try it in Java.