python - Pyspark 逐行循环
问题描述
findspark.init()
conf = SparkConf().setAppName("HyperparameterOptimization") \
.setMaster("local[*]") \
.set("spark.ui.port", "7077") \
.set("spark.executor.memory", "1g") \
.set("spark.driver.memory", "1g")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
h2oContext = H2OContext.getOrCreate()
numeric_demographic_cols=['CONTRACT_NO','PRICE']
non_numeric_demographic_cols=['TERM']
product_cols=['FL_ACTIVE_FLAG','FL_CITY']
train_cols = ['CHURN']
for target_column in train_cols:
resultsPath = "Results\\"+target_column+"\\"
os.makedirs(resultsPath,exist_ok=True)
target = target_column
try:
data = spark.read.csv("churn_data.csv",sep=";",
header=True)
except:
continue
for row in data.rdd.collect():
currencyList = []
currencyListt = []
for row in data.rdd.collect():
id = str(data['CONTRACT_NO'].values[row])
currencyList.append(id)
c = str(data['PRICE'].values[row]) + " " + str(
data['TERM'].values[row]) + " " + str(
data['FL_ACTIVE_FLAG'].values[row]) + " " + str(
data['FL_CITY'].values[row]) + " " + str(
data['CHURN'].values[row])
currencyListt.append(c)
de = {'col1': currencyList, 'col2': currencyListt}
deneme = pd.DataFrame(data=de)
dfcopy = deneme
result_df = dfcopy.drop_duplicates(subset=['col2'], keep='last')
d.write(str(result_df.shape))
d.write(str(deneme.shape))
d.close()
我有一个带有 spark 和 python 的架构。我使用了 pyspark 库。我写了我的连接信息。我有一个超参数优化系统。那里没有问题。现在我想做预测部分。业务部门会给我excel 1个或多个客户ID,我应该根据我的架构结果给他们结果。首先,我认为将excel保存到字典中。所以我创建了一本字典。我想尝试在数据集中逐行循环。并在我的字典中写下键 = 客户 ID 和值 = 其他功能。但我不能成功。当我检查数据的成分时,我看到了一个列表。[Row(PRICE='10', TERM='202102',.....] 我该如何处理?
解决方案
推荐阅读
- ios - 有没有办法让 NEHotspotConfigurationManager 同意加入网络一次?
- python - 如何在程序中返回 mpiexec -n 参数?
- flask - 如何在烧瓶的另一条路线中使用一条路线中的变量?
- javascript - 如何将两个孩子喂给 React 组件并使用值?
- r - 根据列值对应减去嵌套在data.frame中的行
- java - Java 2D数组,无法找到将双重计算存储为元素的方法
- python - 你如何改变python中可以添加到整数的内容?
- python - 这是尝试代理的正确方法吗?
- node.js - 节点js mongodb查询
- python-3.x - 我在 python 中创建了 2 种不同的 2D 列表方式,然后用相同的方式更新,但我得到不同的结果