首页 > 解决方案 > 使用 Scala Iterator 使用 RegEx 匹配将大流(从字符串)分解成块,然后对这些块进行操作?

问题描述

我目前正在使用一种不太类似于 Scala 的方法来解析大型 Unix 邮箱文件。我仍在学习这门语言,并想挑战自己以找到更好的方法,但是,我不相信我对使用 an 可以做什么Iterator以及如何有效地使用它有一个坚实的掌握。

我目前正在使用 org.apache.james.mime4j,并且我使用来从文件org.apache.james.mime4j.mboxiterator.MboxIterator中获取 a ,如下所示:java.util.Iterator

 // registers an implementation of a ContentHandler that
 // allows me to construct an object representing an email
 // using callbacks
 val handler: ContentHandler = new MyHandler();

 // creates a parser that parses a SINGLE email from a given InputStream
 val parser: MimeStreamParser = new MimeStreamParser(configBuilder.build());
 // register my handler
 parser.setContentHandler(handler);

 // Get a java.util.Iterator
 val iterator = MboxIterator.fromFile(fileName).build();
 // For each email, process it using above Handler
 iterator.forEach(p => parser.parse(p.asInputStream(Charsets.UTF_8)))

根据我的理解,ScalaIterator更加健壮,并且可能更有能力处理这样的事情,特别是因为我并不总是能够将整个文件放入内存中。

我需要构建自己的MboxIterator. 我挖掘了源代码MboxIterator并能够找到一个很好的 RegEx 模式来确定单个电子邮件消息的开头,但是,我从现在开始就画了一个空白。

我像这样创建了正则表达式:

 val MESSAGE_START = Pattern.compile(FromLinePatterns.DEFAULT, Pattern.MULTILINE);

我想做什么(基于我目前所知道的):

我觉得我应该能够使用map(),find()和来完成此任务filter()collect()但我被他们只给我Ints 工作的事实所抛弃。

我将如何做到这一点?

编辑:

在对这个主题做了更多思考之后,我想到了另一种方式来描述我认为我需要做的事情:

  1. 我需要继续从流中读取,直到我得到一个与我的 RegEx 匹配的字符串

  2. 也许group以前读取的字节?

  3. 将其发送到某处进行处理

  4. 以某种方式将其从范围中删除,这样下次我遇到匹配时它就不会被分组

  5. 继续阅读流,直到找到下一个匹配项。

  6. 利润???

编辑2:

我想我越来越近了。使用这样的方法让我得到一个迭代器的迭代器。但是,有两个问题: 1.这是浪费内存吗?这是否意味着所有内容都被读入内存?2. 我仍然需要想办法通过 进行拆分match仍将其包含在返回的迭代器中。

def split[T](iter: Iterator[T])(breakOn: T => Boolean): 
    Iterator[Iterator[T]] =
        new Iterator[Iterator[T]] {
           def hasNext = iter.hasNext

           def next = {
              val cur = iter.takeWhile(!breakOn(_))
              iter.dropWhile(breakOn)
              cur
            }
 }.withFilter(l => l.nonEmpty)  

标签: regexscalastreamiteratorchunking

解决方案


如果我理解正确,您想懒惰地分块由正则表达式可识别模式分隔的大文件。

您可以尝试Iterator为每个请求返回一个,但正确的迭代器管理并非易事。

我倾向于对客户端隐藏所有文件和迭代器管理。

class MBox(filePath :String) {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :Option[String] =
    if (itr.hasNext) {
      val sb = new StringBuilder()
      sb.append(itr.next() + "\n")
      while (itr.hasNext && !header.matches(itr.head))
        sb.append(itr.next() + "\n")
      Some(sb.mkString)
    } else {
      file.close()
      None
    }
}

测试:

val mbox = new MBox("so.txt")
mbox.next()
//res0: Option[String] =
//Some(From MAILER-DAEMON Fri Jul  8 12:08:34 2011
//some text AAA
//some text BBB
//)

mbox.next()
//res1: Option[String] =
//Some(From MAILER-DAEMON Mon Jun  8 12:18:34 2012
//small text
//)

mbox.next()
//res2: Option[String] =
//Some(From MAILER-DAEMON Tue Jan  8 11:18:14 2013
//some text CCC
//some text DDD
//)

mbox.next()  //res3: Option[String] = None

每个打开的文件只有一个Iterator,并且只调用安全方法。文件文本仅在请求时实现(加载),并且客户端获得所请求的内容(如果可用)。如果更适用,String您可以将每一行作为集合的一部分返回,而不是一条长的所有行。Seq[String]


更新:这可以修改以便于迭代。

class MBox(filePath :String) extends Iterator[String] {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :String = {
    val sb = new StringBuilder()
    sb.append(itr.next() + "\n")
    while (itr.hasNext && !header.matches(itr.head))
      sb.append(itr.next() + "\n")
    sb.mkString
  }

  def hasNext: Boolean =
    if (itr.hasNext) true else {file.close(); false}
}

现在你可以.foreach(), .map(), .flatMap(), 等等。但你也可以做一些危险的事情.toList,比如加载整个文件。


推荐阅读