java - Can Apache Spark use a TCP listener as input?
Problem description
Can Apache Spark use a TCP listener as input? If so, does anyone have example Java code that does this?
I tried to find examples, but all the tutorials show how to define an input connection to a data server over TCP, not how to use a TCP listener that waits for incoming data.
Solution
Yes, you can use Spark to receive data over a TCP socket and process it as it arrives. What you are looking for is Spark Streaming.
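If Spark itself has to be the side that accepts incoming connections, one route is a custom receiver: a subclass of `org.apache.spark.streaming.receiver.Receiver` whose `onStart()` launches a background thread, registered with `jssc.receiverStream(...)`. The listening part is plain Java. Below is a minimal JDK-only sketch of that accept loop, assuming newline-delimited text and one sender at a time; the class name `LineListener` and its `Consumer<String>` callback are hypothetical, and inside a real receiver the callback would be the receiver's `store(...)` method.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: waits for incoming TCP connections on a port and hands
 * every received line to a callback. In a Spark custom receiver, run() would be
 * started on a thread from onStart(), and the callback would be store(...).
 */
public class LineListener implements AutoCloseable {
    private final ServerSocket server;
    private final Consumer<String> sink;

    public LineListener(int port, Consumer<String> sink) throws IOException {
        this.server = new ServerSocket(port); // port 0 lets the OS pick a free port
        this.sink = sink;
    }

    public int port() {
        return server.getLocalPort();
    }

    /** Accepts one client at a time and forwards its lines until close() is called. */
    public void run() {
        while (!server.isClosed()) {
            try (Socket client = server.accept();
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    sink.accept(line);
                }
            } catch (IOException e) {
                // accept() throws once close() is called from another thread: stop.
                // Any other I/O error only drops the current client; keep listening.
                if (server.isClosed()) {
                    return;
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        server.close();
    }
}
```

Wiring it up would mean starting `run()` on a thread in the receiver's `onStart()`, calling `close()` from `onStop()`, and passing the receiver instance to `jssc.receiverStream(...)`. Keep in mind that receivers run on executors, so the port is opened on whichever worker Spark places the receiver, and senders have to know that host.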
The documentation and GitHub have a short guide on receiving data from a TCP source. For convenience, here is its Java example:
import java.util.Arrays;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

public class JavaNetworkWordCount {
    public static void main(String[] args) throws Exception {
        // Create a local StreamingContext with two working threads and a batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        // Split each line into words
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();
        jssc.start();            // Start the computation
        jssc.awaitTermination(); // Wait for the computation to terminate
    }
}
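In this program `socketTextStream` is the TCP client: it connects to localhost:9999, so a server must already be listening there when `jssc.start()` runs. The Spark guide uses netcat (`nc -lk 9999`) for that. Where netcat is not available, a minimal JDK-only stand-in could look like the sketch below; the class name `LineServer` is hypothetical, and unlike `nc -k` it serves a single connection and then exits.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical JDK-only stand-in for `nc -lk 9999`: listens on a port, waits for
 * Spark's socketTextStream receiver to connect, and forwards lines to it.
 */
public class LineServer {

    /** Accepts one client on the given server socket and writes every line from source to it. */
    public static void serve(ServerSocket server, BufferedReader source) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = source.readLine()) != null) {
                out.println(line); // autoflush is on, so each line is sent immediately
            }
        } // closing the client socket signals end-of-stream to the receiver
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Listening on port " + port + "; type lines to send");
            serve(server, stdin);
        }
    }
}
```

After compiling, start it with `java LineServer 9999` before launching the word-count job, then type lines into it; the per-batch word counts from `wordCounts.print()` appear in the Spark driver's console.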