BigQuery ASOF join use case

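Problem description

Is it possible to do an asof join in BigQuery? I think it only supports equality joins, but I am trying to understand the workarounds.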

Quotes table

time                   sym bid   ask
2020-01-9T14:30:00.023 XYZ 16.22 16.25
2020-01-9T14:30:00.023 XYZ 16.21 16.27
2020-01-9T14:30:00.030 XYZ 16.20 16.28
2020-01-9T14:30:00.041 XYZ 16.22 16.26
2020-01-9T14:30:00.048 XYZ 16.23 16.28

Trades table

time                   sym price quantity
2020-01-9T14:30:00.023 MMM 16.23 75
2020-01-9T14:30:00.041 MMM 16.24 50
2020-01-9T14:30:00.041 MMM 16.25 100

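The typical SQL for doing this in a time-series database looks like the following, but I would like to know whether a result like this can be computed in BigQuery:

SELECT timestamp, trades.sym, price, quantity, ask, bid, (ask - bid) AS spread
FROM trades LEFT ASOF JOIN quotes

Expected result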

$timestamp                    sym price quantity ask   bid   spread
2020-01-9T14:30:00.023000000Z MMM 16.23 75       16.25 16.22 0.03
2020-01-9T14:30:00.041000000Z MMM 16.24 50       16.26 16.22 0.04
2020-01-9T14:30:00.041000000Z MMM 16.25 100      16.26 16.22 0.04

Tags: google-bigquery

Solution


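There are at least two approaches here: one uses pure standard SQL but does not scale, and the other is a scalable solution that relies on a helper function created with BigQuery's JavaScript UDF feature.

Non-scalable solution

I ran into exactly the same problem in BigQuery with tick data very similar to yours. You can do inequality joins in BigQuery, but this will not scale for even a few months of US equities data with hundreds or more names. In other words, you might think you could try: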

-- Example level 1 quotes table.
WITH l1_quotes AS (
    SELECT TIMESTAMP('2020-01-09T14:30:00.023') as time, 'XYZ' as sym, 16.22 as bid, 16.25 as ask
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.023') as time, 'XYZ' as sym, 16.21 as bid, 16.27 as ask
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.030') as time, 'XYZ' as sym, 16.20 as bid, 16.28 as ask
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.041') as time, 'XYZ' as sym, 16.22 as bid, 16.26 as ask
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.048') as time, 'XYZ' as sym, 16.23 as bid, 16.28 as ask
),
-- Example trades table.
trades AS (
    SELECT TIMESTAMP('2020-01-9T14:30:00.023') as time, 'MMM' as sym, 16.23 as price, 75 as quantity
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.041') as time, 'MMM' as sym, 16.24 as price, 50 as quantity
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.041') as time, 'MMM' as sym, 16.25 as price, 100 as quantity
),
-- Get all level 1 quotes <= each trade for each day using an inequality join.
inequality_join as (
    SELECT
        t.time as time,
        t.sym,
        t.price,
        t.quantity,
        l.time as l1_time,
        l.sym as l1_sym,
        l.bid,
        l.ask,
        l.ask - l.bid as spread
    FROM l1_quotes as l
    JOIN trades as t
    ON EXTRACT(DATE FROM l.time) = EXTRACT(DATE FROM t.time)
    AND l.time <= t.time
)
-- Use DISTINCT and LAST_VALUE to get the latest l1 quote asof each trade.
SELECT DISTINCT
    time, sym, price, quantity,
    LAST_VALUE(bid) OVER (latest_asof_time) as bid,
    LAST_VALUE(ask) OVER (latest_asof_time) as ask,
    LAST_VALUE(spread) OVER (latest_asof_time) as spread
FROM inequality_join
WINDOW latest_asof_time AS (
    PARTITION BY time, sym, l1_sym
    ORDER BY l1_time
    ROWS between unbounded preceding and unbounded following
);

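This solves your example problem, but it will not scale in practice. Note that the join above relies on an equality join condition on the date and an inequality join condition, l1_quotes.time <= trades.time. The inequality part ends up generating every l1_quotes entry whose timestamp is equal to or earlier than the timestamp of each trade entry. This is not a full CROSS JOIN, and the equality condition does partition it by date, but in practice it still becomes a very large join. For the US equities database I was working with, BigQuery could not complete the ASOF JOIN with this technique even after running for many hours. You could try narrowing the time window of the equality condition, for example by extracting the hour, the minute, and so on, but the smaller you make that window, the more likely you are to miss the previous level 1 quote for some entries, especially for illiquid symbols.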
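To get a feel for how quickly the intermediate result grows, here is a rough sketch in plain JavaScript (runnable in Node.js, not in BigQuery). It counts the rows that a condition like l1_quotes.time <= trades.time would materialize for a single day. The function name and the synthetic data (one quote per millisecond, one trade every 10 ms) are made up for illustration and are not taken from any real dataset.

```javascript
// Count the rows an inequality join (quote time <= trade time) would
// materialize, versus the rows an as-of join actually needs.
// Times are plain millisecond offsets within one hypothetical day.
function countInequalityJoinRows(quoteTimes, tradeTimes) {
    let rows = 0;
    for (const t of tradeTimes) {
        for (const q of quoteTimes) {
            if (q <= t) rows++; // every earlier-or-equal quote yields a joined row
        }
    }
    return rows;
}

// Synthetic data: 1000 quotes at 0..999 ms, 100 trades at 5, 15, ..., 995 ms.
const quoteTimes = Array.from({ length: 1000 }, (_, i) => i);
const tradeTimes = Array.from({ length: 100 }, (_, i) => i * 10 + 5);

const joinedRows = countInequalityJoinRows(quoteTimes, tradeTimes);
const neededRows = tradeTimes.length; // an as-of join keeps one quote per trade

console.log(joinedRows, neededRows); // 50100 100
```

Even in this tiny case the join produces roughly 500 rows per trade only to discard all but one of them, and the gap widens as the number of quotes and trades per day grows.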

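Scalable solution

So how do you ultimately solve this in BigQuery in a scalable way? Well, one idiomatic approach in BigQuery is to use one or more helper JavaScript UDFs when standard SQL cannot do what you need.

Here is the JavaScript UDF I wrote to solve this problem. It produces an array of X,Y timestamp tuples representing X ASOF Y.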

CREATE TEMPORARY FUNCTION asof_join(x ARRAY<TIMESTAMP>, y ARRAY<TIMESTAMP>) RETURNS ARRAY<STRUCT<x TIMESTAMP, y TIMESTAMP>> LANGUAGE js AS """
    function js_timestamp_sort(a, b) { return a - b }
    x.sort(js_timestamp_sort);
    y.sort(js_timestamp_sort);
    var epoch = new Date(1970, 0, 1);
    var results = [];
    var j = 0;
    for (var i = 0; i < y.length; i++) {
        var curr_res = {x: epoch, y: epoch};
        for (; j < x.length; j++) {
            if (x[j] <= y[i]) {
                curr_res.x = x[j];
                curr_res.y = y[i];
            } else {
                break;
            }
        }
        if (curr_res.x !== epoch) {
            results.push(curr_res);
            j--;
        }
    }

    return results;
""";
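
If you want to check the pairing logic outside BigQuery, the same two-pointer walk can be run locally in Node.js. The sketch below is a standalone restatement rather than the UDF itself: the name asofJoin and the ts helper are hypothetical, it assumes the inputs are Date objects (which is how BigQuery hands TIMESTAMP values to a JavaScript UDF), and the trade at .035 is an invented timestamp added to show a trade that falls between two quotes.

```javascript
// Pair each y with the latest x that is <= y, walking both sorted arrays once.
// Assumption: x and y are arrays of Date objects.
function asofJoin(x, y) {
    const byTime = (a, b) => a - b;
    x = [...x].sort(byTime); // copy so the caller's arrays are not mutated
    y = [...y].sort(byTime);
    const results = [];
    let j = 0;
    for (let i = 0; i < y.length; i++) {
        let match = null;
        // Advance through x while it is at or before the current y.
        for (; j < x.length && x[j] <= y[i]; j++) {
            match = x[j];
        }
        if (match !== null) {
            results.push({ x: match, y: y[i] });
            j--; // step back so the next y can reuse this x
        }
    }
    return results;
}

// Hypothetical helper: build a timestamp on the sample day from a ms suffix.
const ts = (ms) => new Date(`2020-01-09T14:30:00.${ms}Z`);

const quoteTimes = [ts('023'), ts('030'), ts('041'), ts('048')];
const tradeTimes = [ts('023'), ts('035'), ts('041')]; // .035 is invented

const pairs = asofJoin(quoteTimes, tradeTimes);
for (const p of pairs) {
    console.log(p.x.toISOString(), 'asof', p.y.toISOString());
}
```

Each trade time is paired with exactly one quote time, so the output has at most one entry per distinct trade timestamp, and a trade with no earlier quote is simply left out.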

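The idea is to extract the l1_quotes timestamps (X) and the trade timestamps (Y) into two separate arrays. We then use the function above to produce an array of X,Y ASOF tuples. Next we UNNEST that array of tuples to turn it back into a SQL table, and finally we join it with l1_quotes and then with trades. Here is the query that does this. In your query editor, you need to paste both the JavaScript UDF above and the query below: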

-- Example level 1 quotes table.
WITH l1_quotes AS (
    SELECT TIMESTAMP('2020-01-09T14:30:00.023') as time, 'XYZ' as sym, 16.22 as bid, 16.25 as ask
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.023') as time, 'XYZ' as sym, 16.21 as bid, 16.27 as ask
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.030') as time, 'XYZ' as sym, 16.20 as bid, 16.28 as ask
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.041') as time, 'XYZ' as sym, 16.22 as bid, 16.26 as ask
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.048') as time, 'XYZ' as sym, 16.23 as bid, 16.28 as ask
),
-- Example trades table.
trades AS (
    SELECT TIMESTAMP('2020-01-9T14:30:00.023') as time, 'MMM' as sym, 16.23 as price, 75 as quantity
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.041') as time, 'MMM' as sym, 16.24 as price, 50 as quantity
    UNION ALL SELECT TIMESTAMP('2020-01-9T14:30:00.041') as time, 'MMM' as sym, 16.25 as price, 100 as quantity
),
-- Extract distinct l1 quote times (use DISTINCT to reduce the size of the array for each day since we don't need duplicates for the UDF script to work).
distinct_l1_times as (
    SELECT DISTINCT time
    FROM l1_quotes
),
arrayed_l1_times AS (
    SELECT
        ARRAY_AGG(time) as times,
        EXTRACT(DATE FROM time) as curr_day
    FROM distinct_l1_times
    GROUP BY curr_day
),
-- Do the same for trade times.
distinct_trade_times AS (
    SELECT DISTINCT time
    FROM trades
),
arrayed_trade_times AS (
    SELECT
        ARRAY_AGG(time) as times,
        EXTRACT(DATE FROM time) as curr_day
    FROM distinct_trade_times
    GROUP BY curr_day
),
-- Use the handy asof_join JavaScript UDF created above.
asof_l1_trade_time_tuples AS (
    SELECT
        asof_join(arrayed_l1_times.times, arrayed_trade_times.times) as asof_tuples,
        arrayed_l1_times.curr_day as curr_day
    FROM arrayed_l1_times
    JOIN arrayed_trade_times
    USING (curr_day)
),
-- UNNEST the array of l1 quote/trade time asof tuples. Date grouping is no longer needed and dropped here.
unnested_times AS (
    SELECT
        a.x as l1_time,
        a.y as trade_time
    FROM
        asof_l1_trade_time_tuples , UNNEST(asof_l1_trade_time_tuples.asof_tuples) as a
)
-- Join back the l1 quote and trade information for the final result.  As before, use DISTINCT and LAST_VALUE to
-- eliminate duplicates that arise from repeated timestamps.
SELECT DISTINCT
    u.trade_time as time,
    t.price as price,
    t.quantity as quantity,
    LAST_VALUE(l.bid) OVER (latest_asof_time) as bid,
    LAST_VALUE(l.ask) OVER (latest_asof_time) as ask,
    LAST_VALUE(l.ask - l.bid) OVER (latest_asof_time) as spread,
FROM unnested_times as u
JOIN l1_quotes as l
ON u.l1_time = l.time
JOIN trades as t
ON u.trade_time = t.time
WINDOW latest_asof_time AS (
    PARTITION BY t.time, t.sym, l.sym
    ORDER BY l.time
    ROWS between unbounded preceding and unbounded following
)
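
The shape of the query above (collect distinct times, pair them, join the rows back) can also be traced in plain JavaScript on the sample data. This is only an in-memory sketch of the idea and not a replacement for the query. It makes three assumptions: times are kept as UTC ISO strings so they can be compared and used as map keys, the per-day grouping is omitted because the sample covers a single day, and when several quotes share a timestamp the first one in input order is used. That last choice matches the expected result in the question, whereas the ordering of tied rows in the SQL window is not something the query pins down.

```javascript
// Sample rows from the question, with times written as UTC ISO strings.
const quotes = [
    { time: '2020-01-09T14:30:00.023Z', sym: 'XYZ', bid: 16.22, ask: 16.25 },
    { time: '2020-01-09T14:30:00.023Z', sym: 'XYZ', bid: 16.21, ask: 16.27 },
    { time: '2020-01-09T14:30:00.030Z', sym: 'XYZ', bid: 16.20, ask: 16.28 },
    { time: '2020-01-09T14:30:00.041Z', sym: 'XYZ', bid: 16.22, ask: 16.26 },
    { time: '2020-01-09T14:30:00.048Z', sym: 'XYZ', bid: 16.23, ask: 16.28 },
];
const trades = [
    { time: '2020-01-09T14:30:00.023Z', sym: 'MMM', price: 16.23, quantity: 75 },
    { time: '2020-01-09T14:30:00.041Z', sym: 'MMM', price: 16.24, quantity: 50 },
    { time: '2020-01-09T14:30:00.041Z', sym: 'MMM', price: 16.25, quantity: 100 },
];

// Distinct, sorted times. Same-format UTC ISO strings sort chronologically.
const distinctSorted = (rows) => [...new Set(rows.map((r) => r.time))].sort();
const quoteTimes = distinctSorted(quotes);
const tradeTimes = distinctSorted(trades);

// Pair each trade time with the latest quote time at or before it.
const asofQuoteTime = new Map();
let j = 0;
for (const t of tradeTimes) {
    while (j + 1 < quoteTimes.length && quoteTimes[j + 1] <= t) j++;
    if (quoteTimes.length > 0 && quoteTimes[j] <= t) {
        asofQuoteTime.set(t, quoteTimes[j]);
    }
}

// Index quotes by time, keeping the first quote seen for each timestamp
// (an assumption of this sketch, see the note above).
const quoteByTime = new Map();
for (const q of quotes) {
    if (!quoteByTime.has(q.time)) quoteByTime.set(q.time, q);
}

// Join the quote columns back onto every trade that has an as-of match.
const result = trades
    .filter((t) => asofQuoteTime.has(t.time))
    .map((t) => {
        const q = quoteByTime.get(asofQuoteTime.get(t.time));
        const spread = Number((q.ask - q.bid).toFixed(2)); // round float noise
        return { ...t, bid: q.bid, ask: q.ask, spread };
    });

console.table(result);
```

The pairing step only ever sees distinct timestamps, which is why the arrays stay small; the full rows are brought back with plain equality lookups afterwards.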

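The query above, using the JavaScript UDF, ran in a few minutes on my database, whereas the non-scalable approach never finished even after many hours. You can even store the asof_join JavaScript UDF as a persistent UDF in a dataset so that it can be used in other contexts.

I agree that Google should consider implementing ASOF JOIN, especially since time-series analysis with distributed columnar OLAP engines like BigQuery is becoming more and more common. While it is achievable in BigQuery with the approach described here, a single SELECT statement with a built-in ASOF JOIN construct would be much simpler.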

