python - Pyspark 将区间拆分为子区间
问题描述
我有一个包含 3 列“from”、“to”、“country”的数据框,例如:
from to country
1 105 abc
500 1000 def
我想通过将值拆分为大小= 10来创建数据框。所以我应该得到数据框
from to country
1 10 abc
11 20 abc
21 30 abc
31 40 abc
...
91 105 abc ( the left out values go in last bucket for that range)
500 510 def
等等...
解决方案
from pyspark.sql.functions import udf, col, explode, array, struct, length
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
#Creating the DataFrame
values = [(1,105,'abc'),(500,1000,'def')]
df = sqlContext.createDataFrame(values,['from','to','country'])
step_size=10
#Creating UDFs below
def make_list_from(start,end):
return [i for i in list(range(start, end, step_size)) if (end-i) >= (step_size-1)]
make_list_from_udf = udf(make_list_from,ArrayType(IntegerType()))
def make_list_to(start,end):
right_list=[i+step_size-1 for i in list(range(start, end, step_size)) if (end-i) >= (step_size-1)]
right_list[len(right_list)-1]=end
return right_list
make_list_to_udf = udf(make_list_to,ArrayType(IntegerType()))
#Creating Lists of sub-intervals
df = df.withColumn('my_list_from',make_list_from_udf(col('from'),col('to')))\
.withColumn('my_list_to',make_list_to_udf(col('from'),col('to')))\
.drop('from','to')
df.show()
+-------+--------------------+--------------------+
|country| my_list_from| my_list_to|
+-------+--------------------+--------------------+
| abc|[1, 11, 21, 31, 4...|[10, 20, 30, 40, ...|
| def|[500, 510, 520, 5...|[509, 519, 529, 5...|
+-------+--------------------+--------------------+
#Exploding the Lists
zip_ = udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("first", IntegerType()),
StructField("second", IntegerType())
]))
)
df = (df
.withColumn("tmp", zip_("my_list_from", "my_list_to"))
# UDF output cannot be directly passed to explode
.withColumn("tmp", explode("tmp"))
.select(col("tmp.first").alias("from"), col("tmp.second").alias("to"), "country"))
df.show(100)
+----+----+-------+
|from| to|country|
+----+----+-------+
| 1| 10| abc|
| 11| 20| abc|
| 21| 30| abc|
| 31| 40| abc|
| 41| 50| abc|
| 51| 60| abc|
| 61| 70| abc|
| 71| 80| abc|
| 81| 90| abc|
| 91| 105| abc|
| 500| 509| def|
| 510| 519| def|
| 520| 529| def|
.
.
.
| 960| 969| def|
| 970| 979| def|
| 980| 989| def|
| 990|1000| def|
+----+----+-------+
推荐阅读
- mysql - 我正在尝试连接 MySQl 和 Flask,但我不断收到错误消息。我正在为这个网络应用程序使用 XAMPP
- nivo-react - 控制 Nivo 条形图中标记线的宽度
- c++ - std::reference_wrapper 之间的最佳可行重载函数
和T - python-3.x - Pandas 使用 map 或 apply 使用字典从调整中创建新列
- google-slides - 在 Google 幻灯片中自动执行任务
- hdfs - 如何在最新的 Tensorflow 2.6.0 中访问 HDFS 文件系统?
- javascript - JS - 为什么我的“或”等式中的第二个值没有被识别
- r - 从 RMagick 列表中旋转特定图像
- java - 如何使用 Collectors.groupingBy 获得 HashMap 值
- javascript - Kotlin 多平台库依赖项中缺少 js.lower.serialization.ir.JsIrLinker$JsModuleDeserializer