SparkContext: Invoking stop() from shutdown hook (Spark and Kubernetes)

Problem description

I am trying to run Spark and MLlib code in a Kubernetes cluster. The same code works fine when I run it in client mode or inside the Docker image I built, but when I run it in the Kubernetes cluster (PKS platform) it fails without any error.

    import os
    import numpy as np
    import pandas as pd
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import DoubleType
    from pyspark.sql.functions import lit, udf
    from pyspark.sql.functions import percent_rank
    from pyspark.sql import Window
    import pyspark.sql.functions as F
    from pyspark.ml import Pipeline
    from pyspark.ml.regression import GBTRegressor
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.feature import VectorAssembler,VectorIndexer
    from pyspark.sql.functions import broadcast   
    import datetime
    ##########sparkcontext###############
    sc= SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)
    print("started")
    ###########RMSLE calculation#########
    def rmsle(real, predicted):
        sum = 0.0
        for x in range(len(predicted)):
            if predicted[x] < 0 or real[x] < 0:  # check for negative values
                continue
            p = np.log(predicted[x] + 1)
            r = np.log(real[x] + 1)
            sum = sum + (p - r)**2
        return (sum / len(predicted))**0.5
    ##########Reading of input data###########
    customer = sqlContext.read.csv('hdfs://XXXXXX:/user/root/customer.csv', header=True, inferSchema = True)
    customer =customer.select("*").toPandas()
    lineitem = sqlContext.read.csv('hdfs://XXXXXXX:/user/root/lineitem.csv', header=True, inferSchema = True)
    lineitem =lineitem.select("*").toPandas()
    order = sqlContext.read.csv('hdfs://XXXXXXX:/user/root/orders.csv', header=True, inferSchema = True)
    order =order.select("*").toPandas()
    print("data has been read")
    ###########ETL############################
    sales = order.join(customer, order.o_custkey == customer.c_custkey, how = 'inner')
    sales = sales.sort_index()
    sales.columns = ['key_old', 'o_orderdate', 'o_orderkey', 'o_custkey', 'o_orderpriority',
       'o_shippriority', 'o_clerk', 'o_orderstatus', 'o_totalprice',
       'o_comment', 'c_custkey', 'c_mktsegment', 'c_nationkey', 'c_name',
       'c_address', 'c_phone', 'c_acctbal', 'c_comment']
    sales2 = sales.join(lineitem,sales.o_orderkey == lineitem.l_orderkey, how = 'outer')
    sales3 = sales2.groupby(by = 'o_orderdate')
    sales4 = sales3.agg({'l_quantity': 'sum'})# .withColumnRenamed("sum(l_quantity)", "TOTAL_SALES") .withColumnRenamed("o_orderdate", "ORDERDATE")
    print("End of ETL pipeline")
    orderdates = pd.to_datetime(sales4.index.values)
    orderdates = [datetime.datetime(i.year, i.month, i.day,) for i in orderdates]
    l = []
    l2 = []
    for i in orderdates:
        l = []
        l.append(i.timestamp())
        l.append(i.day)
        l.append(i.timetuple().tm_wday)
        l.append(i.timetuple().tm_yday)
        l.append(i.isocalendar()[1])
        l2.append(l)
    print("dateconverted")
    tmp = np.array(sales4.values)
    tmp = tmp.reshape(tmp.shape[0],)
    data_new = pd.DataFrame()
    data_new['SALES'] = tmp
    data_new[['DATE','DAY','WDAY','YDAY','WEEK']] = pd.DataFrame(np.array(l2))
    data_new['ONES'] = np.ones((len(data_new)))
    print("converted to datframe")
    X = np.array(data_new[['DATE','DAY','WDAY','YDAY','WEEK','ONES']])
    X = X.reshape(X.shape[0],X.shape[1])
    Y = np.array(data_new[['SALES']])
    Y = Y.reshape(Y.shape[0],1)
    cutoff = 0.1
    length = int((1-cutoff)*(len(X)))
    X_train = X[0:length]
    X_test = X[length:len(X)]
    Y_train = Y[0:length]
    Y_test = Y[length:len(Y)]
    print("pre-processingdone")
    weights  = np.dot(np.dot(np.linalg.inv(np.dot(X_train.T,X_train)),X_train.T),Y_train)
    print("model Ready")
    Y_pred = np.dot(X_test,weights)
    Y_pred = Y_pred.reshape(Y_pred.shape[0],)
    Y_test = Y_test.reshape(Y_test.shape[0],)
    print("predictions done")
    RMSE = np.sqrt(np.mean((Y_test-Y_pred)**2))
    RMSLE = rmsle(Y_test,Y_pred)
    print(RMSE)
    print(RMSLE)
    sc.stop()

Environment

Kubernetes cluster: PKS platform

There is enough memory: a master node with 16 GB of memory and 3 worker nodes with 16 GB of RAM.

I do not see any spikes, and barely 30% of the cluster's memory is used during processing.

My image is 2 GB and the data is barely 100 MB.
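
Note that every toPandas() call in the script collects the full table into the driver process, so the joins, the date features, and the regression all run as plain pandas/NumPy on the driver; cluster-wide memory usage therefore says little about the driver pod itself. A quick driver-side check like the following (a sketch, not part of the original script) can show what the driver pod actually has:

    # Print the memory-related Spark settings in effect inside the driver pod
    print(sc.getConf().get("spark.driver.memory", "not set"))
    print(sc.getConf().get("spark.executor.memory", "not set"))
    # On Kubernetes the pod's hard memory limit is visible via cgroups
    # (this path assumes cgroup v1; it differs on cgroup v2 clusters)
    with open("/sys/fs/cgroup/memory/memory.limit_in_bytes") as f:
        print("pod memory limit (bytes):", f.read().strip())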

It fails at the following code:

    weights  = np.dot(np.dot(np.linalg.inv(np.dot(X_train.T,X_train)),X_train.T),Y_train)
    print("model Ready")
    Y_pred = np.dot(X_test,weights)
    Y_pred = Y_pred.reshape(Y_pred.shape[0],)
    Y_test = Y_test.reshape(Y_test.shape[0],)
    print("predictions done")
    RMSE = np.sqrt(np.mean((Y_test-Y_pred)**2))
    RMSLE = rmsle(Y_test,Y_pred)
    print(RMSE)
    print(RMSLE)
    sc.stop()
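
The line it dies on is the closed-form least-squares fit, weights = (XᵀX)⁻¹XᵀY, computed with plain NumPy on the driver. For reference, the same weights can be obtained with np.linalg.lstsq, which solves the least-squares problem without explicitly forming and inverting XᵀX (a sketch using the variable names from the script above; it assumes X_train, Y_train and X_test exist as built there):

    # Equivalent to np.linalg.inv(X_train.T @ X_train) @ X_train.T @ Y_train
    # whenever X_train.T @ X_train is invertible; lstsq is numerically more stable.
    weights, residuals, rank, _ = np.linalg.lstsq(X_train, Y_train, rcond=None)
    Y_pred = X_test @ weights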

Below is part of the driver log; I cannot see any error in it:

19/12/26 10:52:23 INFO ContextCleaner: Cleaned accumulator 238
19/12/26 10:52:23 INFO ContextCleaner: Cleaned accumulator 222
19/12/26 10:52:23 INFO ContextCleaner: Cleaned accumulator 241
19/12/26 10:52:23 INFO ContextCleaner: Cleaned accumulator 228

data has been read
End of ETL pipeline
dateconverted
converted to datframe

pre-processingdone
19/12/26 10:52:35 INFO SparkContext: Invoking stop() from shutdown hook
19/12/26 10:52:35 INFO SparkUI: Stopped Spark web UI at http://spark-ml-test-4ff4386f41d48a9f-driver-svc.spark-jobs.svc:4040
19/12/26 10:52:35 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
19/12/26 10:52:35 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
19/12/26 10:52:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
19/12/26 10:52:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/12/26 10:52:35 INFO MemoryStore: MemoryStore cleared
19/12/26 10:52:35 INFO BlockManager: BlockManager stopped
19/12/26 10:52:35 INFO BlockManagerMaster: BlockManagerMaster stopped
19/12/26 10:52:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/12/26 10:52:35 INFO SparkContext: Successfully stopped SparkContext
19/12/26 10:52:35 INFO ShutdownHookManager: Shutdown hook called
19/12/26 10:52:35 INFO ShutdownHookManager: Deleting directory /var/data/spark-18e6167c-433f-41d8-82c4-b11ba9a3bf8c/spark-3a8d3e48-292c-4018-b003-bde80641eb90/pyspark-4b10b351-cf07-4452-8285-acb67157db80
19/12/26 10:52:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-29b813d2-f377-4d84-be3a-4372f69e58b5
19/12/26 10:52:35 INFO ShutdownHookManager: Deleting directory /var/data/spark-18e6167c-433f-41d8-82c4-b11ba9a3bf8c/spark-3a8d3e48-292c-4018-b003-bde80641eb90
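
Since the log jumps straight from the application's print statements to the shutdown hook, there is no Python traceback to work from. One way to surface a Python-side failure (a sketch, not part of the original script) is to wrap the driver-side steps and flush the full traceback to stdout before the JVM shuts down; if nothing is printed even with this in place, the process is likely being terminated from outside the Python code rather than raising an exception:

    import sys
    import traceback

    try:
        # driver-side steps from the script above (joins, features, model, metrics)
        weights = np.dot(np.dot(np.linalg.inv(np.dot(X_train.T, X_train)), X_train.T), Y_train)
    except Exception:
        # print the full traceback so it appears in the driver pod log
        traceback.print_exc(file=sys.stdout)
        sys.stdout.flush()
        raise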

Any guesses or ideas about this?

Tags: apache-spark

Solution

