Spark Streaming with the "memory" output sink shows no records

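Problem description

I am running Spark streaming code in the VS Code editor. I am using memory as my data sink, and the output mode is complete.

When I try to view the results from the in-memory table, it shows no output.

Any help would be appreciated.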

import sys
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import *

sparkSession = SparkSession.builder.master('local').appName('Max_Stock in complete mode').getOrCreate()

sparkSession.sparkContext.setLogLevel('ERROR')

schema = StructType([StructField('Date', StringType(), True),
                    StructField('Open', StringType(), True),
                    StructField('High', DoubleType(), True),
                    StructField('Low', StringType(), True),
                    StructField('Close', StringType(), True),
                    StructField('Adjusted Close', StringType(), True),
                    StructField('Volume', StringType(), True),
                    StructField('Name', StringType(), True)
                    ])

input_stream = """path"""
stockPricesDf = sparkSession.readStream.option('header', 'true').schema(schema).csv(input_stream)
print(' ')
print('Is the stream ready?')
print(stockPricesDf.isStreaming)


print(' ')
print('Schema of the input stream')
stockPricesDf.printSchema()  # printSchema() prints the schema itself and returns None

upDaysDf = stockPricesDf.select("Name", "Date","High", "Open", "Close").where("Open > Close") 
upDays_max = upDaysDf.groupBy('Name').max('High')               

query = upDays_max.writeStream.outputMode('complete').format('memory').queryName('datatable')\
            .option('truncate', 'false') \
            .option('numRows', 5) \
            .start() \
            .awaitTermination()
SparkSession.sql("select * from datatable ").show(5)

Tags: apache-spark, pyspark, apache-spark-sql, spark-streaming

Solution
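
A likely cause, judging only from the code in the question: `.awaitTermination()` is chained onto `.start()`, so the script blocks on that line for as long as the stream runs and never reaches the `select * from datatable` line. In addition, `SparkSession.sql(...)` is called on the class rather than on the `sparkSession` instance, which would fail even if that line were reached. The `truncate` and `numRows` options are, as far as I know, options of the console sink, so they are left out here.

The following is a minimal sketch of one way to restructure the write and the read, not a definitive fix. The names `show_max_high` and `input_dir` are hypothetical, and `input_dir` stands in for the path that is elided in the question. It uses `StreamingQuery.processAllAvailable()`, which the PySpark documentation describes as intended for testing, to wait until the files already present have been processed before querying the in-memory table.

```python
def show_max_high(input_dir):
    """Stream CSV files from input_dir into a memory sink and show the result.

    input_dir is a hypothetical parameter replacing the elided path.
    """
    # Imported inside the function so this file can be loaded without starting Spark.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    spark = (SparkSession.builder
             .master("local")
             .appName("Max_Stock in complete mode")
             .getOrCreate())
    spark.sparkContext.setLogLevel("ERROR")

    # Same schema as in the question.
    schema = StructType([
        StructField("Date", StringType(), True),
        StructField("Open", StringType(), True),
        StructField("High", DoubleType(), True),
        StructField("Low", StringType(), True),
        StructField("Close", StringType(), True),
        StructField("Adjusted Close", StringType(), True),
        StructField("Volume", StringType(), True),
        StructField("Name", StringType(), True),
    ])

    stock_prices = (spark.readStream
                    .option("header", "true")
                    .schema(schema)
                    .csv(input_dir))

    up_days = (stock_prices
               .select("Name", "Date", "High", "Open", "Close")
               .where("Open > Close"))
    up_days_max = up_days.groupBy("Name").max("High")

    # start() returns a StreamingQuery; awaitTermination() is deliberately
    # not chained here, because it would block before the table is read.
    query = (up_days_max.writeStream
             .outputMode("complete")
             .format("memory")
             .queryName("datatable")
             .start())

    # Wait until all input currently available has been processed.
    query.processAllAvailable()

    # Query the in-memory table through the session instance, not the class.
    spark.sql("select * from datatable").show(5, truncate=False)

    query.stop()


if __name__ == "__main__":
    import sys
    show_max_high(sys.argv[1])
```

If the stream should keep running, an alternative is to keep the query handle, call `spark.sql("select * from datatable").show()` periodically while `query.isActive` is true, and call `query.awaitTermination()` only at the very end. Separately, `Open` and `Close` are declared as `StringType`, so `Open > Close` compares them as strings; declaring them as `DoubleType` may be closer to what was intended, but that is an assumption about the data.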

