首页 > 解决方案 > 使用用户定义的函数在 BigQuery 数据集中插入海量数据时如何优化性能

问题描述

我只是在寻找最近使用 UDF 将大量数据添加到 BigQuery 表中的方法。所以,我尝试了这样的推荐方法:

#standardSQL
INSERT INTO `project.dataset.Quincy` (id, col)
WITH array_to_loop_through AS (
  SELECT id 
  FROM UNNEST(GENERATE_ARRAY(1, 1000, 1)) id
)
SELECT id, CONCAT('Rank: ', CAST(id AS STRING))
FROM array_to_loop_through

将 100 万个值添加到表中需要 8 秒。所以我在我的UDF上应用了这种方式:

CREATE TEMPORARY FUNCTION myFunc()
  RETURNS array<string>
  LANGUAGE js AS
"""
a=[""];
for(i=0;i<=50;i++){
    a.push(randomString(12));
    }
    return a;
"""
OPTIONS (
library="gs://kaneki110299/tester.js"

);



#standardSQL

INSERT INTO `Lambert.fortune` (password)
WITH array_to_loop_through AS (
  SELECT * 
  FROM UNNEST(myFunc()) id
)
SELECT CONCAT(CAST(id AS STRING))
FROM array_to_loop_through

当我在 BigQuery 上运行此查询时,它会运行 5 分钟,然后遇到只有 50 个值的 UDF 超时。当我将循环放入时发生了同样的错误tester.js。所以,我尝试了另一种方法:

CREATE TEMPORARY FUNCTION myFunc()
  RETURNS string
  LANGUAGE js AS
"""   
    return randomString(12);
"""
OPTIONS (
library="gs://kaneki110299/tester.js"

);



#standardSQL

INSERT INTO `Lambert.fortune` (password) 
Values (myFunc()),(myFunc()),(myFunc())...//1000 times

与上一个查询不同,这个查询只需 30 秒即可将我的 UDF 结果中的 1000 个值添加到表中。看起来循环在 BigQuery 上运行不佳或运行不快。

在运行用户定义的函数以将大量数据插入其数据集时,是否可以使用并行或 BigQuery 支持任何方式来优化其 CPU 性能?我试图在表上添加 10 亿个值,所以我使用的最后一种方法似乎不实用。

标签: arraysfor-loopgoogle-bigquery

解决方案


使用googleApi npm您可以编写一个 JS 程序,该程序将并行运行多个插入。

这是关于如何使用 API 进行 1 次调用的完整工作 mocha 测试。您可以用自己的方法包装内部调用,for loop以并行完成插入。

if (!global._babelPolyfill) {
    var a = require("babel-polyfill")
}

import {google} from 'googleapis'

let bigQuery = google.bigquery("v2")

describe('Run query with API', async () => {

    it('Run a query', async () => {
        let result = await test('panada')

    })

    async function test(p1) {
        try {
            let query = `INSERT INTO \`project.dataset.Quincy\` (id, col)
                        WITH array_to_loop_through AS (
                          SELECT id 
                          FROM UNNEST(GENERATE_ARRAY(1, 10, 1)) id
                        )
                        SELECT id, CONCAT('Rank: ', CAST(id AS STRING))
                        FROM array_to_loop_through`

            let auth = getBasicAuthObj()
            auth.setCredentials({
                access_token: "myAccessToken",
                refresh_token: "myRefreshToken"
            })

            let request = {
                "projectId": "myProject",
                auth,
                "resource": {
                    "projectId": "myProject",
                    "configuration": {
                        "query": {
                            query,
                            "useLegacySql": false
                        },
                        "dryRun": false
                    }
                }
            }

            console.log(`query is: ${query}`)

            let result = await callBQ(request)

            // debugger
            console.log(`Status is: ${result.data.status.state}`)
        } catch (err) {
            console.log("err", err)
        }
    }

    /**
     * Call BigQuery jobs.insert
     * @param request
     * @returns {Promise<*>}
     */
    async function callBQ(request) {
        debugger
        // console.log("request", request)
        try {
            let result = await bigQuery.jobs.insert(request, request)
            console.log(`All good.....`)

            return result
        } catch (e) {
            console.log(`Failed to run query: ${e}`)
        }

    }

    /**
     * Create oAuth object
     * @returns {OAuth2Client}
     */
    function getBasicAuthObj() {
        let clientId = 'myclientId'
        let clientSecret = 'mySecret'
        let redirectUrl = 'URL'

        return new google.auth.OAuth2(
            clientId,
            clientSecret,
            redirectUrl
        )
    }
})

注意:Bigquery 在并行插入和运行查询时有限制,有关更多详细信息,请参阅此链接


推荐阅读