database - 如何从 TFX BulkInferrer 获取数据帧或数据库写入?
问题描述
我对 TFX 很陌生,但有一个显然可以通过BulkInferrer使用的 ML 管道。这似乎只以 Protobuf 格式产生输出,但由于我正在运行批量推理,我想将结果通过管道传输到数据库。(DB 输出似乎应该是批量推理的默认值,因为批量推理和 DB 访问都利用了并行化......但 Protobuf 是每记录的序列化格式。)
我假设我可以使用Parquet-Avro-Protobuf之类的东西来进行转换(尽管这是在 Java 中,而管道的其余部分在 Python 中),或者我可以自己编写一些东西来一个接一个地使用所有 protobuf 消息,转换将它们转换为 JSON,将 JSON 反序列化为字典列表,然后将字典加载到 Pandas DataFrame 中,或者将其存储为一堆键值对,我将其视为一次性数据库......但这听起来像对于一个非常常见的用例,涉及并行化和优化的大量工作和痛苦。顶级 Protobuf 消息定义是 Tensorflow 的PredictionLog。
这一定是一个常见的用例,因为像这样的 TensorFlowModelAnalytics 函数使用 Pandas DataFrames。我宁愿能够直接写入数据库(最好是 Google BigQuery)或 Parquet 文件(因为 Parquet / Spark 似乎比 Pandas 并行化更好),而且这些似乎应该是常见的用例,但我没有找到任何例子。也许我使用了错误的搜索词?
我还查看了PredictExtractor,因为“提取预测”听起来很接近我想要的......但官方文档似乎没有说明应该如何使用该类。我认为TFTransformOutput听起来像是一个很有前途的动词,但实际上它是一个名词。
我显然在这里遗漏了一些基本的东西。没有人愿意将 BulkInferrer 结果存储在数据库中吗?是否有允许我将结果写入数据库的配置选项?也许我想将ParquetIO或BigQueryIO实例添加到 TFX 管道?(TFX 文档说它在“幕后”使用 Beam,但这并没有说明我应该如何一起使用它们。)但是这些文档中的语法看起来与我的 TFX 代码完全不同,我不确定它们是否'重新兼容?
帮助?
解决方案
(从相关问题复制以提高知名度)
经过一番挖掘,这是一种替代方法,它假设feature_spec
事先不知道。请执行下列操作:
- 将 设置
BulkInferrer
为写入,output_examples
而不是inference_result
将output_example_spec添加到组件构造中。 - 在主管道中添加一个
StatisticsGen
和一个SchemaGen
组件,BulkInferrer
以生成上述模式的架构output_examples
- 使用来自
SchemaGen
和BulkInferrer
读取 TFRecords 的工件并做任何必要的事情。
bulk_inferrer = BulkInferrer(
....
output_example_spec=bulk_inferrer_pb2.OutputExampleSpec(
output_columns_spec=[bulk_inferrer_pb2.OutputColumnsSpec(
predict_output=bulk_inferrer_pb2.PredictOutput(
output_columns=[bulk_inferrer_pb2.PredictOutputCol(
output_key='original_label_name',
output_column='output_label_column_name', )]))]
))
statistics = StatisticsGen(
examples=bulk_inferrer.outputs.output_examples
)
schema = SchemaGen(
statistics=statistics.outputs.output,
)
之后,可以执行以下操作:
import tensorflow as tf
from tfx.utils import io_utils
from tensorflow_transform.tf_metadata import schema_utils
# read schema from SchemaGen
schema_path = '/path/to/schemagen/schema.pbtxt'
schema_proto = io_utils.SchemaReader().read(schema_path)
spec = schema_utils.schema_as_feature_spec(schema_proto).feature_spec
# read inferred results
data_files = ['/path/to/bulkinferrer/output_examples/examples/examples-00000-of-00001.gz']
dataset = tf.data.TFRecordDataset(data_files, compression_type='GZIP')
# parse dataset with spec
def parse(raw_record):
return tf.io.parse_example(raw_record, spec)
dataset = dataset.map(parse)
此时,数据集就像任何其他已解析的数据集一样,因此编写 CSV 或 BigQuery 表或从那里写入任何内容都是微不足道的。它确实通过BatchInferencePipeline在ZenML中帮助了我们。
推荐阅读
- c++ - C++ 中的构造函数可以是抽象的吗?
- kubernetes - 在验证 kubernetes python 客户端时遇到问题
- python - 使用 Python 自动裁剪图像以提取内部黑色边框 ROI
- dialogflow-es - 控制 Google 媒体响应上的操作(例如,从第 3 分钟开始)
- php - AJAX 加载 PHP 文件并传递数据和函数?
- react-native - SyntaxError: const 声明中缺少初始化程序
- xslt - XSL:大于和小于?
- excel - 复制具有标准 4、5、8、10、11、12、13 的过滤行
- sql - informatica 表达式转换中所需的 get_date oracle sql 函数替换
- ios - 无法使用 Alamofire 将参数传递给 url