regex - 使用 Scala Iterator 使用 RegEx 匹配将大流(从字符串)分解成块,然后对这些块进行操作?
问题描述
我目前正在使用一种不太类似于 Scala 的方法来解析大型 Unix 邮箱文件。我仍在学习这门语言,并想挑战自己以找到更好的方法,但是,我不相信我对使用 an 可以做什么Iterator
以及如何有效地使用它有一个坚实的掌握。
我目前正在使用 org.apache.james.mime4j
,并且我使用来从文件org.apache.james.mime4j.mboxiterator.MboxIterator
中获取 a ,如下所示:java.util.Iterator
// registers an implementation of a ContentHandler that
// allows me to construct an object representing an email
// using callbacks
val handler: ContentHandler = new MyHandler();
// creates a parser that parses a SINGLE email from a given InputStream
val parser: MimeStreamParser = new MimeStreamParser(configBuilder.build());
// register my handler
parser.setContentHandler(handler);
// Get a java.util.Iterator
val iterator = MboxIterator.fromFile(fileName).build();
// For each email, process it using above Handler
iterator.forEach(p => parser.parse(p.asInputStream(Charsets.UTF_8)))
根据我的理解,ScalaIterator
更加健壮,并且可能更有能力处理这样的事情,特别是因为我并不总是能够将整个文件放入内存中。
我需要构建自己的MboxIterator
. 我挖掘了源代码MboxIterator
并能够找到一个很好的 RegEx 模式来确定单个电子邮件消息的开头,但是,我从现在开始就画了一个空白。
我像这样创建了正则表达式:
val MESSAGE_START = Pattern.compile(FromLinePatterns.DEFAULT, Pattern.MULTILINE);
我想做什么(基于我目前所知道的):
FileInputStream
从 MBOX 文件构建一个。- 用于
Iterator.continually(stream.read())
读取流 - 用于
.takeWhile()
继续读取直到流结束 - 使用类似的东西对 Stream 进行分块
MESSAGE_START.matcher(someString).find()
,或者使用它来查找单独的消息的索引 - 读取创建的块,或读取创建的索引之间的位
我觉得我应该能够使用map()
,find()
和来完成此任务filter()
,collect()
但我被他们只给我Int
s 工作的事实所抛弃。
我将如何做到这一点?
编辑:
在对这个主题做了更多思考之后,我想到了另一种方式来描述我认为我需要做的事情:
我需要继续从流中读取,直到我得到一个与我的 RegEx 匹配的字符串
也许
group
以前读取的字节?将其发送到某处进行处理
以某种方式将其从范围中删除,这样下次我遇到匹配时它就不会被分组
继续阅读流,直到找到下一个匹配项。
利润???
编辑2:
我想我越来越近了。使用这样的方法让我得到一个迭代器的迭代器。但是,有两个问题: 1.这是浪费内存吗?这是否意味着所有内容都被读入内存?2. 我仍然需要想办法通过 进行拆分,但match
仍将其包含在返回的迭代器中。
def split[T](iter: Iterator[T])(breakOn: T => Boolean):
Iterator[Iterator[T]] =
new Iterator[Iterator[T]] {
def hasNext = iter.hasNext
def next = {
val cur = iter.takeWhile(!breakOn(_))
iter.dropWhile(breakOn)
cur
}
}.withFilter(l => l.nonEmpty)
解决方案
如果我理解正确,您想懒惰地分块由正则表达式可识别模式分隔的大文件。
您可以尝试Iterator
为每个请求返回一个,但正确的迭代器管理并非易事。
我倾向于对客户端隐藏所有文件和迭代器管理。
class MBox(filePath :String) {
private val file = io.Source.fromFile(filePath)
private val itr = file.getLines().buffered
private val header = "From .+ \\d{4}".r //adjust to taste
def next() :Option[String] =
if (itr.hasNext) {
val sb = new StringBuilder()
sb.append(itr.next() + "\n")
while (itr.hasNext && !header.matches(itr.head))
sb.append(itr.next() + "\n")
Some(sb.mkString)
} else {
file.close()
None
}
}
测试:
val mbox = new MBox("so.txt")
mbox.next()
//res0: Option[String] =
//Some(From MAILER-DAEMON Fri Jul 8 12:08:34 2011
//some text AAA
//some text BBB
//)
mbox.next()
//res1: Option[String] =
//Some(From MAILER-DAEMON Mon Jun 8 12:18:34 2012
//small text
//)
mbox.next()
//res2: Option[String] =
//Some(From MAILER-DAEMON Tue Jan 8 11:18:14 2013
//some text CCC
//some text DDD
//)
mbox.next() //res3: Option[String] = None
每个打开的文件只有一个Iterator
,并且只调用安全方法。文件文本仅在请求时实现(加载),并且客户端获得所请求的内容(如果可用)。如果更适用,String
您可以将每一行作为集合的一部分返回,而不是一条长的所有行。Seq[String]
更新:这可以修改以便于迭代。
class MBox(filePath :String) extends Iterator[String] {
private val file = io.Source.fromFile(filePath)
private val itr = file.getLines().buffered
private val header = "From .+ \\d{4}".r //adjust to taste
def next() :String = {
val sb = new StringBuilder()
sb.append(itr.next() + "\n")
while (itr.hasNext && !header.matches(itr.head))
sb.append(itr.next() + "\n")
sb.mkString
}
def hasNext: Boolean =
if (itr.hasNext) true else {file.close(); false}
}
现在你可以.foreach()
, .map()
, .flatMap()
, 等等。但你也可以做一些危险的事情.toList
,比如加载整个文件。
推荐阅读
- javascript - Node、Express 静态文件和动态路由
- java - 如何将我的注册提供者与动态创建的球衣资源类一起使用?
- r - 从R中的字符串获取非零值
- amazon-web-services - Aws configure 未将配置数据保存在 Windows 10 上的凭据和配置文件中
- node.js - 如何在 Typescript/Webpack 的 Bitbucket 管道中增加 NodeJS 堆(--max-old-space-size)?
- python - 如何在python中将备用图像从一个文件夹复制到另一个文件夹
- apache-kafka - Axon4 - kafka ext:未调用查询事件
- c# - 将过滤器应用于嵌入文档,过滤掉不同的值
- mysql - PHPMyAdmin 数据库连接错误
- android - 超级强大 - 导出到带有混合器和解码器问题的文件(不同的 sampleRates 和 samplesPerFrame)