java - 使用 Spark 的 MapReduce 调用不同的函数并聚合
问题描述
我对 spark 非常不熟悉,但我很确定有一种好方法可以比我现在做的更快地完成我想做的事情。
本质上,我有一个 S3 存储桶,其中包含大量 JSON 的 Twitter 数据。我想浏览所有这些文件,从 JSON 中获取文本,对文本进行情感分析(目前使用斯坦福 NLP),然后将 Tweet + Sentiment 上传到数据库(现在我正在使用 dynamo,但是这个不是成败)
我目前拥有的代码是
/**
* Per thread:
* 1. Download a file
* 2. Do sentiment on the file -> output Map<String, List<Float>>
* 3. Upload to Dynamo: (a) sentiment (b) number of tweets (c) timestamp
*
*/
List<String> keys = s3Connection.getKeys();
ThreadPoolExecutor threads = new ThreadPoolExecutor(40, 40, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threads.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
for (String key : keys) {
threads.submit(new Thread(() -> {
try {
S3Object s3Object = s3Connection.getObject(key);
Map<String, List<Float>> listOfTweetsWithSentiment = tweetSentimentService.getTweetsFromJsonFile(s3Object.getObjectContent());
List<AggregatedTweets> aggregatedTweets = tweetSentimentService.createAggregatedTweetsFromMap(listOfTweetsWithSentiment, key);
for (AggregatedTweets aggregatedTweet : aggregatedTweets) {
System.out.println(aggregatedTweet);
tweetDao.putItem(aggregatedTweet);
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}));
}
这有效并且很好。通过在某些日期范围内运行此代码(即 getKeys 仅获取某些日期范围的密钥)并在不同的 EC2 上运行此过程的许多实例,我能够将这个过程加速到仅大约 2 小时,每个实例都作用于不同的日期范围。
但是,必须有一种更快的方法来使用一个好的 ole map-reduce 来做到这一点,但我什至不知道如何开始研究这个。是否可以在我的地图中进行情绪分析,然后根据时间戳减少?
此外,我正在考虑使用 AWS Glue,但我没有看到在那里使用斯坦福 NLP 库的好方法。
任何和所有的帮助将不胜感激。
解决方案
是的,您可以使用 Apache Spark 来实现。有很多方法可以设计你的应用程序、配置基础设施等。我提出一个简单的设计:
您在 AWS 上,因此使用 Spark 创建一个 EMR 集群。包含 Zeppelin 以进行交互式调试会很有用。
Spark 使用了多种数据抽象。您的朋友是 RDD 和数据集(阅读有关它们的文档)。将数据读取到数据集的代码可能相同:
SparkSession ss = SparkSession.builder().getOrCreate(); Dataset<Row> dataset = ss.read("s3a://your_bucket/your_path");
现在你有一个
Dataset<Row>
. 这对于类似 SQL 的操作很有用。为了您的分析,您需要将其转换为 Spark RDD:JavaRDD<Tweet> analyticRdd = dataset.toJavaRDD().map(row -> { return TweetsFactory.tweetFromRow(row); });
所以,
analyticRdd
你可以做你的分析人员。只是不要忘记让所有使用数据的服务可序列化。
推荐阅读
- excel - 如何使用 VBA 变量在 VLookup 中跨多个工作簿的查找值
- r - 如何使用 ggplot barplot 绘制表函数的结果(x 轴值问题)
- redundancy - 您能否在现有的 Apache Geode 区域上增加复制?
- sql-server - 在“)”附近预期条件的上下文中指定的非布尔类型的表达式
- r - 如何使用 r 中的 lavaan 包获取 CFA 中因子的相关矩阵和 p 值?
- javascript - res.redirect 错误 [ERR_HTTP_HEADERS_SENT]:在将标头发送到客户端后无法设置标头
- python-3.x - 在linux系统上构建python包后如何访问资源文件
- sas - 网格单元的 SAS 点计数
- vue.js - 如果我的变量是数组,如何更改 vuex 中的状态
- android - Firebase 身份验证返回按钮