Cassandra Spark: perform a math operation on a column value and save

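Problem description

I am working on a Cassandra Spark job. I need to find the users that meet certain conditions, perform a math operation on a specific column, and then save the result back to Cassandra.

For example, I have the following dataset. When certain conditions are met, I want to perform a math operation on the age.

Keyspace: test_users, table: member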

CREATE TABLE test_users.member (
    member_id bigint PRIMARY KEY,
    manually_entered boolean,
    member_age decimal,
    member_name text
);

 member_id | manually_entered | member_age | member_name
-----------+------------------+------------+------------------
         2 |            False |     25.544 |      Larry Smith
         3 |            False |    38.3214 |  Karen Dinglebop
         7 |             True |         10 |    Howard Jibble
         9 |             True |         10 |   Whitney Howard
         4 |             True |         60 |     Walter White
        10 |             True |         10 | Kevin Schmoggins
         8 |            False |     10.234 |     Brett Darrel
         5 |            False |      19.22 |    Kenny Loggins
         6 |             True |         10 |         Joe Dirt
         1 |            False |     56.232 |       Joe Schmoe

I am trying to figure out how to use a column's value in a math expression with the round() function from org.apache.spark.sql.

spark-shell  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Window
import spark.implicits._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.joda.time.LocalDate
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{round}
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SQLContext


val members = spark.
  read.
  format("org.apache.spark.sql.cassandra").
  options(Map( "table" -> "member", "keyspace" -> "test_users" )).
  load()

var member_birthdays = members.select("member_id", "manually_entered", "member_age").
  where("manually_entered = false and member_age % 1 <> 0").
  withColumn("member_age", round(members['member_age'] * 5)) 

member_birthdays.write.
  format("org.apache.spark.sql.cassandra").
  mode("Append").
  options(Map( "table" -> "member", "keyspace" -> "test_users")).
  save()

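I can't figure out how to perform the math operation and use round() to update that field for the rows that meet the condition in Spark Cassandra.

Any insight would be greatly appreciated.

Tags: apache-spark, cassandra, apache-spark-sql, spark-cassandra-connector

Solution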


I changed the import to org.apache.spark.sql.functions._ and used col("member_age") instead of members['member_age']. With that change I was able to update the column value and save it.

import org.apache.spark.sql.functions._

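// Keep only rows that were not manually entered and whose age has a fractional part,
// then overwrite member_age with round(member_age * 5).
val member_birthdays = members.select("member_id", "manually_entered", "member_age").
  where("manually_entered = false and member_age % 1 <> 0").
  withColumn("member_age", round(col("member_age") * 5))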
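
To check what the filter and round() should produce before writing to Cassandra, the same arithmetic can be sketched in plain Scala without Spark. This is only an illustration: the Member case class, the RoundSketch object and the helper names are hypothetical, the rows are a subset of the sample data in the question, and HALF_UP is used because that is the rounding mode of Spark's round().

```scala
import scala.math.BigDecimal.RoundingMode

// Hypothetical in-memory stand-in for a row of test_users.member
final case class Member(memberId: Long, manuallyEntered: Boolean, memberAge: BigDecimal)

object RoundSketch {
  // Rows copied from the sample data in the question (subset)
  val members: Seq[Member] = Seq(
    Member(2, manuallyEntered = false, BigDecimal("25.544")),
    Member(3, manuallyEntered = false, BigDecimal("38.3214")),
    Member(7, manuallyEntered = true,  BigDecimal("10")),
    Member(4, manuallyEntered = true,  BigDecimal("60")),
    Member(8, manuallyEntered = false, BigDecimal("10.234")),
    Member(5, manuallyEntered = false, BigDecimal("19.22")),
    Member(1, manuallyEntered = false, BigDecimal("56.232"))
  )

  // Mirrors: where("manually_entered = false and member_age % 1 <> 0")
  def needsUpdate(m: Member): Boolean =
    !m.manuallyEntered && (m.memberAge % 1).signum != 0

  // Mirrors: round(col("member_age") * 5), rounding half up to 0 decimal places
  def updatedAge(m: Member): BigDecimal =
    (m.memberAge * 5).setScale(0, RoundingMode.HALF_UP)

  // member_id -> new member_age for every row that passes the filter
  val updated: Map[Long, BigDecimal] =
    members.filter(needsUpdate).map(m => m.memberId -> updatedAge(m)).toMap

  def main(args: Array[String]): Unit =
    updated.toSeq.sortBy(_._1).foreach { case (id, age) => println(s"$id -> $age") }
}
```

With these rows, member 2 goes from 25.544 to 128 and member 3 from 38.3214 to 192, while members 7 and 4 are skipped because manually_entered is true. If half-even rounding is wanted instead, Spark also provides bround() in org.apache.spark.sql.functions.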
