首页 > 解决方案 > BigQuery 显示错误的结果 - 从 Cloud Function 复制数据?

问题描述

我是一名初级开发人员,负责将 Facebook API 实施到现有项目。但是,业务团队发现 BigQuery 上显示的 Google Analytics 结果是错误的。他们要求我修复它。这是架构:

在此处输入图像描述

我所做的是:

在此处输入图像描述

基于这些信息,我坚信问题出在云功能上,因为它是 GCS 和 BQ 之间的唯一元素。我查看了从 GCS 触发文件的云功能,但找不到任何重复的操作。

你知道我怎样才能找到问题吗?

云功能

BUCKET = "xxxx"
GOOGLE_PROJECT = "xxxx"
HEADER_MAPPING = {
    "Source/Medium": "source_medium",
    "Campaign": "campaign",
    "Last Non-Direct Click Conversions": "last_non_direct_click_conversions",
    "Last Non-Direct Click Conversion Value": "last_non_direct_click_conversion_value",
    "Last Click Prio Conversions": "last_click_prio_conversions",
    "Last Click Prio Conversion Value": "last_click_prio_conversion_value",
    "Data-Driven Conversions": "dda_conversions",
    "Data-Driven Conversion Value": "dda_conversion_value",
    "% Change in Conversions from Last Non-Direct Click to Last Click Prio": "last_click_prio_vs_last_click",
    "% Change in Conversions from Last Non-Direct Click to Data-Driven": "dda_vs_last_click"
}

SPEND_HEADER_MAPPING = {
    "Source/Medium": "source_medium",
    "Campaign": "campaign",
    "Spend": "spend"
}

tables_schema = {
    "google-analytics": [
            bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
            bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
            bigquery.SchemaField("goal", bigquery.enums.SqlTypeNames.STRING, mode='REQUIRED'),
            bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("last_non_direct_click_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
            bigquery.SchemaField("last_non_direct_click_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_conversions", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE')
    ],
    "google-analytics-spend": [
            bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
            bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
            bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("spend", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
    ]
}


def download_from_gcs(file):
    client = storage.Client()
    bucket = client.get_bucket(BUCKET)
    blob = bucket.get_blob(file['name'])
    file_name = os.path.basename(os.path.normpath(file['name']))
    blob.download_to_filename(f"/tmp/{file_name}")
    return file_name


def load_in_bigquery(file_object, dataset: str, table: str):
    client = bigquery.Client()
    table_id = f"{GOOGLE_PROJECT}.{dataset}.{table}"
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        schema=tables_schema[table]
    )

    job = client.load_table_from_file(file_object, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.


def __order_columns(df: pd.DataFrame, spend=False) ->pd.DataFrame:
    # We want to have source and medium columns at the third position
    # for a spend data frame and at the fourth postion for others df
    # because spend data frame don't have goal column.
    pos = 2 if spend else 3

    cols = df.columns.tolist()
    cols[pos:2] = cols[-2:]
    cols = cols[:-2]
    return df[cols]


def __common_transformation(df: pd.DataFrame, date: str, goal: str) -> pd.DataFrame:
    # for any kind of dataframe, we add date and week columns
    # based on the file name and we split Source/Medium from the csv
    # into two different columns

    week_of_the_year = datetime.strptime(date, '%Y-%m-%d').isocalendar()[1]
    df.insert(0, 'date', date)
    df.insert(1, 'week', week_of_the_year)
    mapping = SPEND_HEADER_MAPPING if goal == "spend" else HEADER_MAPPING
    print(df.columns.tolist())
    df = df.rename(columns=mapping)
    print(df.columns.tolist())
    print(df)
    df["source_medium"] = df["source_medium"].str.replace(' ', '')
    df[["source", "medium"]] = df["source_medium"].str.split('/', expand=True)
    df = df.drop(["source_medium"], axis=1)
    df["week"] = df["week"].astype(int, copy=False)
    return df


def __transform_spend(df: pd.DataFrame) -> pd.DataFrame:
    df["spend"] = df["spend"].astype(float, copy=False)
    df = __order_columns(df, spend=True)
    return df[df.columns[:6]]


def __transform_attribution(df: pd.DataFrame, goal: str) -> pd.DataFrame:
    df.insert(2, 'goal', goal)
    df["last_non_direct_click_conversions"] = df["last_non_direct_click_conversions"].astype(int, copy=False)
    df["last_click_prio_conversions"] = df["last_click_prio_conversions"].astype(int, copy=False)
    df["dda_conversions"] = df["dda_conversions"].astype(float, copy=False)
    return __order_columns(df)


def transform(df: pd.DataFrame, file_name) -> pd.DataFrame:
    goal, date, *_ = file_name.split('_')
    df = __common_transformation(df, date, goal)
    # we only add goal in attribution df (google-analytics table).
    return __transform_spend(df) if "spend" in file_name else __transform_attribution(df, goal)


def main(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    file = event

    file_name = download_from_gcs(file)
    df = pd.read_csv(f"/tmp/{file_name}")

    transformed_df = transform(df, file_name)

    with open(f"/tmp/bq_{file_name}", "w") as file_object:
        file_object.write(transformed_df.to_csv(index=False))

    with open(f"/tmp/bq_{file_name}", "rb") as file_object:
        table = "google-analytics-spend" if "spend" in file_name else "google-analytics"
        load_in_bigquery(file_object, dataset='attribution', table=table)

更新

是的,云函数由 GCS 对象 finalize 事件触发。此外,该功能不会在失败时自动重试。

我正在遵循您的建议,现在正在查看我的 Cloud Function 页面上的日志表。在最后 10 行日志数据中,似乎运行了 3 个不同的 Cloud Function 实例。当我扩展每一行时,我无法获得更多细节。

在此处输入图像描述

我现在还要检查 BigQuery 日志。我想最简单的解决方案是使用BigQueryAuditMetadata并获取有关表何时更新的日志?

标签: google-cloud-platformgoogle-bigquerygoogle-cloud-functionsgoogle-cloud-storage

解决方案


在我看来,这是一个非常大的话题,因此可能很难提供一个精确的解决方案来解决所有问题。所以,我无法解决这个问题,但我只能表达一些个人意见并提供一些建议。

云功能是由 GCS 对象完成事件触发的 - 请您检查一下这是否正确?在这种情况下,事件是在触发云函数调用之前“通过 PubSub”。现在有两件事要记住:

  • PubSub 基于“至少传递一次”范式,因此重复的消息传递是可能的。
  • 此类云函数调用具有自动确认功能。开发人员无法控制这一点。PubSub 不能用于控制整个进程状态。并且云函数执行的时间越长(最多超时 540 秒或更长时间),PubSub 做出(内部)消息未传递的决定的机会就越大,因此应该再次传递它,从而进行新的调用的云功能。

此处提供了一些其他详细信息:问题:发布订阅消息的 Cloud Function 显式确认

现在,如何查看是否会发生这种情况。就我个人而言,我将从日志记录开始。当云功能启动时,我会记录对象名称和一些哈希码(即 CRC32C 或 MD5 等,可从事件元数据中获得)——就在云功能代码中。在这种情况下,我将能够从日志中看到一个 GCS 对象的许多云函数调用(如果发生这种情况)。另一个好主意 - 获取信息 - 云功能正在执行多长时间。

通过执行该步骤,我们可以检查是否为给定的 GCS 对象多次调用云函数。

下一步——我们如何加载数据。“负载”是一项工作。这意味着存在一个队列和一个调度程序(GCP BigQuery 服务中的某处)。并且加载作业保留在队列中,直到它被拾取进行加载,然后该作业被执行/执行。所有这些都是非常“异步”的。请检查是否有失败的加载作业?在最简单的情况下,这可以通过 BigQuery UI 完成。

最重要的是,从云函数内存内部加载——从我的角度来看——不仅非常昂贵,而且非常危险且不可靠。甚至简单地将 csv 保存到 GCS 存储桶中并从存储桶加载 - 可能会好得多。

下一个 -据我记得,每张桌子每天有 1500 个负载作业配额。如果您有许多文件要加载 - 您可以轻松超过该配额。

“加载”数据的替代方式 - 使用流式传输。它没有这样的配额限制,但它是收费的,因此您需要为流媒体付费。

我现在就停下来,请让我知道以上内容是否有用。以及您将在哪个方向开发您的解决方案。

=> 更新2021 年 2 月 4 日 10:50 GMT

为避免复制和粘贴 - 请参阅此处的答案:Cloud Function running multiple times instead of once


推荐阅读