首页 > 解决方案 > 通过 Spark RDD 或 SQL API 中的每个空行将多行收集到一个数组中

问题描述

我有一个格式的文本文件CoNLL-U,我需要提取Token_Label. 文件示例:

# newdoc id = weblog-blogspot.com_nominations_20041117172713_ENG_20041117_172713
# sent_id = weblog-blogspot.com_nominations_20041117172713_ENG_20041117_172713-0001
# text = From the AP comes this story :
1   From    from    ADP IN  _   3   case    3:case  _
2   the the DET DT  Definite=Def|PronType=Art   3   det 3:det   _
3   AP  AP  PROPN   NNP Number=Sing 4   obl 4:obl:from  _
4   comes   come    VERB    VBZ Mood=Ind|Number=Sing|Person=3|Tense=Pres|VerbForm=Fin   0   root    0:root  _
5   this    this    DET DT  Number=Sing|PronType=Dem    6   det 6:det   _
6   story   story   NOUN    NN  Number=Sing 4   nsubj   4:nsubj _
7   :   :   PUNCT   :   _   4   punct   4:punct _

# sent_id = weblog-juancole.com_juancole_20040324065800_ENG_20040324_065800-0005
# text = In Ramadi, there was a big demonstration.
1   In  in  ADP IN  _   2   case    2:case  _
2   Ramadi  Ramadi  PROPN   NNP Number=Sing 5   obl 5:obl:in    SpaceAfter=No
3   ,   ,   PUNCT   ,   _   5   punct   5:punct _
4   there   there   PRON    EX  _   5   expl    5:expl  _
5   was be  VERB    VBD Mood=Ind|Number=Sing|Person=3|Tense=Past|VerbForm=Fin   0   root    0:root  _
6   a   a   DET DT  Definite=Ind|PronType=Art   8   det 8:det   _
7   big big ADJ JJ  Degree=Pos  8   amod    8:amod  _
8   demonstration   demonstration   NOUN    NN  Number=Sing 5   nsubj   5:nsubj SpaceAfter=No
9   .   .   PUNCT   .   _   5   punct   5:punct _

如您所见,每个句子都被标记化并在每个标记前添加一些标签,由制表符分隔\t(标记、引理、UD POS 等),每个句子用空行分隔。

为了获取Token_POS每个句子,我使用此代码为每个句子生成一个文本What_PRON if_SCON...,然后我将其转换为 Dataframe,以便我可以使用withColumn将标记和标签作为我项目的数组类型提取到单独的列中。

val testPath = "en_ewt-ud-test.conllu"
val testInput = spark.read.text(testPath).as[String]

val extractedTokensTags = testInput.map(s => s.split("\t")
.filter(x => !x.startsWith("#"))).filter(x => x.length > 0)
.map{x => if(x.length > 1){x(1) + "_" + x(3)} else{"endOfLine"}}
.map(x => x.mkString)
.reduce((s1, s2) => s1 + " " + s2).split(" endOfLine | endOfLine")

spark.sparkContext.parallelize(extractedTokensTags).toDF("arrays").show

|              arrays|
+--------------------+
|What_PRON if_SCON...|
|What_PRON if_SCON...|
|[_PUNCT via_ADP M...|
|(_PUNCT And_CCONJ...|
|This_DET BuzzMach...|
|Google_PROPN is_A...|
|Does_AUX anybody_...|
|They_PRON own_VER...|

这段代码绝对是黑客!它甚至看起来很丑,但它完成了工作并给了我直到现在我想要的东西!

问题:

如果文件很大,reduce部分将创建多个任务,这导致不保留行的顺序。(我想我可能会弄乱洗牌或任务的数量,但是做一个 hack 就足够了!)

问题:

  1. 如何根据该空行对行进行分组?(我想摆脱and中的那个endOfLinehack ).map.reduce

  2. 是否可以为每个部分的每一行使用带有唯一索引的 zipWithIndex,所以最后我可以使用 reduceByKey 或在我的 Dataframe 中使用相同的 ID 而无需关心订单?

  3. 仅通过 Spark SQL API 是否有更好的方法来做到这一点?

给定示例的期望结果:

  1. Array[String] 所以我可以将它并行化为 DataFrame

Array[String] = Array(From_ADP the_DET AP_PROPN come_VERB this_DET story_NOUN :_PUNCT)

Array[String] = Array(In_ADP Ramadi_PROPN ,_PUNCT there_PRON was_VERB a_DET big_ADJ demo_NOUN ._PUNCT)

或者

  1. 具有 2 列的数据框:

代币:Array[String] = (From, the, AP, come, this, story, :)

标签: Array[String] = (ADP, DET, PROPN, VERB, DET, NOUN, PUNCT)

如果我能得到这两个结果中的任何一个,我就可以处理剩下的事情。我的主要问题是不知道如何使用空行作为分隔符或某种分隔符来对行进行分组,第二个问题是按 ID 或逐行保留顺序。

非常感谢。

更新:在 Scala 中解析多行记录

我确实看到并尝试了有关解析具有\\n分隔符的多行文本文件的其他问题。我已经在替换\\n我的数据集中没有的东西,所以我更喜欢 1. 留在 Spark 中(你可以看到它是可能的) 2. 找到一种方法来reduce不重新排序或添加每行的唯一 ID,因此我可以保留订单

标签: apache-sparkapache-spark-sqlrdd

解决方案


推荐阅读