apache-beam - Apache Beam - Streaming join on a temporal relation
Problem description
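I want to use the Apache Beam Python SDK to build an event-driven trading system for backtesting. Many of the PTransform operations in this system will be of the "temporal validity window" / "temporal join" type.
For example, take the worked example of currency quotes and trades from Streaming Systems, the book by the Beam folks. A similar example appears in an earlier paper.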
Right Table: quotes.txt (Currency Pair, Price, Event Time, Processing Time)
USD/JPY,102.0000,12:00:00,12:04:13
EUR/JPY,114.0000,12:00:30,12:06:23
EUR/JPY,119.0000,12:06:00,12:07:33
EUR/JPY,116.0000,12:03:00,12:09:07
Left Table: orders.txt (Currency Pair, Quantity, Event Time, Processing Time)
USD/JPY,1,12:03:00,12:03:44
EUR/JPY,2,12:02:00,12:05:07
EUR/JPY,5,12:05:00,12:08:00
EUR/JPY,3,12:08:00,12:09:33
USD/JPY,5,12:10:00,12:10:59
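Assume both samples are stand-ins for unbounded collections (e.g., two Kafka topics keyed by currency pair). I have no idea how to use the Apache Beam API to left-join these two (possibly unbounded) collections to produce the following output.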
Output Table With Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,000.0000,12:02:00,False,12:05:07
EUR/JPY,000.0000,12:02:00,True,12:06:23
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,570.0000,12:05:00,False,12:08:00
EUR/JPY,570.0000,12:05:00,True,12:09:07
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59
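The two tables are related mechanically: replaying the rows above in processing-time order, where each retraction row cancels the value previously emitted for the same order, leaves one live row per order, which is exactly the "final" table given next. A minimal plain-Python check of that relationship, assuming an order is identified by its currency pair and order event time (`ROWS` and `collapse` are illustrative names, not Beam APIs):

```python
# Rows from the "Output Table With Retractions":
# (pair, value, order_event_time, is_retraction, trade_processing_time)
ROWS = [
    ("USD/JPY", "102.0000", "12:03:00", False, "12:03:44"),
    ("EUR/JPY", "000.0000", "12:02:00", False, "12:05:07"),
    ("EUR/JPY", "000.0000", "12:02:00", True, "12:06:23"),
    ("EUR/JPY", "228.0000", "12:02:00", False, "12:06:23"),
    ("EUR/JPY", "570.0000", "12:05:00", False, "12:08:00"),
    ("EUR/JPY", "570.0000", "12:05:00", True, "12:09:07"),
    ("EUR/JPY", "580.0000", "12:05:00", False, "12:09:07"),
    ("EUR/JPY", "357.0000", "12:08:00", False, "12:09:33"),
    ("USD/JPY", "510.0000", "12:10:00", False, "12:10:59"),
]


def collapse(rows):
    """Apply retractions in order and keep the live row for each order."""
    live = {}  # (pair, order_event_time) -> latest non-retracted row
    for pair, value, order_time, retraction, proc_time in rows:
        key = (pair, order_time)
        if retraction:
            # A retraction cancels the value previously emitted for this order.
            previous = live.pop(key)
            if previous[1] != value:
                raise ValueError(f"retraction does not match live row: {previous}")
        else:
            live[key] = (pair, value, order_time, False, proc_time)
    return list(live.values())
```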
"Final" Output Table Without Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59
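Ignoring processing time altogether, the final table is an event-time temporal left join: each order is priced with the most recent quote for the same pair whose event time is at or before the order's event time. A plain-Python sketch of that reference semantics, which can be handy for checking whatever a Beam pipeline produces. It assumes, as the 000.0000 rows in the retraction table suggest, that an order with no earlier quote is priced at 0, and it relies on zero-padded `HH:MM:SS` strings sorting chronologically; `temporal_left_join` is a hypothetical name:

```python
import bisect
from decimal import Decimal

QUOTE_LINES = [
    "USD/JPY,102.0000,12:00:00,12:04:13",
    "EUR/JPY,114.0000,12:00:30,12:06:23",
    "EUR/JPY,119.0000,12:06:00,12:07:33",
    "EUR/JPY,116.0000,12:03:00,12:09:07",
]
ORDER_LINES = [
    "USD/JPY,1,12:03:00,12:03:44",
    "EUR/JPY,2,12:02:00,12:05:07",
    "EUR/JPY,5,12:05:00,12:08:00",
    "EUR/JPY,3,12:08:00,12:09:33",
    "USD/JPY,5,12:10:00,12:10:59",
]


def temporal_left_join(quote_lines, order_lines):
    """Price each order with the latest same-pair quote at or before its event time."""
    quotes = {}  # pair -> list of (event_time, price), kept sorted by event time
    for line in quote_lines:
        pair, price, event_time, _ = line.split(",")
        bisect.insort(quotes.setdefault(pair, []), (event_time, Decimal(price)))
    trades = []
    for line in order_lines:
        pair, quantity, event_time, _ = line.split(",")
        times = [t for t, _ in quotes.get(pair, [])]
        # Index of the last quote whose event time is <= the order's event time.
        i = bisect.bisect_right(times, event_time) - 1
        # Assumption: an order with no earlier quote is priced at 0 (left join).
        price = quotes[pair][i][1] if i >= 0 else Decimal(0)
        trades.append((pair, price * int(quantity), event_time))
    return trades
```

Calling `temporal_left_join(QUOTE_LINES, ORDER_LINES)` yields the five (pair, value, order event time) rows of the final table in the same order.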
How can I implement the above as a PTransform using windows, triggers, and CoGroupByKey?
Current code (just some boilerplate with placeholders):
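One way to read the "temporal validity window" idea: each quote is valid from its own event time until the event time of the next quote for the same pair. The sketch below (plain Python with hypothetical helper names, not a Beam `WindowFn`) computes those intervals for the sample quotes. Note that a quote's window end depends on a quote that may arrive later: before the 116.0000 EUR/JPY quote (event time 12:03:00) arrives, the 114.0000 quote is valid until 12:06:00; afterwards only until 12:03:00. Windows that shrink as data arrives are, it seems, part of why this does not map directly onto built-in fixed or sliding windows, and why the output needs retractions.

```python
QUOTES = [  # (pair, price, event_time) from quotes.txt
    ("USD/JPY", "102.0000", "12:00:00"),
    ("EUR/JPY", "114.0000", "12:00:30"),
    ("EUR/JPY", "119.0000", "12:06:00"),
    ("EUR/JPY", "116.0000", "12:03:00"),
]


def validity_windows(quotes):
    """Give each quote the interval [own event time, next quote's event time)."""
    by_pair = {}
    for pair, price, t in quotes:
        by_pair.setdefault(pair, []).append((t, price))
    windows = []
    for pair, history in by_pair.items():
        history.sort()  # zero-padded HH:MM:SS strings sort chronologically
        ends = [t for t, _ in history[1:]] + [None]  # None = open-ended
        for (start, price), end in zip(history, ends):
            windows.append((pair, price, start, end))
    return windows


def price_at(windows, pair, t):
    """Return the price whose validity window contains event time t, or None."""
    for w_pair, price, start, end in windows:
        if w_pair == pair and start <= t and (end is None or t < end):
            return price
    return None
```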
"""Testing Apache beam joins."""
import logging
import datetime
import decimal
import typing
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Schema and Transforms
class Quote(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    price: decimal.Decimal
    time_ms: int


class ConvertQuote(beam.DoFn):
    """Parse a 'PAIR,PRICE,EVENT_TIME,PROC_TIME' line into a Quote."""

    def process(self, element):
        pair, price_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/")
        price = decimal.Decimal(price_str)
        time_ms = int(self._time_ms(time_str))
        yield Quote(base_currency, quote_currency, price, time_ms)

    def _time_ms(self, time):
        # Only the time of day is parsed, so the date defaults to 1900-01-01.
        epoch = datetime.datetime(1970, 1, 1)
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt - epoch).total_seconds() * 1000


class AddQuoteTimestamp(beam.DoFn):
    def process(self, element):
        # TimestampedValue expects seconds since the Unix epoch, not milliseconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000)
class Order(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    quantity: int
    time_ms: int


class ConvertOrder(beam.DoFn):
    """Parse a 'PAIR,QUANTITY,EVENT_TIME,PROC_TIME' line into an Order."""

    def process(self, element):
        pair, quantity_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/")
        quantity = int(quantity_str)
        time_ms = int(self._time_ms(time_str))
        yield Order(base_currency, quote_currency, quantity, time_ms)

    def _time_ms(self, time):
        # Only the time of day is parsed, so the date defaults to 1900-01-01.
        epoch = datetime.datetime(1970, 1, 1)
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt - epoch).total_seconds() * 1000


class AddOrderTimestamp(beam.DoFn):
    def process(self, element):
        # TimestampedValue expects seconds since the Unix epoch, not milliseconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000)
PAIRS = ["EUR/JPY", "USD/JPY"]  # Maybe pass this in as an option?


def by_pair(item, num_pairs):
    # Partition function: index of the item's currency pair in PAIRS.
    return PAIRS.index(f"{item.base_currency}/{item.quote_currency}")
# Administrative
LOGGING_MSG_FMT = "%(asctime)s - %(levelname)s: %(message)s"
LOGGING_DATE_FMT = "%Y-%m-%d %H:%M:%S%z"
logging.basicConfig(format=LOGGING_MSG_FMT, datefmt=LOGGING_DATE_FMT, level=logging.INFO)
class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--quotes-file", dest="quotes_file", default="quotes.txt")
        parser.add_argument("--orders-file", dest="orders_file", default="orders.txt")
        parser.add_argument("--trades-file", dest="trades_file", default="trades")


options = PipelineOptions()
my_options = options.view_as(MyOptions)
# Main
with beam.Pipeline(options=my_options) as p:
    eurjpy_quotes, usdjpy_quotes = (
        p
        | "ReadQuotes" >> beam.io.ReadFromText(my_options.quotes_file)
        | "ConvertQuotes" >> beam.ParDo(ConvertQuote())
        | "AddQuoteTimestamps" >> beam.ParDo(AddQuoteTimestamp())
        | "PartitionQuotes" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
    )
    eurjpy_orders, usdjpy_orders = (
        p
        | "ReadOrders" >> beam.io.ReadFromText(my_options.orders_file)
        | "ConvertOrders" >> beam.ParDo(ConvertOrder())
        | "AddOrderTimestamps" >> beam.ParDo(AddOrderTimestamp())
        | "PartitionOrders" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
    )
    # Something here using CoGroupByKey on eurjpy_quotes and eurjpy_orders
    # This is just a placeholder for now.
    eurjpy_quotes | "WriteEURJPYTrades" >> beam.io.WriteToText(my_options.trades_file)
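On the `CoGroupByKey` placeholder: `CoGroupByKey` takes several PCollections of `(key, value)` pairs (for example a dict such as `{"quotes": ..., "orders": ...}`) and emits, per key and window, `(key, {tag: values})`. Keying both inputs by currency pair would therefore make the `Partition` step unnecessary. The plain-Python stand-in below only mimics that output shape on the sample lines; `to_seconds`, `keyed`, and `co_group` are hypothetical names, not Beam APIs. It converts `HH:MM:SS` to seconds because Beam timestamps are in seconds. Matching each order to the right quote inside a group would still be up to your own code.

```python
from collections import defaultdict

QUOTE_LINES = [
    "USD/JPY,102.0000,12:00:00,12:04:13",
    "EUR/JPY,114.0000,12:00:30,12:06:23",
    "EUR/JPY,119.0000,12:06:00,12:07:33",
    "EUR/JPY,116.0000,12:03:00,12:09:07",
]
ORDER_LINES = [
    "USD/JPY,1,12:03:00,12:03:44",
    "EUR/JPY,2,12:02:00,12:05:07",
    "EUR/JPY,5,12:05:00,12:08:00",
    "EUR/JPY,3,12:08:00,12:09:33",
    "USD/JPY,5,12:10:00,12:10:59",
]


def to_seconds(hms):
    """'12:03:00' -> seconds since midnight."""
    h, m, s = (int(part) for part in hms.split(":"))
    return h * 3600 + m * 60 + s


def keyed(lines):
    """Turn 'PAIR,VALUE,EVENT_TIME,PROC_TIME' lines into (pair, (seconds, value))."""
    for line in lines:
        pair, value, event_time, _ = line.rstrip().split(",")
        yield pair, (to_seconds(event_time), value)


def co_group(**tagged):
    """Mimic CoGroupByKey's output shape: {key: {tag: [values]}}."""
    grouped = defaultdict(lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)
```

In the pipeline itself, the keying step could be something like `beam.Map(lambda q: (f"{q.base_currency}/{q.quote_currency}", q))` applied to both inputs before `CoGroupByKey`.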
Solution
For time series, you are better off using the State and Timers API.
There is also some current WIP in Java on temporal joins; see the temporal example.
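A sketch of the per-key logic a State-and-Timers solution might implement, simulated in plain Python by replaying both inputs in processing-time order. Per currency pair it keeps the quotes and orders seen so far; a new order is priced with the latest quote at or before its event time (0 if none yet); a new quote re-prices the stored orders and, for each one whose value changes, emits a retraction of the old value followed by the new value. In Beam this state would presumably live in user state on a keyed stateful `DoFn` (for example a `BagStateSpec` from `apache_beam.transforms.userstate`), and timers, which are not modeled here, could clear it once the watermark plus whatever allowed lateness you choose has passed. `PairState` and `replay` are hypothetical names, and orders are assumed unique per pair and event time.

```python
import bisect
from decimal import Decimal

QUOTES = [  # (pair, price, event_time, processing_time) from quotes.txt
    ("USD/JPY", "102.0000", "12:00:00", "12:04:13"),
    ("EUR/JPY", "114.0000", "12:00:30", "12:06:23"),
    ("EUR/JPY", "119.0000", "12:06:00", "12:07:33"),
    ("EUR/JPY", "116.0000", "12:03:00", "12:09:07"),
]
ORDERS = [  # (pair, quantity, event_time, processing_time) from orders.txt
    ("USD/JPY", 1, "12:03:00", "12:03:44"),
    ("EUR/JPY", 2, "12:02:00", "12:05:07"),
    ("EUR/JPY", 5, "12:05:00", "12:08:00"),
    ("EUR/JPY", 3, "12:08:00", "12:09:33"),
    ("USD/JPY", 5, "12:10:00", "12:10:59"),
]


class PairState:
    """Hypothetical per-key state: quotes and orders seen so far for one pair."""

    def __init__(self):
        self.quotes = []  # (event_time, price), kept sorted by event time
        self.orders = {}  # order event_time -> (quantity, last emitted value)

    def price_at(self, t):
        """Latest quote price at or before event time t, or 0 if none yet."""
        i = bisect.bisect_right([qt for qt, _ in self.quotes], t) - 1
        return self.quotes[i][1] if i >= 0 else Decimal(0)


def replay(quotes, orders):
    """Process both inputs in processing-time order, emitting
    (pair, value, order_event_time, is_retraction, processing_time) rows."""
    events = [(p, "quote", pair, Decimal(v), t) for pair, v, t, p in quotes]
    events += [(p, "order", pair, q, t) for pair, q, t, p in orders]
    state, out = {}, []
    for proc, kind, pair, payload, t in sorted(events):
        s = state.setdefault(pair, PairState())
        if kind == "order":
            value = s.price_at(t) * payload
            s.orders[t] = (payload, value)
            out.append((pair, value, t, False, proc))
            continue
        bisect.insort(s.quotes, (t, payload))
        # A new quote may change earlier results: retract and re-emit those.
        for order_t, (qty, old) in sorted(s.orders.items()):
            new = s.price_at(order_t) * qty
            if new != old:
                out.append((pair, old, order_t, True, proc))
                out.append((pair, new, order_t, False, proc))
                s.orders[order_t] = (qty, new)
    return out
```

Replaying the sample data this way reproduces every EUR/JPY row of the table with retractions, and the last USD/JPY row, exactly. The first USD/JPY order differs: it is processed at 12:03:44, before the USD/JPY quote arrives at 12:04:13, so a strict processing-time replay first emits 0 for it and retracts that at 12:04:13, whereas the table shows 102.0000 at 12:03:44.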