Apache Beam - Streaming joins on temporal relations

Problem description

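I want to use the Apache Beam Python SDK to build an event-driven trading system for backtesting. Many of the PTransform operations in this system will be of the "temporal validity window" / "temporal join" type.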

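For example, take the worked example on currency quotes and trades from Streaming Systems, the book by the Beam authors. A similar example appears in an earlier paper.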

Right Table: quotes.txt (Currency Pair, Price, Event Time, Processing Time)
USD/JPY,102.0000,12:00:00,12:04:13
EUR/JPY,114.0000,12:00:30,12:06:23
EUR/JPY,119.0000,12:06:00,12:07:33
EUR/JPY,116.0000,12:03:00,12:09:07

Left Table: orders.txt (Currency Pair, Quantity, Event Time, Processing Time)
USD/JPY,1,12:03:00,12:03:44
EUR/JPY,2,12:02:00,12:05:07
EUR/JPY,5,12:05:00,12:08:00
EUR/JPY,3,12:08:00,12:09:33
USD/JPY,5,12:10:00,12:10:59

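Assume both samples stand in for unbounded collections (for example, two Kafka topics keyed by currency pair). I have no idea how to use the Apache Beam API to left-join these two (potentially unbounded) collections to produce the following output.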

Output Table With Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,000.0000,12:02:00,False,12:05:07
EUR/JPY,000.0000,12:02:00,True,12:06:23
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,570.0000,12:05:00,False,12:08:00
EUR/JPY,570.0000,12:05:00,True,12:09:07
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59


"Final" Output Table Without Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59
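To pin down the semantics the two output tables imply, here is a plain-Python sketch (no Beam involved). It assumes the join rule is: each order is priced with the most recent quote for the same pair whose event time is at or before the order's event time, and with 0 if no such quote is known yet. `replay_with_retractions` replays both files in processing-time order and re-emits an order, preceded by a retraction, whenever a newly arrived quote changes its value; `final_trades` keeps only the settled value. All function names are hypothetical.

```python
# Sample rows copied from quotes.txt and orders.txt above:
# (currency pair, price or quantity, event time, processing time).
QUOTES = [
    ("USD/JPY", 102.0, "12:00:00", "12:04:13"),
    ("EUR/JPY", 114.0, "12:00:30", "12:06:23"),
    ("EUR/JPY", 119.0, "12:06:00", "12:07:33"),
    ("EUR/JPY", 116.0, "12:03:00", "12:09:07"),
]
ORDERS = [
    ("USD/JPY", 1, "12:03:00", "12:03:44"),
    ("EUR/JPY", 2, "12:02:00", "12:05:07"),
    ("EUR/JPY", 5, "12:05:00", "12:08:00"),
    ("EUR/JPY", 3, "12:08:00", "12:09:33"),
    ("USD/JPY", 5, "12:10:00", "12:10:59"),
]


def latest_price(quotes, pair, event_time):
    """Price of the newest quote for `pair` with event time <= event_time.

    `quotes` holds (pair, price, event_time) tuples. Returns 0.0 when no
    such quote exists, like the 000.0000 rows in the table above.
    """
    candidates = [(et, price) for p, price, et in quotes
                  if p == pair and et <= event_time]
    # "HH:MM:SS" strings sort chronologically, so max() picks the newest quote.
    return max(candidates)[1] if candidates else 0.0


def replay_with_retractions(quotes, orders):
    """Replay both inputs in processing-time order.

    Emits (pair, price * quantity, order event time, is_retraction,
    processing time) whenever an order's joined value appears or changes.
    """
    events = sorted(
        [(pt, "quote", pair, price, et) for pair, price, et, pt in quotes]
        + [(pt, "order", pair, qty, et) for pair, qty, et, pt in orders]
    )
    seen_quotes, seen_orders = [], []
    emitted = {}  # index into seen_orders -> last value emitted for that order
    out = []
    for pt, kind, pair, value, et in events:
        if kind == "quote":
            seen_quotes.append((pair, value, et))
        else:
            seen_orders.append((pair, value, et))
        # Re-price every order seen so far; emit only when its value changes.
        for i, (opair, qty, oet) in enumerate(seen_orders):
            notional = latest_price(seen_quotes, opair, oet) * qty
            if emitted.get(i) == notional:
                continue
            if i in emitted:
                out.append((opair, emitted[i], oet, True, pt))  # retraction
            out.append((opair, notional, oet, False, pt))
            emitted[i] = notional
    return out


def final_trades(quotes, orders):
    """Settled value per order: (pair, price * quantity, order event time)."""
    settled = [(pair, price, et) for pair, price, et, _ in quotes]
    return [(pair, latest_price(settled, pair, et) * qty, et)
            for pair, qty, et, _ in orders]


for row in replay_with_retractions(QUOTES, ORDERS):
    print(row)
```

Every replayed row from 12:05:07 onward matches the table with retractions, and `final_trades` matches the values of the "Final" table (without the processing-time column). The first USD/JPY order comes out differently: its quote has processing time 12:04:13, later than the order's 12:03:44, so the replay first emits 0.0 and retracts it at 12:04:13, whereas the table assumes the quote is already known at 12:03:44.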

How can I implement the above as a PTransform using windows, triggers, and CoGroupByKey?

Current code - just some boilerplate with placeholders

"""Testing Apache beam joins."""

import logging
import datetime
import decimal
import typing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


# Schema and Transforms

class Quote(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    price: decimal.Decimal
    time_ms: int


class ConvertQuote(beam.DoFn):

    def process(self, element):
        pair, price_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/") 
        price = decimal.Decimal(price_str)
        time_ms = int(self._time_ms(time_str))
        yield Quote(base_currency, quote_currency, price, time_ms)

    def _time_ms(self, time):
        epoch = datetime.datetime.utcfromtimestamp(0)
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt - epoch).total_seconds() * 1000


class AddQuoteTimestamp(beam.DoFn):

    def process(self, element):
        # TimestampedValue expects seconds since the epoch, not milliseconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000)


class Order(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    quantity: int
    time_ms: int


class ConvertOrder(beam.DoFn):

    def process(self, element):
        pair, quantity_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/") 
        quantity = int(quantity_str)
        time_ms = int(self._time_ms(time_str))
        yield Order(base_currency, quote_currency, quantity, time_ms)

    def _time_ms(self, time):
        epoch = datetime.datetime.utcfromtimestamp(0)
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt - epoch).total_seconds() * 1000


class AddOrderTimestamp(beam.DoFn):

    def process(self, element):
        # TimestampedValue expects seconds since the epoch, not milliseconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000)


PAIRS = ["EUR/JPY", "USD/JPY"]  # Maybe pass this in as an option?

def by_pair(item, num_pairs):
    return PAIRS.index(f"{item.base_currency}/{item.quote_currency}")


# Administrative

LOGGING_MSG_FMT = "%(asctime)s - %(levelname)s: %(message)s"
LOGGING_DATE_FMT = "%Y-%m-%d %H:%M:%S%z"
logging.basicConfig(format=LOGGING_MSG_FMT, datefmt=LOGGING_DATE_FMT, level=logging.INFO)    


class MyOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--quotes-file", dest="quotes_file", default="quotes.txt")
        parser.add_argument("--orders-file", dest="orders_file", default="orders.txt")
        parser.add_argument("--trades-file", dest="trades_file", default="trades")

options = PipelineOptions()
my_options = options.view_as(MyOptions)


# Main

with beam.Pipeline(options=my_options) as p:

    eurjpy_quotes, usdjpy_quotes = (
        p
        | "ReadQuotes" >> beam.io.ReadFromText(my_options.quotes_file)
        | "ConvertQuotes" >> beam.ParDo(ConvertQuote())
        | "AddQuoteTimestamps" >> beam.ParDo(AddQuoteTimestamp())
        | "PartitionQuotes" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
        )

    eurjpy_orders, usdjpy_orders = (
        p
        | "ReadOrders" >> beam.io.ReadFromText(my_options.orders_file)
        | "ConvertOrders" >> beam.ParDo(ConvertOrder())
        | "AddOrderTimestamps" >> beam.ParDo(AddOrderTimestamp())
        | "PartitionOrders" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
        )

    # Something here using CoGroupByKey on eurjpy_quotes and eurjpy_orders
    # This is just a placeholder for now.
    eurjpy_quotes | "WriteEURJPYTrades" >> beam.io.WriteToText(my_options.trades_file)

Tags: apache-beam

Solution


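For time series, you are better off using the State and Timers API.

Original blog post on state and timers

State and timers documentation

There is also some current work in progress in Java on temporal joins: temporal example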

