python-3.x - 如何在云函数中读取 csv 文件的创建和更新元数据并将其作为列添加到 csv
问题描述
我每天从 Google Compute Engine 接收 CSV 文件到我的存储桶中,我编写了一个云函数,将这些 CSV 的数据加载到 BigQuery 表中,并且运行良好。但是,在将数据发送到 BigQuery 表之前或发送数据时,我需要将 CSV 文件元数据中的文件创建时间和文件更新时间作为列包含在内。
这在 Cloud Function 中是否可行,我该怎么做?如果有某种示例可以指导我,我将不胜感激。
# my code in cloud functions
import os
from google.cloud import bigquery
GCP_PROJECT = os.environ.get('GCP_PROJECT')
def FlexToBigQuery(data, context):
bucketname = data['bucket']
filename = data['name']
timeCreated = data['timeCreated']
client = bigquery.Client()
dataset_id = 'My-dataset'
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = 1
job_config.field_delimiter = ';',
job_config.allow_jagged_rows = True
job_config.allow_quoted_newlines = True
job_config.write_disposition = 'WRITE_TRUNCATE',
job_config.source_format = bigquery.SourceFormat.CSV
job_config.schema = [
bigquery.SchemaField('Anstallningsnummer', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Datum', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Kod', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Kostnadsstalle', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Tidkod', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('OB_tidkod', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Dagsschema', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Schemalagd_arbetstid', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Summa_narvaro', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Summa_franvaro', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Datum_for_klarmarkering', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Datum_for_attestering', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Frislappsdatum', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Export_klockslag', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('Vecka', 'STRING', mode='NULLABLE')
]
uri = 'gs://%s/%s' % (bucketname, filename)
print('Received file "%s" at %s.' % (
uri,
timeCreated
))
# get the URI for uploaded CSV in GCS from 'data'
uri = 'gs://' + data['bucket'] + '/' + data['name']
# lets do this
load_job = client.load_table_from_uri(
uri,
dataset_ref.table('employee_time'),
job_config=job_config)
print('Starting job with ID {}'.format(load_job.job_id))
print('File: {}'.format(data['name']))
load_job.result() # wait for table load to complete.
print('Job finished.')
destination_table = client.get_table(dataset_ref.table('employee_time'))
print('Loaded {} rows.'.format(destination_table.num_rows))
解决方案
存在这两个函数os.path.getmtime
,os.path.getctime
用于从文件中获取创建和更新时间,您可以在将数据发送到 BigQuery 之前使用该文件。
import os.path, time
print("updated: %s" % time.ctime(os.path.getmtime(file)))
print("created: %s" % time.ctime(os.path.getctime(file)))
推荐阅读
- bash - 用 `jQuery` 用 bash 替换所有 $ 符号
- python - 在外壳中显示一个条形以直观地指示不断变化的计数
- javascript - 我如何删除不是我的消息,而是将机器人发送给我的消息?
- scala - 将 Scala Seq 转换为 Spark DStream?
- powerquery - Expression.Error:我们无法将值 3 转换为函数类型
- react-native - React Native Reanimated 2 为 SVG 路径的长度设置动画
- python - Pyinstaller 不包括 ffmpeg.exe 和 ffprobe.exe
- r - 在 Stata 和 R 之间具有时变系数的 coxph 的不同结果
- html - 网格容器的意外滚动
- azure-devops - 如何按标签显示错误?