How to read a CSV file's creation and update metadata in a Cloud Function and add it as columns to the CSV

Problem description

Every day I receive CSV files from Google Compute Engine into my bucket, and I have written a Cloud Function that loads the data from these CSVs into a BigQuery table; it works well. However, before (or while) sending the data to the BigQuery table, I need to include the file creation time and file update time from the CSV file's metadata as columns.

Is this possible in a Cloud Function, and how would I do it? I would be grateful for any kind of example to guide me.

# my code in cloud functions
import os

from google.cloud import bigquery

GCP_PROJECT = os.environ.get('GCP_PROJECT')


def FlexToBigQuery(data, context):
    bucketname = data['bucket']
    filename = data['name']
    timeCreated = data['timeCreated']

    client = bigquery.Client()
    dataset_id = 'My-dataset'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()

    job_config.skip_leading_rows = 1
    job_config.field_delimiter = ';'
    job_config.allow_jagged_rows = True
    job_config.allow_quoted_newlines = True
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job_config.source_format = bigquery.SourceFormat.CSV

    job_config.schema = [
        bigquery.SchemaField('Anstallningsnummer', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kostnadsstalle', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('OB_tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Dagsschema', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Schemalagd_arbetstid', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_narvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_franvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_klarmarkering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_attestering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Frislappsdatum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Export_klockslag', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Vecka', 'STRING', mode='NULLABLE')
    ]

    # build the GCS URI of the uploaded CSV from the event payload
    uri = 'gs://%s/%s' % (bucketname, filename)
    print('Received file "%s" at %s.' % (uri, timeCreated))

    # let's do this: start the load job
    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table('employee_time'),
        job_config=job_config)

    print('Starting job with ID {}'.format(load_job.job_id))
    print('File: {}'.format(data['name']))

    load_job.result()  # wait for table load to complete.
    print('Job finished.')

    destination_table = client.get_table(dataset_ref.table('employee_time'))
    print('Loaded {} rows.'.format(destination_table.num_rows))

Tags: python-3.x, google-cloud-platform, google-cloud-functions, google-cloud-storage, metadata

Solution


There are two functions, os.path.getmtime and os.path.getctime, for getting a file's creation and update times; you can use them on the file before sending the data to BigQuery.

import os.path, time

print("updated: %s" % time.ctime(os.path.getmtime(file)))
print("created: %s" % time.ctime(os.path.getctime(file)))

This may be along the lines of: How do I get file creation and modification date/times in Python?
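
In a Cloud Function the CSV lives in Cloud Storage, not on the local filesystem, so one way to adapt this idea is to take the object's timeCreated and updated fields straight from the trigger event (the question's code already reads data['timeCreated']; I am assuming the payload also carries updated), append them to every row as two extra columns, and then load the rewritten file. The sketch below is only a rough illustration under those assumptions; the helper name add_metadata_columns and the column names time_created/time_updated are made up for the example.

import csv
import io

from google.cloud import storage


def add_metadata_columns(data):
    """Download the uploaded CSV and append the object's creation and
    update timestamps (from the trigger event payload) as two columns."""
    storage_client = storage.Client()
    blob = storage_client.bucket(data['bucket']).blob(data['name'])

    out = io.StringIO()
    reader = csv.reader(io.StringIO(blob.download_as_text()), delimiter=';')
    writer = csv.writer(out, delimiter=';')

    # keep the header row, extended with the two new column names
    header = next(reader)
    writer.writerow(header + ['time_created', 'time_updated'])

    # copy every data row, appending the same timestamps to each
    for row in reader:
        writer.writerow(row + [data['timeCreated'], data['updated']])

    out.seek(0)
    return out

The returned file object could then be passed to client.load_table_from_file(...) instead of load_table_from_uri(...), with two extra bigquery.SchemaField('time_created', 'STRING') and bigquery.SchemaField('time_updated', 'STRING') entries appended to job_config.schema; skip_leading_rows=1 still applies because the sketch writes a header row.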

