Pyspark loop row by row

Problem description

import os
import findspark
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pysparkling import H2OContext  # H2O Sparkling Water

findspark.init()
conf = SparkConf().setAppName("HyperparameterOptimization") \
    .setMaster("local[*]") \
    .set("spark.ui.port", "7077") \
    .set("spark.executor.memory", "1g") \
    .set("spark.driver.memory", "1g")

sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
h2oContext = H2OContext.getOrCreate()

numeric_demographic_cols = ['CONTRACT_NO', 'PRICE']
non_numeric_demographic_cols = ['TERM']
product_cols = ['FL_ACTIVE_FLAG', 'FL_CITY']
train_cols = ['CHURN']

for target_column in train_cols:
    resultsPath = os.path.join("Results", target_column)
    os.makedirs(resultsPath, exist_ok=True)
    target = target_column
    try:
        data = spark.read.csv("churn_data.csv", sep=";", header=True)
    except Exception:
        continue

currencyList = []
currencyListt = []
# data.rdd.collect() returns a list of pyspark.sql.Row objects;
# fields are accessed by name, not with pandas-style .values indexing.
for row in data.rdd.collect():
    id = str(row['CONTRACT_NO'])
    currencyList.append(id)
    c = (str(row['PRICE']) + " " + str(row['TERM']) + " "
         + str(row['FL_ACTIVE_FLAG']) + " " + str(row['FL_CITY']) + " "
         + str(row['CHURN']))
    currencyListt.append(c)

de = {'col1': currencyList, 'col2': currencyListt}
deneme = pd.DataFrame(data=de)
dfcopy = deneme
result_df = dfcopy.drop_duplicates(subset=['col2'], keep='last')
# 'd' was never opened in the original code; assuming a results file here.
with open(os.path.join(resultsPath, "shapes.txt"), "w") as d:
    d.write(str(result_df.shape))
    d.write(str(deneme.shape))
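
If the whole table fits in driver memory, a simpler alternative to the collect() loop above is a one-step conversion to pandas; a sketch using the columns from the code above (DataFrame.select and toPandas are standard PySpark calls):

# One-step conversion to pandas; assumes the data fits on the driver.
pdf = data.select('CONTRACT_NO', 'PRICE', 'TERM',
                  'FL_ACTIVE_FLAG', 'FL_CITY', 'CHURN').toPandas()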

I have an architecture built on Spark and Python, using the pyspark library; the connection settings are shown above. I have a hyperparameter optimization system, and that part works fine. Now I want to build the prediction part. The business unit will send me an Excel file with one or more customer IDs, and I have to return results for those customers based on my architecture's output. My first idea was to load the Excel file into a dictionary, so I created one. Then I tried to loop over the dataset row by row, writing key = customer ID and value = the other features into the dictionary, but I could not get it to work. When I inspect the contents of data, I see a list like [Row(PRICE='10', TERM='202102', ...]. How should I handle this?

Tags: python, pyspark

Solution
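
data.rdd.collect() returns a list of pyspark.sql.Row objects. A Row behaves like a named tuple: a field can be read with row['PRICE'] or row.PRICE, and row.asDict() converts the whole row to a Python dict. Below is a minimal sketch of the ID-to-features dictionary described in the question; the Excel file name customer_ids.xlsx and its CONTRACT_NO column are assumptions, while the feature columns come from the question's code.

import pandas as pd

# Each element of collect() is a pyspark.sql.Row; index it by field name.
features_by_id = {}
for row in data.rdd.collect():
    key = str(row['CONTRACT_NO'])
    value = " ".join(str(row[c]) for c in
                     ['PRICE', 'TERM', 'FL_ACTIVE_FLAG', 'FL_CITY', 'CHURN'])
    features_by_id[key] = value

# Hypothetical Excel file from the business unit with a CONTRACT_NO column.
ids = pd.read_excel("customer_ids.xlsx")['CONTRACT_NO'].astype(str)
results = {cid: features_by_id.get(cid) for cid in ids}

For a large table, filter on the Spark side first, e.g. data.filter(data['CONTRACT_NO'].isin(list(ids))), so that only the matching rows are collected to the driver.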

