首页 > 解决方案 > 导管:怎么样?具有背压的多个生产者到一个消费者

问题描述

无论如何我可以表达“背压”来限制来自管道源的值的产生吗?

假设我有类似的东西:

source :: ConduitT () Tweet Twitter ()
source = do
  ts <- lift $ getNewTweets [("screen_name", "Bits90824664")]
  yieldMany ts
  _ <- liftIO $ threadDelay 3000000
  source

getNewTweets通过受限制的 Web API 请求其数据的位置。我已经成功地通过threadDelay在生产者中添加 a 来“调整”正在执行的请求数量。但是,我计划向我的管道添加额外的源,并开始以循环方式使用来自每个生产者的数据。因此,在生产者中放置 a threadDelay不再有意义。我想把延迟放在上游的某个地方。

我已经尝试过向消费者添加延迟或在管道中间添加一个iterMC带有 athreadDelay的东西,但没有做任何事情。我猜多线程正在发生,所以延迟不能正常工作?

或者,一个新的想法!大多数情况下,我的生产者根本不产生任何值并且执行yieldMany []...所以在消费者之外的延迟可能无法正常工作,除非我将[]其视为正常的流值并将生产者重写为ConduitT () [Value] IO (). 关于这是否适用于CE消费者的分块()变体或空块是否会以某种方式短路的任何想法?我想我会使用一个非分块转换器来添加延迟,以便它会随着每个请求而被触发,并将在管道末端使用消费者的 CE 变体之一。当我有机会时,我会进行实验。

如果有其他技术可以在 Haskell 中对这种行为进行编码,我将感谢任何建议,因为我对 Haskell 尤其是管道库相对较新;我什至不确定我如何/是否可以首先循环浏览多个来源。

标签: haskellconduit

解决方案


更改流以返回数据块(包括空块)允许在上游添加延迟。我有一个使用 ZipSource 的粗略原型,但很可能需要更改我的方法以使源彼此独立地提取数据。

followUser :: Text -> ConduitT () [Tweet] Twitter ()
followUser sn = do
  ts <- lift $ getNewTweets [("screen_name", sn)]
  yield ts
  source

handleTweet :: Tweet -> IO ()
handleTweet = print

main :: IO ()
main = do
  let sources = sequenceSources [ followUser "user1"
                                , followUser "user2"
                                ]

  _ <- runTwitter $ runConduit
    $ sources
    .| iterMC (\a -> threadDelay 3000000)
    .| mapM_CE (liftIO . Prelude.mapM_ handleTweet)

  return ()


推荐阅读