apache-spark - Cassandra Spark 对列值执行数学运算并保存
问题描述
我正在做一个 Cassandra Spark 工作,我需要找到满足特定条件的特定用户,然后对特定列执行数学运算,然后将其保存到 cassandra
例如,我有以下数据集。当满足某些条件时,我想对年龄进行数学运算。
键空间:test_users表:成员
CREATE TABLE test_users.member (
member_id bigint PRIMARY KEY,
manually_entered boolean,
member_age decimal,
member_name text
)
member_id | manually_entered | member_age | member_name
-----------+------------------+------------+------------------
2 | False | 25.544 | Larry Smith
3 | False | 38.3214 | Karen Dinglebop
7 | True | 10 | Howard Jibble
9 | True | 10 | Whitney Howard
4 | True | 60 | Walter White
10 | True | 10 | Kevin Schmoggins
8 | False | 10.234 | Brett Darrel
5 | False | 19.22 | Kenny Loggins
6 | True | 10 | Joe Dirt
1 | False | 56.232 | Joe Schmoe
我试图弄清楚如何使用其中的列值来使用org.apache.spark.sql执行数学函数 round()
spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Window
import spark.implicits._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.joda.time.LocalDate
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{round}
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SQLContext
val members = spark.
read.
format("org.apache.spark.sql.cassandra").
options(Map( "table" -> "test_users", "keyspace" -> "member" )).
load()
var member_birthdays = members.select("member_id", "manually_entered", "member_age").
where("manually_entered = false and member_age % 1 <> 0").
withColumn("member_age", round(members['member_age'] * 5))
member_birthdays.write.
format("org.apache.spark.sql.cassandra").
mode("Append").
options(Map( "table" -> "test_users", "keyspace" -> "member")).
save()
我无法弄清楚如何完成执行数学运算的任务,并round()
用于更新在 spark cassandra 中满足条件的特定字段。
任何见解将不胜感激。
解决方案
我更新了 org.apache.spark.sql.function 的导入并使用col('member_age')
了members['member_age']
. 我成功地更新了列值并保存。
import org.apache.spark.sql.functions._
var member_birthdays = members.select("member_id", "manually_entered", "member_age").
where("manually_entered = false and member_age % 1 <> 0").
withColumn("member_age", round(col('member_age') * 5))
推荐阅读
- mysql - 如何使用可选字段加载数据?
- go - 按时区(BST/GMT、CET/CEST 等)查找下一个夏令时日期
- ruby-on-rails - rails应用程序中多个用户的登台环境中的注销问题
- node.js - 在 XState FMS 中调用操作的正确方法是什么?
- microcontroller - 在 SAMDG55 上读取输出引脚电平
- mongodb - MongoDB:$and 运算符会影响查询性能吗?
- java - 尝试建立 WebSocket 连接时,JWT 身份验证阻止 SockJS 握手
- c++ - C++ 基类未定义
- python - 如何使 python 类具有不同的类?
- ios - 如何在没有响应的 iOS 上调试第三方应用程序?