scala - 用于聚合 cassandra 数据的 Spark 作业
问题描述
我是新来的火花。我在 cassandra 中有下表:
CREATE TABLE cust_actions (
orgid text,
empid int,
custid int,
date timestamp,
action text
PRIMARY KEY (orgid, empid, custid, date)
) WITH CLUSTERING ORDER BY (empid ASC, custid ASC, date DESC)
此表包含员工对客户采取的每项操作的数据。该表每天获得超过 1000 万次插入。我有一个 3 节点的 cassandra 集群,在 18 台核心机器上运行,每台 32g 内存。
我想每天汇总数据,即在特定的一天对客户采取了多少行动。为此,我创建了另一个表:
CREATE TABLE daily_cust_actions (
custid int,
date date,
action text,
count int,
PRIMARY KEY (custid, date, action)
) WITH CLUSTERING ORDER BY (date ASC, action ASC)
为了做到这一点,我想使用 spark(如果这是错误的,请提出建议,或者还有其他一些替代方案)。我在其中一台 cassandra 机器上运行 spark(上面提到过),主服务器和从服务器有 9 个执行器,每个执行器都有 1g ram 和 2 个内核。
桌子大小大约 70 克。我无法汇总这些数据。不过,这适用于较小的数据集。这是我的火花脚本:
object DailyAggregation {
def main(args: Array[String]): Unit = {
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "host1,host2,host3")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
.set("spark.cassandra.input.split.size_in_mb", "10") //have tried multiple options here
val sc = new SparkContext("spark://host", "spark-cassandra", conf)
val rdd = sc.cassandraTable("mykeyspace","cust_actions")
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val df = new SimpleDateFormat("yyyy-MM-dd")
val startDate = df.parse("2018-08-13")
val endDate = df.parse("2018-09-14")
sc.parallelize(
rdd.select("custid", "date", "action")
.where("date >= ? and date < ?", startDate, endDate)
.keyBy(row => (
row.getInt("custid"),
df.format(row.getLong("date")),
row.getString("action"))).map { case (key, value) => (key, 1) }
.reduceByKey(_ + _).collect()
.map { case (key, value) => (key._1, key._2, key._3, value) })
.saveToCassandra("mykeyspace", "daily_cust_actions")
sc.stop()
}
}
我尝试了不同的方法,增加/减少内存/执行器,增加/减少spark.cassandra.input.split.size_in_mb
值和调整一些 spark 环境变量。但每次我得到一个不同的错误。它显示了2个阶段,第一阶段总是运行顺利,但在第二阶段总是失败。
我见过很多不同的错误。目前我收到以下错误:
2018-09-15 16:36:05 INFO TaskSetManager:54 - Task 158.1 in stage 1.1 (TID 1293) failed, but the task will not be re-executed (either because t
he task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already
succeeded).
2018-09-15 16:36:05 WARN TaskSetManager:66 - Lost task 131.1 in stage 1.1 (TID 1286, 127.0.0.1, executor 18): FetchFailed(null, shuffleId=0, m
apId=-1, reduceId=131, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
在这里的任何帮助将不胜感激。
解决方案
推荐阅读
- jsf - 如何使 CDI BeanManager 实例在 Wildfly 11.0 中可用?
- javascript - 在javascript的追加(输入)中使用onkeyup
- javascript - 如何处理不安全的 XMLHttpRequest 端点
- javascript - 如何从数据表中获取最后插入的行
- python-3.x - 由于未知原因,无法从网站上使用 python selenium binding 获取元素
- makefile - 避免 gnu make 自动删除文件
- jquery - jQuery多次触发
- excel - 来自不同工作表的 VBA 中的 Vlookup - 编码错误
- mysql - 让mysql查询更快
- android - 键盘关闭后Webview不会向下滚动