python - 我们可以在数据帧的每一行上使用 spark sql 函数吗?
问题描述
我们可以在数据帧的每一行上使用 SQL 函数吗?例如:如果我们想在数据帧的行上应用 IF 内置 spark SQL 函数,有什么办法吗?
解决方案
Spark SQL,不包括 IF 的内置函数,但有一些方法可以使用条件运算符。示例代码显示了如何在 rdd 上映射并在每个值上应用 spark sql 条件运算符
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, StructField, BooleanType
spark = SparkSession.builder.appName("Python Spark").getOrCreate()
data_sample = [[1, "r1", True], [2, "r2", False], [3, "r3", True], [4, "r4", False]]
data_schema = [StructField("id", IntegerType(), True), StructField("row", StringType(), True),
StructField("con", BooleanType(), True)]
data_frame = spark.createDataFrame(spark.sparkContext.parallelize(data_sample), StructType(data_schema))
result1 = data_frame.rdd.map(lambda r: r[0] > 1 and r[1] == "r2")
result2 = data_frame.rdd.map(lambda r: r[0] >= 1 and r[1] == "r2" and r[2])
print(result1.collect())
print(result2.collect())
输出
Picked up _JAVA_OPTIONS: -Xmx1024M
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[False, True, False, False]
[False, False, False, False]
推荐阅读
- javascript - 是否有可能将 async/await 与 map 一起使用,
- google-cloud-platform - 如何检查是否为 GCP 中每个密钥环中的每个密钥启用了密钥轮换?
- airflow - 使用微风无法使用气流2.0.0 构建 docker 映像
- pyspark - 使用 PySpark 读取带有多行字符串且不带引号的平面文件
- google-sheets - 使用 Google 表格上的 URL 选项卡的 ImportXML / ImportHTML 解决方法
- php - 我想显示较大的数字变量,但它没有在页面中显示任何内容
- google-sheets - 使用查询不工作触发 IMPORTRANGE
- python - 如何使用 pydroid3 在 jnius 中授予 android 权限
- qtmultimedia - 如何在 Python 的 QtMultimedia/QSound 中设置音量?
- php - PEAR 无法识别正确的 PHP 目录