haskell - Haskell / Conduit:逐行读取文件
问题描述
场景:我有一个 ~900mb 的文本文件,其格式如下
...
Id: 109101
ASIN: 0806978473
title: The Beginner's Guide to Tai Chi
group: Book
salesrank: 672264
similar: 0
categories: 3
|Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|General[16575]
|Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|Taichi[16583]
|Books[283155]|Subjects[1000]|Sports[26]|General[11086921]
reviews: total: 2 downloaded: 2 avg rating: 5
2000-4-4 cutomer: A191SV1V1MK490 rating: 5 votes: 0 helpful: 0
2004-7-10 cutomer: AVXBUEPNVLZVC rating: 5 votes: 0 helpful: 0
(----- empty line ------)
Id :
并想从中解析信息。
问题:作为第一步(因为我需要它用于另一个上下文)我想逐行处理文件,然后将属于一个产品的“块”收集在一起,然后用其他逻辑单独处理它们。
所以计划如下:
- 定义代表文本文件的源
- 定义一个管道(?),该管道从该源中各取一条线,并且...
- ...将其传递给其他一些组件。
现在,我正在尝试调整以下示例:
doStuff = do
writeFile "input.txt" "This is a \n test." -- Filepath -> String -> IO ()
runConduitRes -- m r
$ sourceFileBS "input.txt" -- ConduitT i ByteString m () -- by "chunk"
.| sinkFile "output.txt" -- FilePath -> ConduitT ByteString o m ()
readFile "output.txt"
>>= putStrLn
所以sourceFileBS "input.txt"
是 type ConduitT i ByteString m ()
,即一个带有
- 输入类型
i
- 输出类型
ByteStream
- 单子类型
t
- 结果类型
()
。
sinkFile
将所有传入数据流式传输到给定文件中。sinkFile "output.txt"
是具有输入类型的管道ByteStream
。
我现在想要的是逐行处理输入源,即每个下游只传递一行。在伪代码中:
sourceFile "input.txt"
splitIntoLines
yieldMany (?)
other stuff
我怎么做?
我目前拥有的是
copyFile = do
writeFile "input.txt" "This is a \n test." -- Filepath -> String -> IO ()
runConduitRes -- m r
(lineC $ sourceFileBS "input.txt") -- ConduitT i ByteString m () -- by "chunk"
.| sinkFile "output.txt" -- FilePath -> ConduitT ByteString o m ()
readFile "output.txt"
>>= putStrLn --
但这会产生以下类型错误:
* Couldn't match type `bytestring-0.10.8.2:Data.ByteString.Internal.ByteString'
with `Void'
Expected type: ConduitT
()
Void
(ResourceT
(ConduitT
a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
()
Actual type: ConduitT
()
bytestring-0.10.8.2:Data.ByteString.Internal.ByteString
(ResourceT
(ConduitT
a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
()
* In the first argument of `runConduitRes', namely
`(lineC $ sourceFileBS "input.txt")'
In the first argument of `(.|)', namely
`runConduitRes (lineC $ sourceFileBS "input.txt")'
In a stmt of a 'do' block:
runConduitRes (lineC $ sourceFileBS "input.txt")
.| sinkFile "output.txt"
|
28 | (lineC $ sourceFileBS "input.txt") -- ConduitT i ByteString m () -- by "chunk"
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
这让我相信现在的问题是第一个管道没有兼容的输入类型runConduitRes
。
我只是无法理解它,真的需要一个提示。
提前非常感谢。
解决方案
我今天为此苦苦挣扎,并在试图找出类似问题时发现了这个问题。我试图将 git 日志分成块以进行进一步解析,例如
commit 12345
Author: Me
Date: Thu Jan 25 13:45:16 2019 -0500
made some changes
1 file changed, 10 insertions(+), 0 deletions(-)
commit 54321
Author: Me
...and so on...
我需要的函数几乎splitOnUnBounded
来自Data.Conduit.Combinators,但我不太清楚如何在那里编写谓词函数。
我想出了以下Conduit
是对splitOnUnbounded
. source它将采用列表流。每个列表只有一行文本,因为我发现这样考虑更容易一些,尽管这肯定不是最佳解决方案。
它将使用一个函数将文本行组合在一起,该函数采用下一行并返回一个Bool
指示下一行是否是下一组文本的开始。
groupLines :: (Monad m, MonadIO m) => (Text -> Bool) -> [T.Text] -> ConduitM Text [Text] m ()
groupLines startNextLine ls = start
where
-- If the next line in the stream is Nothing, return.
-- If the next line is the stream is Just line, then
-- accumulate that line
start = await >>= maybe (return ()) (accumulateLines ls)
accumulateLines ls nextLine = do
-- if ls is [], then add nextLine. Try to get a new next line. If there isn't one, yield. If there is a next line,
-- yield lines and call accumulatelines again.
-- if ls is [Text], check if nextLine is the start of the next group. If it isn't, add nextLine to ls,
-- try got the the next nextLine. if there isn't one, yield, and if there is one, call accumulate lines again.
-- If nextLine _is_ the start of the next group, the yield this group of lines and call accumulate lines again.
nextLine' <- await
case nextLine' of
Nothing -> yield ls'
Just l ->
if Prelude.null ls
then accumulateLines ls' l
else
if startNextLine l
then yield ls' >> accumulateLines [] l
else accumulateLines ls' l
where
ls' = ls ++ [nextLine]
它可以在如下管道中使用。只需在函数上方传递函数,该Text -> Bool
函数告诉管道何时开始下一个文本集合。
isCommitLine :: Text -> Bool
isCommitLine t = listToMaybe (TS.indices "commit" t) == Just 0
logParser =
sourceFile "logs.txt"
.| decodeUtf8
.| linesUnbounded
.| groupLines isCommitLine []
.| Data.Conduit.Combinators.map (intercalate "\n")
-- do something with each log entry here --
.| Data.Conduit.Combinators.print
main :: IO ()
main = runConduitRes logParser
我是 Haskell 的新手,我强烈怀疑这不是实现这一目标的最佳方式。所以如果其他人有更好的建议,我会很乐意学习!否则,也许在这里发布这个解决方案会帮助一些人。
推荐阅读
- node.js - 重放 RPC 调用以进行测试
- linux - 如何在 Linux 上发送请求 POST
- java - 如何在java中从xml节点创建格式化字符串
- r - 将多个 .rda 文件加载到 r 中的列表中
- android - 更新时没有错误,但是图像无法更新android sqlite
- java - 更改 NetBeans 外观的外观
- r - lapply,数据争吵日期,意外输出
- jenkins - 访问先前 Jenkins 构建中哪个阶段失败
- ruby-on-rails - Rails index.html.erb 删除项目 - 不工作
- regex - Oracle SQL:提取每行中的所有匹配字符串