首页 > 解决方案 > 如何从 BigQuery 职位列表中获取 DELETE 查询?

问题描述

我有一个摄取时间分区表,它没有任何日期范围的行

#standardSQL
SELECT COUNT(*) AS rows_cnt,
       _PARTITIONTIME AS ptime
FROM my_dataset.my_table
WHERE _PARTITIONTIME BETWEEN '2017-01-01' AND '2017-01-30'
GROUP BY ptime
-- result returned zero rows :o

我想调查一下:这些分区中的数据很可能被删除查询清除了。我写了一个脚本来检索“可疑”查询。那些有某种模式的

DELETE ...my_table .... 2017-01 ...

或者干脆删除工作

这是脚本

import re
import requests
import json
from pprint import pprint
from datetime import datetime
import subprocess
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-all_users', type=bool, default=True)
parser.add_argument('-projection', type=str, default='full')
parser.add_argument('-state_filter', type=str, default='done')
parser.add_argument('-access_token', type=str)
args = parser.parse_args()


def get_query_string(job):
    return job['configuration']['query']['query']

def log_job(job):
    with open('suspicious_jobs.txt', 'a+') as f:
        f.writelines(json.dumps(job))
        f.writelines('\n')

def is_suspicious_query(query_string):
    lower_query = query_string.lower()
    return re.search('delete.*my_table.*2017-01', lower_query) is not None

def is_delete_statement(job):
    return job['statistics']['query']['statementType'] == 'DELETE'

def is_copy_job(job):
    return job['configuration']['jobType'] == 'COPY'

def get_next_token():
    subprocess.check_call('gcloud auth login', shell=True)
    return subprocess.check_output('gcloud auth print-access-token', shell=True).decode('utf-8').strip()


all_users = args.all_users
projection = args.projection
state_filter = args.state_filter
query_url = """https://www.googleapis.com/bigquery/v2/projects/my_project_id/jobs"""
access_token = get_next_token()

next_page_token = ''
page = 1
while next_page_token is not None:
    print('######## querying page ', page)
    url_parameters = {
    'allUsers': all_users,
    'pageToken': next_page_token,
    'projection': projection,
    'stateFilter': state_filter
    }
    headers = {
    'Authorization': 'Bearer {}'.format(access_token)
    }
    r = requests.get(query_url, params=url_parameters, headers=headers)

    if r.status_code == 401:
        access_token = get_next_token()
        print(access_token)
        print(r.text)
        continue
    elif r.status_code != 200:
        print(r.text)
        print('###### last_page_token is ', next_page_token)
        break

    next_page_token = r.json().get('nextPageToken', None)
    jobs = r.json().get('jobs', [])
    for j in jobs:
        try:
            if is_copy_job(j):
                continue
            q = get_query_string(j)
            if is_suspicious_query(q) or is_delete_statement(j):
                log_job(j)

        except KeyError as e:
            pass

    page = page + 1

is_suspicious_query函数检查查询是否与模式匹配delete.*my_table.*2017-01(不区分大小写)。

我找不到删除的工作。我的 while 循环中是否缺少工作?(每当我遇到 KeyError 异常时,我都会跳过)

是否可以在不记录操作的情况下删除表的分区?

标签: google-bigquery

解决方案


为此,我建议使用带有以下过滤器的Stackdriver 高级过滤器:

resource.type="bigquery_resource"
protoPayload.serviceData.jobQueryRequest.query:"your_dataset.your_table"
protoPayload.serviceData.jobQueryResponse.job.jobConfiguration.query.statementType="DELETE"

这样,您将获得对指定表进行的 DELETE 操作。

更新:

resource.type="bigquery_resource"
(protoPayload.methodName="jobservice.jobcompleted"
OR protoPayload.authorizationInfo.resource:"projects/<your-project>/datasets/<your-dataset>/tables/"
OR (protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.destinationTable.datasetId="<your-dataset>"
AND protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.destinationTable.tableId:"<your-table>"))
protoPayload.methodName!="tabledataservice.list"
"<yourdataset.yourtable>"
severity!="ERROR"

使用此过滤器,您可以找到对表进行的大多数操作,例如,具有以 WRITE_TRUNCATE 作为值的 WriteDisposition 的查询截断您的表并从头开始写入。请注意,如果您使用:而不是=同时使用过滤器,则它可以搜索部分字符串。

关于您的分区问题,据我所知,当一个分区被删除时,它应该在日志中列为已删除。此外,您可以使用命令行工具 bq rm表装饰器删除分区。

希望这对您有所帮助。


推荐阅读