首页 > 解决方案 > 将 JSON 转换为 JSON 对象以触发数据帧

问题描述

这是json数据的格式

{0: {'feature0': 4, 'feature1': "is waiting for Izzy to get out of class, then it's off to Izzy's banquet and Old Town Pasadena with the sister "}, 1: {'feature0': 0, 'feature1': "i feel like I've been hit by a truck and I really cannot concentrate on my work right now  mind is going round in circles ugh"}, 2: {'feature0': 4, 'feature1': "Gwen Stefani's kids are addorable!!! They where there and watching from the side  There mom is fucking amazing."}, 3: {'feature0': 4, 'feature1': 'watching , im a celebrity , get me outta here with sam '}, 4: {'feature0': 0, 'feature1': 'cleaning my room '}, 5: {'feature0': 0, 'feature1': 'had a super bad fight with a best friend '}

此数据正在使用stream.py文件进行流式传输。这是客户端文件。我需要使用 spark 将 json 数据转换为数据帧。

这是我编写的代码。我正在尝试使用json.loads,但它不起作用。这是情感数据集的链接-https://drive.google.com/drive/folders/1bMIVnuBkm3UnHJi6gMp5QHAH93u1UlJQ stream.py 文件应该在一个终端中运行,客户端文件在另一个终端中运行。这些是预期的输出。但是当我运行我的客户端文件时,我没有得到任何输出。客户端文件是使用 spark 命令运行的。 在此处输入图像描述

在此处输入图像描述

这是 stream.py 文件

#! /usr/bin/python3

import time
import json
import pickle
import socket
import argparse
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

# Run using python3 stream.py to use CIFAR dataset and default batch_size as 100
# Run using python3 stream.py -f <input_file> -b <batch_size> to use a custom file/dataset and batch size
# Run using python3 stream.py -e True to stream endlessly in a loop
parser = argparse.ArgumentParser(
    description='Streams a file to a Spark Streaming Context')
parser.add_argument('--file', '-f', help='File to stream', required=False,
                    type=str, default="cifar")    # path to file for streaming
parser.add_argument('--batch-size', '-b', help='Batch size',
                    required=False, type=int, default=100)  # default batch_size is 100
parser.add_argument('--endless', '-e', help='Enable endless stream',
                    required=False, type=bool, default=False)  # looping disabled by default

TCP_IP = "localhost"
TCP_PORT = 6100


def connectTCP():   # connect to the TCP server -- there is no need to modify this function
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((TCP_IP, TCP_PORT))
    s.listen(1)
    print(f"Waiting for connection on port {TCP_PORT}...")
    connection, address = s.accept()
    print(f"Connected to {address}")
    return connection, address


# separate function to stream CIFAR batches since the format is different
def sendCIFARBatchFileToSpark(tcp_connection, input_batch_file):
    # load the entire dataset
    with open(f'cifar/{input_batch_file}', 'rb') as batch_file:
        batch_data = pickle.load(batch_file, encoding='bytes')

    # obtain image data and labels
    data = batch_data[b'data']
    data = list(map(np.ndarray.tolist, data))
    labels = batch_data[b'labels']
    # setting feature size to form the payload later
    feature_size = len(data[0])
    # iterate over batches of size batch_size
    for image_index in tqdm(range(0, len(data)-batch_size+1, batch_size)):
        # load batch of images
        image_data_batch = data[image_index:image_index+batch_size]
        image_label = labels[image_index:image_index +
                             batch_size]        # load batch of labels
        payload = dict()
        for mini_batch_index in range(len(image_data_batch)):
            payload[mini_batch_index] = dict()
            for feature_index in range(feature_size):  # iterate over features
                payload[mini_batch_index][f'feature{feature_index}'] = image_data_batch[mini_batch_index][feature_index]
            payload[mini_batch_index]['label'] = image_label[mini_batch_index]
        # print(payload)    # uncomment to see the payload being sent
        # encode the payload and add a newline character (do not forget the newline in your dataset)
        send_batch = (json.dumps(payload) + '\n').encode()
        try:
            tcp_connection.send(send_batch)  # send the payload to Spark
        except BrokenPipeError:
            print("Either batch size is too big for the dataset or the connection was closed")
        except Exception as error_message:
            print(f"Exception thrown but was handled: {error_message}")
        time.sleep(1)


def streamCIFARDataset(tcp_connection, dataset_type='cifar'):
    print("Starting to stream CIFAR data")
    CIFAR_BATCHES = [
        'data_batch_1',
        # 'data_batch_2',   # uncomment to stream the second training dataset
        # 'data_batch_3',   # uncomment to stream the third training dataset
        # 'data_batch_4',   # uncomment to stream the fourth training dataset
        # 'data_batch_5',    # uncomment to stream the fifth training dataset
        # 'test_batch'      # uncomment to stream the test dataset
    ]
    for batch in CIFAR_BATCHES:
        sendCIFARBatchFileToSpark(tcp_connection, batch)
        time.sleep(1)


def sendPokemonBatchFileToSpark(tcp_connection, input_batch_file):
    # load the entire dataset
    with open(f'pokemon/{input_batch_file}.pickle', 'rb') as batch_file:
        batch_data = pickle.load(batch_file)
    
    # obtain image data and labels
    data = batch_data['img']
    labels = batch_data['label']
    # iterate over batches of size batch_size
    for image_index in tqdm(range(0, len(data)-batch_size+1, batch_size)):
        # load batch of images
        image_data_batch = data[image_index:image_index+batch_size]
        image_label = labels[image_index:image_index +
                             batch_size]        # load batch of labels
        payload = dict()    
        for mini_batch_index in range(len(image_data_batch)):   
            payload[mini_batch_index] = dict()  
            payload[mini_batch_index]["img"] = image_data_batch[mini_batch_index]
            # if you want to flatten out the matrix, use payload[mini_batch_index]["img"] = np.asarray(image_data_batch[mini_batch_index]).flatten().tolist()
            payload[mini_batch_index]['label'] = image_label[mini_batch_index]
        # print(payload)    # uncomment to see the payload being sent
        # encode the payload and add a newline character (do not forget the newline in your dataset)
        send_batch = (json.dumps(payload) + '\n').encode()
        try:
            tcp_connection.send(send_batch)  # send the payload to Spark
        except BrokenPipeError:
            print("Either batch size is too big for the dataset or the connection was closed")
        except Exception as error_message:
            print(f"Exception thrown but was handled: {error_message}")
        time.sleep(1)
            

def streamPokemonDataset(tcp_connection, dataset_type='pokemon'):
    print("Starting to stream Pokemon data")
    POKEMON_BATCHES = [
        'train_batch_1',
        # 'train_batch_2',   # uncomment to stream the second training dataset
        # 'train_batch_3',   # uncomment to stream the third training dataset
        # 'train_batch_4',   # uncomment to stream the fourth training dataset
        # 'train_batch_5',    # uncomment to stream the fifth training dataset
        # 'test_batch'      # uncomment to stream the test dataset
    ]
    for batch in POKEMON_BATCHES:
        sendPokemonBatchFileToSpark(tcp_connection, batch)
        time.sleep(1)


def streamDataset(tcp_connection, dataset_type):    # function to stream a dataset
    # this is the function you need to recreate to work with custom datasets
    # if your dataset has multiple files (train, test, etc), modify and use this function to stream your dataset
    print(f"Starting to stream {dataset_type} dataset")
    DATASETS = [    # list of files in your dataset to stream
        "train",
        # "test"    # uncomment to stream the test dataset
    ]
    for dataset in DATASETS:
        streamCSVFile(tcp_connection, f'{dataset_type}/{dataset}.csv')
        time.sleep(1)


def streamCSVFile(tcp_connection, input_file):    # stream a CSV file to Spark
    '''
    Each batch is streamed as a JSON file and has the following shape. 
    The outer indices are the indices of each row in a batch and go from 0 - batch_size-1
    The inner indices are the indices of each column in a row and go from 0 - feature_size-1

    {
        '0':{
            'feature0': <value>,
            'feature1': <value>,
            ...
            'featureN': <value>,
        }
        '1':{
            'feature0': <value>,
            'feature1': <value>,
            ...
            'featureN': <value>,
        }
        ...
        'batch_size-1':{
            'feature0': <value>,
            'feature1': <value>,
            ...
            'featureN': <value>,
        }
    }
    '''

    df = pd.read_csv(input_file)  # load the entire dataset
    values = df.values.tolist()  # obtain the values of the dataset
    # loop through batches of size batch_size lines
    for i in tqdm(range(0, len(values)-batch_size+1, batch_size)):
        send_data = values[i:i+batch_size]  # load batch of rows
        payload = dict()    # create a payload
        # iterate over the batch
        for mini_batch_index in range(len(send_data)):
            payload[mini_batch_index] = dict()  # create a record
            # iterate over the features
            for feature_index in range(len(send_data[0])):
                # add the feature to the record
                payload[mini_batch_index][f'feature{feature_index}'] = send_data[mini_batch_index][feature_index]
        # print(payload)    # uncomment to see the payload being sent
        # encode the payload and add a newline character (do not forget the newline in your dataset)
        send_batch = (json.dumps(payload) + '\n').encode()
        try:
            tcp_connection.send(send_batch)  # send the payload to Spark
        except BrokenPipeError:  # this indicates that the message length of the payload is more than what is allowed via TCP
            print("Either batch size is too big for the dataset or the connection was closed")
        except Exception as error_message:
            print(f"Exception thrown but was handled: {error_message}")
        time.sleep(1)


def streamFile(tcp_connection, input_file):  # stream a newline delimited file to Spark
    '''
    Each batch is streamed as newline delimited text and has the following shape.

    line1\n
    line2\n
    ...
    '''
    with open(input_file, 'r') as file:
        data = file.readlines()  # open the file and read every line
        total_lines = len(data)
        # loop through batches of size batch_size lines
        for i in tqdm(range(0, total_lines-batch_size+1, batch_size)):
            send_data = data[i:i+batch_size]    # load batch of lines
            # encode the payload and add a newline character (do not forget the newline in your dataset)
            send_batch = (json.dumps(send_data) + '\n').encode()
            try:
                tcp_connection.send(send_batch)  # send the payload to Spark
            except BrokenPipeError:
                print("Either batch size is too big for the dataset or the connection was closed")
            except Exception as error_message:
                print(f"Exception thrown but was handled: {error_message}")
            time.sleep(1)


if __name__ == '__main__':
    args = parser.parse_args()
    print(args)

    input_file = args.file
    batch_size = args.batch_size
    endless = args.endless

    tcp_connection, _ = connectTCP()

    # to stream a custom dataset, uncomment the elif block and create your own dataset streamer function (or modify the existing one)
    if input_file == "cifar":
        _function = streamCIFARDataset
    elif input_file == "pokemon":
        _function = streamPokemonDataset
    elif input_file in ["crime", "sentiment", "spam"]:
        _function = streamDataset
    # elif input_file == "my dataset":
    #     _function = streamMyDataset
    else:
        _function = streamFile

    if endless:
        while True:
            _function(tcp_connection, input_file)
    else:
        _function(tcp_connection, input_file)

# Setup your own dataset streamer by following the examples above.
# If you wish to stream a single newline delimited file, use streamFile()
# If you wish to stream a CSV file, use streamCSVFile()
# If you wish to stream any other type of file(JSON, XML, etc.), write an appropriate function to load and stream the file

这是客户端文件

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql import SparkSession,Row,Column
import json
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
import sys

sc = SparkContext("local[2]", "NetworkWordCount")
spark = SparkSession(sc)
ssc = StreamingContext(sc, 1)
sql_context=SQLContext(sc)

lines = ssc.socketTextStream("localhost", 6100)

def get_prediction(tweet_text):
    try:
                # remove the blank tweets
        tweet_text = tweet_text.filter(lambda x: len(x) > 0)
                # create the dataframe with each row contains a tweet text
        print(tweet_text)
        rowRdd = tweet_text.map(lambda w:json.loads(w))
        #print(rowRdd)
        #rowRdd2 = tweet_text.map(lambda w: Row(tweet=w))
        wordsDataFrame = spark.createDataFrame(rowRdd2)
        # get the sentiments for each row
        print(wordsDataFrame)
        #pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show()
    except : 
        print('No data')

    # define the schema
my_schema = tp.StructType([
                tp.StructField(name= 'id',          dataType= tp.IntegerType(),  nullable= True),                   
                    tp.StructField(name= 'tweet',       dataType= tp.StringType(),   nullable= True)    
                      ])        
        
#print('\n\nReading the dataset...........................\n')
#my_data = spark.read.csv('/home/pes2ug19cs013/Desktop/project/sentiment/test.csv', schema=my_schema, header=True)
#my_data.show(2)

#my_data.printSchema()

#print(lines)

words = lines.flatMap(lambda line : line.split(" "))
words.foreachRDD(get_prediction)





 

标签: pythonjsonapache-sparkpysparkspark-structured-streaming

解决方案


推荐阅读