haskell - 导管:怎么样?具有背压的多个生产者到一个消费者
问题描述
无论如何我可以表达“背压”来限制来自管道源的值的产生吗?
假设我有类似的东西:
source :: ConduitT () Tweet Twitter ()
source = do
ts <- lift $ getNewTweets [("screen_name", "Bits90824664")]
yieldMany ts
_ <- liftIO $ threadDelay 3000000
source
getNewTweets
通过受限制的 Web API 请求其数据的位置。我已经成功地通过threadDelay
在生产者中添加 a 来“调整”正在执行的请求数量。但是,我计划向我的管道添加额外的源,并开始以循环方式使用来自每个生产者的数据。因此,在生产者中放置 a threadDelay
不再有意义。我想把延迟放在上游的某个地方。
我已经尝试过向消费者添加延迟或在管道中间添加一个iterMC
带有 athreadDelay
的东西,但没有做任何事情。我猜多线程正在发生,所以延迟不能正常工作?
或者,一个新的想法!大多数情况下,我的生产者根本不产生任何值并且执行yieldMany []
...所以在消费者之外的延迟可能无法正常工作,除非我将[]
其视为正常的流值并将生产者重写为ConduitT () [Value] IO ()
. 关于这是否适用于我想我会使用一个非分块转换器来添加延迟,以便它会随着每个请求而被触发,并将在管道末端使用消费者的 CE 变体之一。当我有机会时,我会进行实验。CE
消费者的分块()变体或空块是否会以某种方式短路的任何想法?
如果有其他技术可以在 Haskell 中对这种行为进行编码,我将感谢任何建议,因为我对 Haskell 尤其是管道库相对较新;我什至不确定我如何/是否可以首先循环浏览多个来源。
解决方案
更改流以返回数据块(包括空块)允许在上游添加延迟。我有一个使用 ZipSource 的粗略原型,但很可能需要更改我的方法以使源彼此独立地提取数据。
followUser :: Text -> ConduitT () [Tweet] Twitter ()
followUser sn = do
ts <- lift $ getNewTweets [("screen_name", sn)]
yield ts
source
handleTweet :: Tweet -> IO ()
handleTweet = print
main :: IO ()
main = do
let sources = sequenceSources [ followUser "user1"
, followUser "user2"
]
_ <- runTwitter $ runConduit
$ sources
.| iterMC (\a -> threadDelay 3000000)
.| mapM_CE (liftIO . Prelude.mapM_ handleTweet)
return ()
推荐阅读
- c# - EF Core 2.2 空间类型无法添加到数据库迁移
- c# - C# facebook sdk : (OAuthException - #100) (#100) GK 中的应用只需要通过 TOS 检查
- angular - MatDialog 不显示通过订阅更新的值
- php - Laravel MethodNotAllowedHttpException (405) 文件上传
- html - 如何使用 Python 从 GitHub 中提取评论正文
- detox - 在 ScrollView 组件上使用 whileElement().scroll() 进行无休止的非滚动
- pdf - 使用 pyspark 将 pdf 保存到 HDFS
- graphics - 在 Latex 中为父文件夹配置 /graphicspath
- spring - Spring Cloud Stream:同一个应用程序中的两个不同的Kafka
- java - Android - 如何在谷歌地图上显示 imageView?