How to update a pyspark dataframe using foreach

Problem description

I have a pyspark dataframe and I want to process each row according to some logic and update/delete/insert rows accordingly. I tried using "foreach" and "foreachPartition", but I cannot figure out how they would return the modified data so that the actual dataframe gets updated.

data = [
    {
        "city": "s",
        "latitude": "51",
        "longitude": "5",
        "region": "Europe",
        "date_range": "date_last_year",
    },
    {
        "city": "s",
        "latitude": "5",
        "longitude": "5.67",
        "region": "Europe",
        "date_range": "date_all_time",
    },
    {
        "city": "Aalborg",
        "latitude": "57.03",
        "longitude": "9.007",
        "region": "Europe",
        "date_range": "date_last_year",
    },
    {
        "city": "Aalborg",
        "latitude": "57.033",
        "longitude": "9.0007",
        "region": "Europe",
        "date_range": "date_last_year",
    },
    {
        "city": "Aalborg",
        "latitude": "57.0",
        "longitude": "9.97",
        "region": "Europe",
        "date_range": "date_last_year",
    },
    {
        "city": "Aarau",
        "latitude": "47.32",
        "longitude": "8.05",
        "region": "Europe",
        "date_range": "date_last_year",
    },
]

from pyspark import SparkContext
from pyspark.sql import SQLContext, functions as sf

sc = SparkContext()
sqlContext = SQLContext(sc)

df = sc.parallelize(data).toDF()

def myfunction(row):
    if float(row.latitude) > 50:
        print('do_something')
        # need to access "df" to do some operations

df.foreach(myfunction)
df.show()

# output
do_something
do_something
do_something
do_something
+-------+--------------+--------+---------+------+                              
|   city|    date_range|latitude|longitude|region|
+-------+--------------+--------+---------+------+
|      s|date_last_year|      51|        5|Europe|
|      s| date_all_time|       5|     5.67|Europe|
|Aalborg|date_last_year|   57.03|    9.007|Europe|
|Aalborg|date_last_year|  57.033|   9.0007|Europe|
|Aalborg|date_last_year|    57.0|     9.97|Europe|
|  Aarau|date_last_year|   47.32|     8.05|Europe|
+-------+--------------+--------+---------+------+

I would like to either pass "df" into the foreach function, or return values from the foreach call and aggregate them. How can I do that?

Tags: python, dataframe, pyspark

Solution
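"foreach" runs on the executors, where each task only receives a copy of its row; it cannot see or modify the driver-side "df" object, and it has no return value, so nothing flows back to the driver. Spark DataFrames are also immutable, so the usual way to "update/delete/insert" is to express the per-row logic as transformations and assign the result to a new DataFrame. Below is a minimal sketch of that idea, reusing the data list from the question; the 50-degree threshold comes from the question, while the column name high_latitude and the filter/union examples are made up purely for illustration.

from pyspark.sql import SparkSession, Row, functions as sf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data)

# "Update": derive a new DataFrame instead of mutating rows in place.
# high_latitude is a hypothetical marker column used only for illustration.
updated_df = df.withColumn(
    "high_latitude", sf.col("latitude").cast("double") > 50
)

# "Delete": remove rows by filtering them out (example condition only).
updated_df = updated_df.filter(sf.col("latitude").cast("double") > 10)

# "Insert": add rows by unioning another DataFrame with the same schema.
# extra = spark.createDataFrame([...], updated_df.schema)
# updated_df = updated_df.unionByName(extra)

updated_df.show()

# If the per-row logic really has to stay in plain Python, map over the
# underlying RDD and rebuild a DataFrame from the result instead of using foreach:
processed = df.rdd.map(
    lambda row: Row(**row.asDict(), high_latitude=float(row.latitude) > 50)
).toDF()
processed.show()

foreach and foreachPartition are intended for side effects, such as writing each row to an external system; they are not a mechanism for returning data to the driver or modifying an existing DataFrame.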
