apache-spark - SparkContext: Invoking stop() from shutdown hook (Spark and Kubernetes)
Problem Description
I am trying to run Spark and ml-lib code on a Kubernetes cluster. The same code works fine when I run it in client mode or inside the Docker image I built, but when I run it on the Kubernetes cluster (PKS platform) it fails without any error.
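To see what the script actually gets in client mode versus on the PKS cluster, the effective configuration can be dumped right after the SparkContext is created. A minimal sketch, assuming it is placed just after sc = SparkContext.getOrCreate() in the listing below (the keys are standard Spark configuration properties; the fallback strings are only placeholders):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
conf = sc.getConf()
# Print the effective settings so the client-mode and cluster-mode runs can be compared side by side.
print("master         :", conf.get("spark.master", "unknown"))
print("deploy mode    :", conf.get("spark.submit.deployMode", "client"))
print("driver memory  :", conf.get("spark.driver.memory", "default (1g)"))
print("executor memory:", conf.get("spark.executor.memory", "default (1g)"))
print("executor count :", conf.get("spark.executor.instances", "not set"))
print("k8s image      :", conf.get("spark.kubernetes.container.image", "not set"))

The full script follows: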
import os
import numpy as np
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler,VectorIndexer
from pyspark.sql.functions import broadcast
import datetime
##########sparkcontext###############
sc= SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
print("started")
###########RMSLE-calculation########
def rmsle(real, predicted):
    sum = 0.0
    for x in range(len(predicted)):
        if predicted[x] < 0 or real[x] < 0:  # check for negative values
            continue
        p = np.log(predicted[x] + 1)
        r = np.log(real[x] + 1)
        sum = sum + (p - r)**2
    return (sum / len(predicted))**0.5
##########Reading of input Data###########
customer = sqlContext.read.csv('hdfs://XXXXXX:/user/root/customer.csv', header=True, inferSchema = True)
customer =customer.select("*").toPandas()
lineitem = sqlContext.read.csv('hdfs://XXXXXXX:/user/root/lineitem.csv', header=True, inferSchema = True)
lineitem =lineitem.select("*").toPandas()
order = sqlContext.read.csv('hdfs://XXXXXXX:/user/root/orders.csv', header=True, inferSchema = True)
order =order.select("*").toPandas()
print("data has been read")
###########ETL############################
sales = order.join(customer, order.o_custkey == customer.c_custkey, how = 'inner')
sales = sales.sort_index()
sales.columns = ['key_old', 'o_orderdate', 'o_orderkey', 'o_custkey', 'o_orderpriority',
'o_shippriority', 'o_clerk', 'o_orderstatus', 'o_totalprice',
'o_comment', 'c_custkey', 'c_mktsegment', 'c_nationkey', 'c_name',
'c_address', 'c_phone', 'c_acctbal', 'c_comment']
sales2 = sales.join(lineitem,sales.o_orderkey == lineitem.l_orderkey, how = 'outer')
sales3 = sales2.groupby(by = 'o_orderdate')
sales4 = sales3.agg({'l_quantity': 'sum'})# .withColumnRenamed("sum(l_quantity)", "TOTAL_SALES") .withColumnRenamed("o_orderdate", "ORDERDATE")
print("End of ETL pipeline")
orderdates = pd.to_datetime(sales4.index.values)
orderdates = [datetime.datetime(i.year, i.month, i.day,) for i in orderdates]
l = []
l2 = []
for i in orderdates:
    l = []
    l.append(i.timestamp())
    l.append(i.day)
    l.append(i.timetuple().tm_wday)
    l.append(i.timetuple().tm_yday)
    l.append(i.isocalendar()[1])
    l2.append(l)
print("dateconverted")
tmp = np.array(sales4.values)
tmp = tmp.reshape(tmp.shape[0],)
data_new = pd.DataFrame()
data_new['SALES'] = tmp
data_new[['DATE','DAY','WDAY','YDAY','WEEK']] = pd.DataFrame(np.array(l2))
data_new['ONES'] = np.ones((len(data_new)))
print("converted to datframe")
X = np.array(data_new[['DATE','DAY','WDAY','YDAY','WEEK','ONES']])
X = X.reshape(X.shape[0],X.shape[1])
Y = np.array(data_new[['SALES']])
Y = Y.reshape(Y.shape[0],1)
cutoff = 0.1
length = int((1-cutoff)*(len(X)))
X_train = X[0:length]
X_test = X[length:len(X)]
Y_train = Y[0:length]
Y_test = Y[length:len(Y)]
print("pre-processingdone")
weights = np.dot(np.dot(np.linalg.inv(np.dot(X_train.T,X_train)),X_train.T),Y_train)
print("model Ready")
Y_pred = np.dot(X_test,weights)
Y_pred = Y_pred.reshape(Y_pred.shape[0],)
Y_test = Y_test.reshape(Y_test.shape[0],)
print("predictions done")
RMSE = np.sqrt(np.mean((Y_test-Y_pred)**2))
RMSLE = rmsle(Y_test,Y_pred)
print(RMSE)
print(RMSLE)
sc.stop()
Environment
Kubernetes cluster: PKS platform
The master node has 16 GB of RAM and there are 3 worker nodes with 16 GB of RAM each, which is plenty of memory.
I do not see any spikes, and barely 30% of the cluster's memory is used during processing.
My image is 2 GB and the data is barely 100 MB.
It fails at the following code:
weights = np.dot(np.dot(np.linalg.inv(np.dot(X_train.T,X_train)),X_train.T),Y_train)
print("model Ready")
Y_pred = np.dot(X_test,weights)
Y_pred = Y_pred.reshape(Y_pred.shape[0],)
Y_test = Y_test.reshape(Y_test.shape[0],)
print("predictions done")
RMSE = np.sqrt(np.mean((Y_test-Y_pred)**2))
RMSLE = rmsle(Y_test,Y_pred)
print(RMSE)
print(RMSLE)
sc.stop()
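For reference, everything after the toPandas() calls is plain NumPy running in the driver process, and the step it dies on is just an ordinary least-squares solve of the normal equation weights = (XᵀX)⁻¹XᵀY. Below is a minimal sketch of an equivalent computation with np.linalg.lstsq, using the X_train/Y_train/X_test arrays from the listing above and avoiding the explicit matrix inverse (not a claim that this is what triggers the shutdown, only to clarify what the failing line computes):

import numpy as np

# Least-squares solve of X_train @ weights ≈ Y_train, equivalent to
# weights = inv(X_train.T @ X_train) @ X_train.T @ Y_train
# but without forming the explicit inverse.
weights, residuals, rank, singular_values = np.linalg.lstsq(X_train, Y_train, rcond=None)

Y_pred = np.dot(X_test, weights)           # same prediction step as in the script
Y_pred = Y_pred.reshape(Y_pred.shape[0],)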
Below is part of the driver log; I cannot see any error anywhere in it:
19/12/26 10:52:23 INFO ContextCleaner: Cleaned accumulator 238
19/12/26 10:52:23 INFO ContextCleaner: Cleaned accumulator 222
19/12/26 10:52:23 INFO ContextCleaner: Cleaned accumulator 241
19/12/26 10:52:23 INFO ContextCleaner: Cleaned accumulator 228
data has been read
End of ETL pipeline
dateconverted
converted to datframe
pre-processingdone
19/12/26 10:52:35 INFO SparkContext: Invoking stop() from shutdown hook
19/12/26 10:52:35 INFO SparkUI: Stopped Spark web UI at http://spark-ml-test-4ff4386f41d48a9f-driver-svc.spark-jobs.svc:4040
19/12/26 10:52:35 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
19/12/26 10:52:35 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
19/12/26 10:52:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
19/12/26 10:52:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/12/26 10:52:35 INFO MemoryStore: MemoryStore cleared
19/12/26 10:52:35 INFO BlockManager: BlockManager stopped
19/12/26 10:52:35 INFO BlockManagerMaster: BlockManagerMaster stopped
19/12/26 10:52:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/12/26 10:52:35 INFO SparkContext: Successfully stopped SparkContext
19/12/26 10:52:35 INFO ShutdownHookManager: Shutdown hook called
19/12/26 10:52:35 INFO ShutdownHookManager: Deleting directory /var/data/spark-18e6167c-433f-41d8-82c4-b11ba9a3bf8c/spark-3a8d3e48-292c-4018-b003-bde80641eb90/pyspark-4b10b351-cf07-4452-8285-acb67157db80
19/12/26 10:52:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-29b813d2-f377-4d84-be3a-4372f69e58b5
19/12/26 10:52:35 INFO ShutdownHookManager: Deleting directory /var/data/spark-18e6167c-433f-41d8-82c4-b11ba9a3bf8c/spark-3a8d3e48-292c-4018-b003-bde80641eb90
Any guesses or ideas on this?