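python - Creating columns with a window in Spark
Problem description
I'm writing because I want to create two columns:
data_i: the first value of the "precio_actual" column in each Spark Structured Streaming window
data_f: the last value of the "precio_actual" column in that window.
For example, this is what I have now: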
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+
|localSymbol| time|precio_actual| bid| ask| high| low| close| ventana| string_real_time| comienzo_ventana| fin_ventana|PRUEBA|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+
| EUR.USD|2021-01-07 16:30:...| 1.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:00|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0|
| EUR.USD|2021-01-07 16:30:...| 3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:14|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0|
| EUR.USD|2021-01-07 16:30:...| 3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:15|2021-01-07 16:30:00|2021-01-07 16:30:30| 3.0|
| EUR.USD|2021-01-07 16:30:...| 4.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:16|2021-01-07 16:30:00|2021-01-07 16:30:30| 4.0|
| EUR.USD|2021-01-07 16:30:...| 2.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:29|2021-01-07 16:30:00|2021-01-07 16:30:30| 2.0|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+
What I need is the following:
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+------+
|localSymbol| time|precio_actual| bid| ask| high| low| close| ventana| string_real_time| comienzo_ventana| fin_ventana|data_i|data_f|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+------+
| EUR.USD|2021-01-07 16:30:...| 1.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:00|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
| EUR.USD|2021-01-07 16:30:...| 3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:14|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
| EUR.USD|2021-01-07 16:30:...| 3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:15|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
| EUR.USD|2021-01-07 16:30:...| 4.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:16|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
| EUR.USD|2021-01-07 16:30:...| 2.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:29|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+------+
My code is as follows:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":
    # Step 1: create the SparkSession
    spark = SparkSession \
        .builder \
        .appName("Spark IB") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    # Step 2: read a stream from the Kafka topic
    level_1_data_kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "level_1_data_test") \
        .option("startingOffsets", "earliest") \
        .load()

    # Step 3: define the schema
    level_1_data_schema = StructType([
        StructField("localSymbol", StringType()),
        StructField("time", StringType()),
        StructField("precio_actual", StringType()),
        StructField("bid", StringType()),
        StructField("ask", StringType()),
        StructField("high", StringType()),
        StructField("low", StringType()),
        StructField("close", StringType()),
    ])

    # Step 4: extract the "value" field from the Kafka record
    """
    col("value"): the "value" field arrives in binary format
    .cast("string"): cast it to a string
    from_json: deserializes that string into a struct using the given schema
    .alias: names the resulting struct column (visible with level_1_data_value_df.printSchema())
    """
    level_1_data_value_df = level_1_data_kafka_df.select(
        f.from_json(f.col("value").cast("string"), level_1_data_schema).alias("value"))

    # Step 5: data transformation and window
    level_1_data_value_transform_df = level_1_data_value_df \
        .select("value.*") \
        .withColumn("ventana", f.window(f.col("time"), "30 seconds")) \
        .withColumn("time", f.to_timestamp(f.col("time"))) \
        .withColumn("string_real_time", f.col("time")[0:19]) \
        .withColumn("precio_actual", f.col("precio_actual").cast("double"))

    # Step 6: queries (no trailing backslash after the last withColumn)
    level_1_data_value_window_agg_df = level_1_data_value_transform_df \
        .select(f.col("*")) \
        .withColumn("comienzo_ventana", f.col("ventana.start")) \
        .withColumn("fin_ventana", f.col("ventana.end")) \
        .withColumn("PRUEBA", f.expr("case when comienzo_ventana < fin_ventana then precio_actual else 0 end"))

    window_query = level_1_data_value_window_agg_df.writeStream \
        .format("console") \
        .outputMode("update") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="5 seconds") \
        .start()

    window_query.awaitTermination()
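The 30-second window created with f.window(...) in Step 5 is a tumbling window: every timestamp falls into exactly one [start, end) bucket, which is where comienzo_ventana and fin_ventana come from. The plain-Python sketch below models that assignment so the boundaries can be checked by hand. It is not Spark code; it assumes f.window is called without a slideDuration or startTime (as above), so windows are aligned to the Unix epoch, and the helper name tumbling_window is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Tuple


def tumbling_window(ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return the [start, end) tumbling window that contains ts.

    Hypothetical helper modelling f.window(col, "30 seconds") with no
    slideDuration/startTime: windows are aligned to the Unix epoch.
    """
    epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)  # same tz-awareness as ts
    offset = (ts - epoch) % size  # time elapsed since the current window opened
    start = ts - offset           # corresponds to comienzo_ventana
    return start, start + size    # end (exclusive) corresponds to fin_ventana


if __name__ == "__main__":
    size = timedelta(seconds=30)
    for second in (0, 14, 29, 30):
        ts = datetime(2021, 1, 7, 16, 30, second)
        start, end = tumbling_window(ts, size)
        print(ts.time(), "->", start.time(), end.time())
```

A row stamped exactly at a boundary (16:30:30) opens the next window, because the end of each window is exclusive.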
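Solution
You can use first and last as window functions, partitioned by the window (ventana) and ordered by the time column. Spark rejects non-time-based window functions on a streaming DataFrame, so apply them to a static DataFrame, for example the micro-batch DataFrame passed to foreachBatch (the result then covers only the rows in that micro-batch):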
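# Frame spans the whole window; with only orderBy, last() would stop at the current row
w = Window.partitionBy("ventana").orderBy("time").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("data_i", f.first("precio_actual").over(w)) \
  .withColumn("data_f", f.last("precio_actual").over(w))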
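Or, with select:
w = Window.partitionBy("ventana").orderBy("time").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.select("*",
          f.first("precio_actual").over(w).alias("data_i"),
          f.last("precio_actual").over(w).alias("data_f"))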
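To check the expected data_i/data_f values without a running stream, the plain-Python sketch below models what first/last compute over a window partitioned by ventana, ordered by time, and framed over the whole partition. It is not Spark code: the function name add_first_last and the (window_start, time, precio_actual) row layout are assumptions for illustration. The sample rows are the ones from the question, shuffled to show that the result follows the time ordering rather than the arrival order.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Tuple

# Hypothetical row layout: (window_start, time, precio_actual)
Row = Tuple[datetime, datetime, float]


def add_first_last(rows: List[Row]) -> List[Tuple[datetime, float, float, float]]:
    """Return (time, precio_actual, data_i, data_f) for every input row."""
    by_window: Dict[datetime, List[Tuple[datetime, float]]] = defaultdict(list)
    for window_start, time, price in rows:  # like partitionBy("ventana")
        by_window[window_start].append((time, price))

    bounds: Dict[datetime, Tuple[float, float]] = {}
    for window_start, items in by_window.items():
        items.sort(key=lambda item: item[0])  # like orderBy("time")
        # Whole-partition frame: first and last row of the sorted window
        bounds[window_start] = (items[0][1], items[-1][1])

    # Every row keeps its own values and gets its window's (data_i, data_f)
    return [(time, price, *bounds[w]) for w, time, price in rows]


if __name__ == "__main__":
    start = datetime(2021, 1, 7, 16, 30, 0)
    sample = [(start, datetime(2021, 1, 7, 16, 30, s), p)
              for s, p in [(14, 3.0), (0, 1.0), (29, 2.0), (15, 3.0), (16, 4.0)]]
    for time, price, data_i, data_f in add_first_last(sample):
        print(time, price, data_i, data_f)
```

Because the frame covers the whole partition, every row in a window receives the same pair, which matches the desired output table in the question (1.0 and 2.0).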