首页 > 解决方案 > 如何在 F# 中实现基于时间的长度队列?

问题描述

这是问题的后续:如何在 F# 中优化此移动平均计算

总结一下原来的问题:我需要对我收集的一组数据做一个移动平均;每个数据点都有一个时间戳,我需要处理直到某个时间戳的数据。这意味着我有一个可变大小的列表来平均。

最初的问题将实现作为一个队列,其中元素被添加并最终随着它们变得太旧而被删除。但是,最后,遍历队列以获取平均值很慢。

最初大部分 CPU 时间都花在寻找数据进行平均,但是一旦通过只保留需要的数据来解决这个问题,Seq.average 调用被证明非常慢。

看起来原来的机制(基于 Queue<>)是不合适的,这个问题是关于寻找一个新的。

我可以想到两个解决方案:

是否存在与 Queue<> 类似的循环缓冲区的实现?

到目前为止,最快的代码是:

module PriceMovingAverage =

    // moving average queue
    let private timeQueue  = Queue<DateTime>()
    let private priceQueue = Queue<float>()

    // update moving average
    let updateMovingAverage (tradeData: TradeData) priceBasePeriod =

        // add the new price
        timeQueue.Enqueue(tradeData.Timestamp)
        priceQueue.Enqueue(float tradeData.Price)

        // remove the items older than the price base period
        let removeOlderThan = tradeData.Timestamp - priceBasePeriod
        let rec dequeueLoop () =
            let p = timeQueue.Peek()
            if p < removeOlderThan then
                timeQueue.Dequeue() |> ignore
                priceQueue.Dequeue() |> ignore
                dequeueLoop()

        dequeueLoop()


    // get the moving average
    let getPrice () =
        try
            Some (
                priceQueue
                |> Seq.average   <- all CPU time goes here
                |> decimal
            )
        with _ ->
            None

标签: optimizationf#

解决方案


基于 10-15k 的队列长度,我认为肯定有考虑将交易批量处理成大约 100 笔交易的预先计算块。

添加几种类型:

type TradeBlock = {
    data: TradeData array
    startTime: DateTime
    endTime: DateTime
    sum: float
    count:int

}

type AvgTradeData = 
| Trade of TradeData
| Block of TradeBlock

然后我会让移动平均线使用DList<AvgTradeData>. (https://fsprojects.github.io/FSharpx.Collections/reference/fsharpx-collections-dlist-1.html)如果 startTime 在价格周期之后,则手动对 DList 中的第一个元素求和,并从列表中删除价格周期超过结束时间。列表中的最后一个元素被保留,Trade tradeData直到 100 被追加,然后全部从尾部删除并变成一个TradeBlock.


推荐阅读