Creating columns with a window in Spark

Problem description

I want to create two columns:

data_i: with the first value of the "precio_actual" column in the Spark Streaming window

data_f: with the last value of the "precio_actual" column in the window.

For example, what I have right now is the following:

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+
|localSymbol|                time|precio_actual|    bid|    ask|  high|   low| close|             ventana|   string_real_time|   comienzo_ventana|        fin_ventana|PRUEBA|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+
|    EUR.USD|2021-01-07 16:30:...|          1.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:00|2021-01-07 16:30:00|2021-01-07 16:30:30|   1.0|
|    EUR.USD|2021-01-07 16:30:...|          3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:14|2021-01-07 16:30:00|2021-01-07 16:30:30|   1.0|
|    EUR.USD|2021-01-07 16:30:...|          3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:15|2021-01-07 16:30:00|2021-01-07 16:30:30|   3.0|
|    EUR.USD|2021-01-07 16:30:...|          4.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:16|2021-01-07 16:30:00|2021-01-07 16:30:30|   4.0|
|    EUR.USD|2021-01-07 16:30:...|          2.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:29|2021-01-07 16:30:00|2021-01-07 16:30:30|   2.0|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+

What I need is the following:

+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+------+
|localSymbol|                time|precio_actual|    bid|    ask|  high|   low| close|             ventana|   string_real_time|   comienzo_ventana|        fin_ventana|data_i|data_f|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+------+
|    EUR.USD|2021-01-07 16:30:...|          1.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:00|2021-01-07 16:30:00|2021-01-07 16:30:30|   1.0|   2.0|
|    EUR.USD|2021-01-07 16:30:...|          3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:14|2021-01-07 16:30:00|2021-01-07 16:30:30|   1.0|   2.0|
|    EUR.USD|2021-01-07 16:30:...|          3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:15|2021-01-07 16:30:00|2021-01-07 16:30:30|   1.0|   2.0|
|    EUR.USD|2021-01-07 16:30:...|          4.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:16|2021-01-07 16:30:00|2021-01-07 16:30:30|   1.0|   2.0|
|    EUR.USD|2021-01-07 16:30:...|          2.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:29|2021-01-07 16:30:00|2021-01-07 16:30:30|   1.0|   2.0|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+------+

My code is the following:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":
    # Step 1: Create the SparkSession
    spark = SparkSession \
        .builder \
        .appName("Spark IB") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    # Step 2: Read a stream from the Kafka topic
    level_1_data_kafka_df = spark.readStream  \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "level_1_data_test") \
        .option("startingOffsets", "earliest") \
        .load()

    # Step 3: Define the schema
    level_1_data_schema = StructType([
        StructField("localSymbol", StringType()),
        StructField("time", StringType()),
        StructField("precio_actual", StringType()),
        StructField("bid", StringType()),
        StructField("ask", StringType()),
        StructField("high", StringType()),
        StructField("low", StringType()),
        StructField("close", StringType()),
        ])

    # Step 4: Extract the "value" field from the Kafka record
    """
    col("value"): the "value" field arrives in binary format
    .cast("string"): we cast it to string, so now we have a plain string
    from_json: deserializes a string into JSON, i.e. parses the string with a particular schema
    .alias: renames the schema; I can see it with level_1_data_value_df.printSchema()
    """
    level_1_data_value_df = level_1_data_kafka_df.select(f.from_json(f.col("value").cast("string"), level_1_data_schema).alias("value"))

    # Step 5: Data transformation and window
    level_1_data_value_transform_df = level_1_data_value_df\
        .select(
            "value.*"
        ) \
        .withColumn("ventana", f.window(f.col("time"), "30 seconds")) \
        .withColumn("time", f.to_timestamp(f.col("time")) ) \
        .withColumn("string_real_time",f.col("time")[0:19]) \
        .withColumn("precio_actual", f.col("precio_actual").cast("double"))


    # Step 6: Queries
    level_1_data_value_window_agg_df = level_1_data_value_transform_df\
        .select (
            f.col("*"),
        )\
        .withColumn("comienzo_ventana",f.col("ventana.start")) \
        .withColumn("fin_ventana", f.col("ventana.end"))  \
        .withColumn("PRUEBA", f.expr("case when comienzo_ventana < fin_ventana then precio_actual else 0 end")) \

    window_query = level_1_data_value_window_agg_df.writeStream \
        .format("console") \
        .outputMode("update") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="5 seconds") \
        .start()

    window_query.awaitTermination()

Tags: python, apache-spark, pyspark

Solution


You can use first and last over the time column:

df.withColumn("data_i", first("precia_actual").orderBy("time"))
  .withColumn("data_l", last("precia_actual").orderBy("time"))

或使用选择

df.select(*,
           first("precia_actual").orderBy("time").alias("data_i"),
           last("precia_actual").orderBy("time").alias("data_l"))
