首页 > 解决方案 > Haskell / Conduit:逐行读取文件

问题描述

场景:我有一个 ~900mb 的文本文件,其格式如下

...
Id:   109101
ASIN: 0806978473
  title: The Beginner's Guide to Tai Chi
  group: Book
  salesrank: 672264
  similar: 0
  categories: 3
   |Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|General[16575]
   |Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|Taichi[16583]
   |Books[283155]|Subjects[1000]|Sports[26]|General[11086921]
  reviews: total: 2  downloaded: 2  avg rating: 5
    2000-4-4  cutomer: A191SV1V1MK490  rating: 5  votes:   0  helpful:   0
    2004-7-10  cutomer:  AVXBUEPNVLZVC  rating: 5  votes:   0  helpful:   0
                    (----- empty line ------)    
Id :

并想从中解析信息。

问题:作为第一步(因为我需要它用于另一个上下文)我想逐行处理文件,然后将属于一个产品的“块”收集在一起,然后用其他逻辑单独处理它们。

所以计划如下:

  1. 定义代表文本文件的源
  2. 定义一个管道(?),该管道从该源中各取一条线,并且...
  3. ...将其传递给其他一些组件。

现在,我正在尝试调整以下示例:

doStuff = do
  writeFile "input.txt" "This is a \n test." -- Filepath -> String -> IO ()

  runConduitRes                  -- m r
    $ sourceFileBS "input.txt"   -- ConduitT i ByteString m ()  -- by "chunk"
    .| sinkFile "output.txt"     -- FilePath -> ConduitT ByteString o m ()

  readFile "output.txt"
    >>= putStrLn 

所以sourceFileBS "input.txt"是 type ConduitT i ByteString m (),即一个带有

sinkFile将所有传入数据流式传输到给定文件中。sinkFile "output.txt"是具有输入类型的管道ByteStream

我现在想要的是逐行处理输入源,即每个下游只传递一行。在伪代码中:

sourceFile "input.txt"
splitIntoLines
yieldMany (?)
other stuff

我怎么做?

我目前拥有的是

copyFile = do
  writeFile "input.txt" "This is a \n test." -- Filepath -> String -> IO ()

  runConduitRes                  -- m r
    (lineC $ sourceFileBS "input.txt")   -- ConduitT i ByteString m ()  -- by "chunk"
    .| sinkFile "output.txt"     -- FilePath -> ConduitT ByteString o m ()

  readFile "output.txt"
    >>= putStrLn --

但这会产生以下类型错误:

    * Couldn't match type `bytestring-0.10.8.2:Data.ByteString.Internal.ByteString'
                     with `Void'
      Expected type: ConduitT
                       ()
                       Void
                       (ResourceT
                          (ConduitT
                             a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
                       ()
        Actual type: ConduitT
                       ()
                       bytestring-0.10.8.2:Data.ByteString.Internal.ByteString
                       (ResourceT
                          (ConduitT
                             a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
                       ()
    * In the first argument of `runConduitRes', namely
        `(lineC $ sourceFileBS "input.txt")'
      In the first argument of `(.|)', namely
        `runConduitRes (lineC $ sourceFileBS "input.txt")'
      In a stmt of a 'do' block:
        runConduitRes (lineC $ sourceFileBS "input.txt")
          .| sinkFile "output.txt"
   |
28 |     (lineC $ sourceFileBS "input.txt")   -- ConduitT i ByteString m ()  -- by "chunk"
   |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

这让我相信现在的问题是第一个管道没有兼容的输入类型runConduitRes

我只是无法理解它,真的需要一个提示。

提前非常感谢。

标签: haskellconduit

解决方案


我今天为此苦苦挣扎,并在试图找出类似问题时发现了这个问题。我试图将 git 日志分成块以进行进一步解析,例如

commit 12345
Author: Me
Date:   Thu Jan 25 13:45:16 2019 -0500

    made some changes

 1 file changed, 10 insertions(+), 0 deletions(-)

commit 54321
Author: Me
...and so on...

我需要的函数几乎splitOnUnBounded来自Data.Conduit.Combinators,但我不太清楚如何在那里编写谓词函数。

我想出了以下Conduit是对splitOnUnbounded. source它将采用列表流。每个列表只有一行文本,因为我发现这样考虑更容易一些,尽管这肯定不是最佳解决方案。

它将使用一个函数将文本行组合在一起,该函数采用下一行并返回一个Bool指示下一行是否是下一组文本的开始。


groupLines :: (Monad m, MonadIO m) => (Text -> Bool) -> [T.Text] -> ConduitM Text [Text] m ()
groupLines startNextLine ls = start
  where
    -- If the next line in the stream is Nothing, return.
    -- If the next line is the stream is Just line, then
    --   accumulate that line
    start = await >>= maybe (return ()) (accumulateLines ls)
    accumulateLines ls nextLine = do
      -- if ls is [], then add nextLine. Try to get a new next line. If there isn't one, yield. If there is a next line,
      --     yield lines and call accumulatelines again.
      -- if ls is [Text], check if nextLine is the start of the next group. If it isn't, add nextLine to ls,
      --    try got the the next nextLine. if there isn't one, yield, and if there is one, call accumulate lines again.
      --    If nextLine _is_ the start of the next group, the yield this group of lines and call accumulate lines again.
      nextLine' <- await
      case nextLine' of
        Nothing -> yield ls'
        Just l ->
          if Prelude.null ls
            then accumulateLines ls' l
            else
              if startNextLine l
                then yield ls' >> accumulateLines [] l
                else accumulateLines ls' l
      where
        ls' = ls ++ [nextLine]

它可以在如下管道中使用。只需在函数上方传递函数,该Text -> Bool函数告诉管道何时开始下一个文本集合。


isCommitLine :: Text -> Bool
isCommitLine t = listToMaybe (TS.indices "commit" t) == Just 0

logParser =
  sourceFile "logs.txt"
    .| decodeUtf8
    .| linesUnbounded
    .| groupLines isCommitLine []
    .| Data.Conduit.Combinators.map (intercalate "\n")
    -- do something with each log entry here --
    .| Data.Conduit.Combinators.print

main :: IO ()
main = runConduitRes logParser

我是 Haskell 的新手,我强烈怀疑这不是实现这一目标的最佳方式。所以如果其他人有更好的建议,我会很乐意学习!否则,也许在这里发布这个解决方案会帮助一些人。


推荐阅读