arrays - 使用用户定义的函数在 BigQuery 数据集中插入海量数据时如何优化性能
问题描述
我只是在寻找最近使用 UDF 将大量数据添加到 BigQuery 表中的方法。所以,我尝试了这样的推荐方法:
#standardSQL
INSERT INTO `project.dataset.Quincy` (id, col)
WITH array_to_loop_through AS (
SELECT id
FROM UNNEST(GENERATE_ARRAY(1, 1000, 1)) id
)
SELECT id, CONCAT('Rank: ', CAST(id AS STRING))
FROM array_to_loop_through
将 100 万个值添加到表中需要 8 秒。所以我在我的UDF上应用了这种方式:
CREATE TEMPORARY FUNCTION myFunc()
RETURNS array<string>
LANGUAGE js AS
"""
a=[""];
for(i=0;i<=50;i++){
a.push(randomString(12));
}
return a;
"""
OPTIONS (
library="gs://kaneki110299/tester.js"
);
#standardSQL
INSERT INTO `Lambert.fortune` (password)
WITH array_to_loop_through AS (
SELECT *
FROM UNNEST(myFunc()) id
)
SELECT CONCAT(CAST(id AS STRING))
FROM array_to_loop_through
当我在 BigQuery 上运行此查询时,它会运行 5 分钟,然后遇到只有 50 个值的 UDF 超时。当我将循环放入时发生了同样的错误tester.js
。所以,我尝试了另一种方法:
CREATE TEMPORARY FUNCTION myFunc()
RETURNS string
LANGUAGE js AS
"""
return randomString(12);
"""
OPTIONS (
library="gs://kaneki110299/tester.js"
);
#standardSQL
INSERT INTO `Lambert.fortune` (password)
Values (myFunc()),(myFunc()),(myFunc())...//1000 times
与上一个查询不同,这个查询只需 30 秒即可将我的 UDF 结果中的 1000 个值添加到表中。看起来循环在 BigQuery 上运行不佳或运行不快。
在运行用户定义的函数以将大量数据插入其数据集时,是否可以使用并行或 BigQuery 支持任何方式来优化其 CPU 性能?我试图在表上添加 10 亿个值,所以我使用的最后一种方法似乎不实用。
解决方案
使用googleApi npm您可以编写一个 JS 程序,该程序将并行运行多个插入。
这是关于如何使用 API 进行 1 次调用的完整工作 mocha 测试。您可以用自己的方法包装内部调用,for loop
以并行完成插入。
if (!global._babelPolyfill) {
var a = require("babel-polyfill")
}
import {google} from 'googleapis'
let bigQuery = google.bigquery("v2")
describe('Run query with API', async () => {
it('Run a query', async () => {
let result = await test('panada')
})
async function test(p1) {
try {
let query = `INSERT INTO \`project.dataset.Quincy\` (id, col)
WITH array_to_loop_through AS (
SELECT id
FROM UNNEST(GENERATE_ARRAY(1, 10, 1)) id
)
SELECT id, CONCAT('Rank: ', CAST(id AS STRING))
FROM array_to_loop_through`
let auth = getBasicAuthObj()
auth.setCredentials({
access_token: "myAccessToken",
refresh_token: "myRefreshToken"
})
let request = {
"projectId": "myProject",
auth,
"resource": {
"projectId": "myProject",
"configuration": {
"query": {
query,
"useLegacySql": false
},
"dryRun": false
}
}
}
console.log(`query is: ${query}`)
let result = await callBQ(request)
// debugger
console.log(`Status is: ${result.data.status.state}`)
} catch (err) {
console.log("err", err)
}
}
/**
* Call BigQuery jobs.insert
* @param request
* @returns {Promise<*>}
*/
async function callBQ(request) {
debugger
// console.log("request", request)
try {
let result = await bigQuery.jobs.insert(request, request)
console.log(`All good.....`)
return result
} catch (e) {
console.log(`Failed to run query: ${e}`)
}
}
/**
* Create oAuth object
* @returns {OAuth2Client}
*/
function getBasicAuthObj() {
let clientId = 'myclientId'
let clientSecret = 'mySecret'
let redirectUrl = 'URL'
return new google.auth.OAuth2(
clientId,
clientSecret,
redirectUrl
)
}
})
注意:Bigquery 在并行插入和运行查询时有限制,有关更多详细信息,请参阅此链接