首页 > 解决方案 > 如何使用 apache spark 流和 Java API 从所有人中获取英文推文?

问题描述

您好,我是 Spark 的新手)我想做一些 Spark 项目,该项目将使用 spark-streaming 模块收集和处理来自这个社交网络的推文(用于我的小大学研究)。但是我有一个小问题,我现在不知道如何只用英语获取推文。谁能帮我解决这个问题?我尝试用已经收到的数据进行过滤操作,但我在这一行有 java.lang.NullPointerException:“ if (status.getPlace().getCountryCode().equals("(us)"))"。但这也是不好的解决方案。是否可以在接收之前过滤数据?请帮助我现在真的不知道这个。我很乐意得到你的提示。

package TwitterAnalysis;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.twitter.*;
import twitter4j.GeoLocation;
import twitter4j.Status;



public class Twitter {

    private static void setTwitterOAuth() {
        System.setProperty("twitter4j.oauth.consumerKey", TwitterOAuthKey.consumerKey);
        System.setProperty("twitter4j.oauth.consumerSecret", TwitterOAuthKey.consumerSecret);
        System.setProperty("twitter4j.oauth.accessToken", TwitterOAuthKey.accessToken);
        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterOAuthKey.accessTokenSecret);
    }

    public static void main(String [] args) {


        setTwitterOAuth();


        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkTwitter");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));


        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);

        //filtering already received tweets
        JavaDStream<Status> englishTweets=twitterStream.filter(
                new Function <Status, Boolean>(){
                    public Boolean call (Status status){
                        if (status.getPlace().getCountryCode().equals("(us)")){
                            return true;
                        }else
                        {return false;}
                    }
                }
        );


         //Without filter: Output text of all tweets
        JavaDStream<String> statuses = englishTweets.map(
                new Function<Status, String>() {
                    public String call(Status status) { return status.getText(); }
                }
        );




        statuses.print();
        jssc.start();

    }
}

标签: javaapache-sparktwitter

解决方案


这是我刚刚创建新的 JavaDStream 并为他使用 getLang() 的答案。解决方案如下所示:

JavaDStream<Status> enTweetdDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));

推荐阅读