How to filter '@' characters in a tuple of type (String, Seq(String, String, ...))?

Problem description

I'm trying to find all the people mentioned in tweets. To do this, I need to keep only the words in each tweet that start with '@'.

I'm using Java 8 with Spark 2.4.3.

First, I load my JSON file as an RDD[Tweet] (the exercise specifically requires RDDs, not DataFrames).

Tweet is a case class defined as follows:

case class Tweet (
    id : String,
    user : String,
    userName : String,
    text : String,
    place : String,
    country : String,
    lang : String
)

Next, I create (user, tweet) pairs and split the tweet text into words on spaces:

val tweets = loadData
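// note: mkString(",") joins the split words back into one string, so each Seq holds a single element
val persons = tweets.map(row => (row.user, Seq(row.text.split(" ").mkString(","))))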
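To see why a later flatMap has nothing useful to iterate over, it may help to compare what the two expressions produce for a single line of text. In this minimal sketch (the object and method names are hypothetical, and the sample text is made up), `joined` mirrors the expression above, while `words` keeps one element per word.

```scala
object SplitDemo {
  // Mirrors the question's expression: the words are re-joined into ONE string
  def joined(text: String): Seq[String] = Seq(text.split(" ").mkString(","))

  // Keeps one element per space-separated word
  def words(text: String): Seq[String] = text.split(" ").toList

  def main(args: Array[String]): Unit = {
    val text = "Good Morning @nikhil and @hardik"
    println(joined(text)) // List(Good,Morning,@nikhil,and,@hardik)
    println(words(text))  // List(Good, Morning, @nikhil, and, @hardik)
  }
}
```

With `joined`, filtering on '@' can only keep or drop the whole line; with `words`, each word can be tested individually.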

Then I tried to flatMap everything and iterate over all the elements to find '@', but without success.
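The flatMap idea can work if each mention is emitted as its own (user, mention) pair and the pairs are then grouped back by user. Below is a minimal sketch on a local Scala collection, assuming a mention is any space-separated word that starts with '@'; the names `mentions` and `mentionsByUser` and the sample tweets are hypothetical. On a pair RDD the analogous calls would be `flatMap` followed by `groupByKey()`.

```scala
object MentionsByUser {
  // Assumption: a mention is any space-separated word that starts with '@'
  def mentions(text: String): Seq[String] =
    text.split(" ").toSeq.filter(_.startsWith("@"))

  // flatMap emits one (user, mention) pair per mention; groupBy then
  // collects the mentions of each user. Users with no mentions drop out.
  def mentionsByUser(tweets: Seq[(String, String)]): Map[String, Seq[String]] =
    tweets
      .flatMap { case (user, text) => mentions(text).map(m => (user, m)) }
      .groupBy(_._1)
      .map { case (user, pairs) => (user, pairs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    // Made-up sample tweets as (user, text) pairs
    val tweets = Seq(
      ("user1", "Hello @michael and @jean"),
      ("user2", "@kol meet @louis today"),
      ("user1", "Thanks @paul and @charles"),
      ("user3", "No mentions in this one")
    )
    mentionsByUser(tweets).foreach(println)
  }
}
```

A design note: grouping merges all tweets of the same user into one entry and omits users who mention nobody; if one output row per tweet is wanted instead, a plain `map` without the grouping step is enough.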

I expect the result to look like this:

(user1, Seq(@michael, @jean, @paul, @charles))
(user2, Seq(@kol, @louis))

Tags: scala, apache-spark, intellij-idea

Solution


Try this (it filters with a DataFrame UDF, then converts the result to an RDD):

    // Input tweet data, loaded as a DataFrame df
    scala> df.show(false)
    +---+--------+-------------+-----------------------------------+-------+-------+--------+
    |Id |User    |UserName     |Text                               |Place  |Country|language|
    +---+--------+-------------+-----------------------------------+-------+-------+--------+
    |1  |nksuthar|Nikhil Suthar|Good Morning @nikhil and @hardik   |Mumbai |India  |Hindi   |
    |2  |matt    |Mathew       |Welcome @John                      |Paris  |France |English |
    |3  |JohnM   |Johnson      |@Mathew Are you coming with @nguyen|NewYork|US     |English |
    +---+--------+-------------+-----------------------------------+-------+-------+--------+

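    // Create a function that keeps the words of Text starting with '@'
    // (a null Text yields an empty Seq instead of a NullPointerException)

    scala> import org.apache.spark.sql.functions.{col, udf}

    scala> val extractMentions = (s: String) => {
         |   if (s == null) Seq.empty[String]
         |   else s.split(" ").filter(_.startsWith("@")).toSeq
         | }

    // Wrap the function as a Spark UDF named filterUdf

    scala> val filterUdf = udf(extractMentions)

    scala> val newdf = df.withColumn("new_col", filterUdf(col("Text")))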

    /*
    +---+--------+-------------+-----------------------------------+-------+-------+--------+------------------+
    |Id |User    |UserName     |Text                               |Place  |Country|language|new_col           |
    +---+--------+-------------+-----------------------------------+-------+-------+--------+------------------+
    |1  |nksuthar|Nikhil Suthar|Good Morning @nikhil and @hardik   |Mumbai |India  |Hindi   |[@nikhil, @hardik]|
    |2  |matt    |Mathew       |Welcome @John                      |Paris  |France |English |[@John]           |
    |3  |JohnM   |Johnson      |@Mathew Are you coming with @nguyen|NewYork|US     |English |[@Mathew, @nguyen]|
    +---+--------+-------------+-----------------------------------+-------+-------+--------+------------------+
    */
    // Typed getters give RDD[(String, Seq[String])] instead of RDD[(Any, Any)]
    scala> val finaltpl = newdf.select("User", "new_col").rdd.map(x => (x.getString(0), x.getSeq[String](1)))

    scala> finaltpl.foreach(println)
    (matt,WrappedArray(@John))
    (JohnM,WrappedArray(@Mathew, @nguyen))
    (nksuthar,WrappedArray(@nikhil, @hardik))
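Because the exercise asks for RDDs rather than DataFrames, the UDF and the DataFrame round trip may not be needed at all: the same per-record filtering can be done in a `map` over the `RDD[Tweet]`. The sketch below runs on a local `Seq[Tweet]` so it works without a Spark cluster; the helper name `userMentions` is hypothetical, and the assumption is that on the real `RDD[Tweet]` the call `tweets.map(userMentions)` has the same shape, since `RDD.map` also takes a `Tweet => (String, Seq[String])` function.

```scala
// Same fields as the Tweet case class in the question
case class Tweet(
    id: String,
    user: String,
    userName: String,
    text: String,
    place: String,
    country: String,
    lang: String
)

object RddStyleMentions {
  // Hypothetical helper: (user, words of the text that start with '@')
  def userMentions(t: Tweet): (String, Seq[String]) =
    (t.user, t.text.split(" ").toSeq.filter(_.startsWith("@")))

  def main(args: Array[String]): Unit = {
    // Rows copied from the sample table above
    val tweets = Seq(
      Tweet("1", "nksuthar", "Nikhil Suthar", "Good Morning @nikhil and @hardik", "Mumbai", "India", "Hindi"),
      Tweet("2", "matt", "Mathew", "Welcome @John", "Paris", "France", "English"),
      Tweet("3", "JohnM", "Johnson", "@Mathew Are you coming with @nguyen", "NewYork", "US", "English")
    )
    // With Spark, the equivalent would be tweets.map(userMentions) on an RDD[Tweet]
    tweets.map(userMentions).foreach(println)
  }
}
```

This produces one (user, mentions) pair per tweet, matching the shape of the expected output in the question.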
