scala - Processing each partition, and each row within each partition, one at a time
Problem description
Question:
I have the following two dataframes stored in an array. The data has already been partitioned by SECURITY_ID.
Dataframe 1 (DF1):
+-------------+----------+--------+---------+-----------+--------+
| ACC_SECURITY|ACCOUNT_NO|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|
+-------------+----------+--------+---------+-----------+--------+
|9161530335G71|  91615303|    1111|     1000|      35G71|  -20000|
|9161530435G71|  91615304|    2222|     2000|      35G71|   -2883|
|9161530235G71|  91615302|    3333|     3000|      35G71|    2000|
|9211530135G71|  92115301|    4444|     4000|      35G71|    8003|
+-------------+----------+--------+---------+-----------+--------+
Dataframe 2 (DF2):
+-------------+----------+--------+---------+-----------+--------+
| ACC_SECURITY|ACCOUNT_NO|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|
+-------------+----------+--------+---------+-----------+--------+
|3FA34789290X2|  3FA34789|    5555|     5000|      290X2|  -20000|
|32934789290X2|  32934789|    6666|     6000|      290X2|   -2883|
|00000019290X2|  00000019|    7777|     7000|      290X2|    2000|
|3S534789290X2|  3S534789|    8888|     8000|      290X2|    8003|
+-------------+----------+--------+---------+-----------+--------+
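For reference, here is a minimal sketch that rebuilds the two DataFrames above and the array they are stored in. The SparkSession setup, the column types (String everywhere except QUANTITY), and the names spark/df1/df2/dfArray are assumptions for illustration; the question does not show how the data is actually loaded or partitioned.

import org.apache.spark.sql.SparkSession

// Assumed local SparkSession; the original data source is not shown in the question.
val spark = SparkSession.builder().appName("per-security-demo").master("local[*]").getOrCreate()
import spark.implicits._

// DF1 and DF2, rebuilt from the rows shown above.
val df1 = Seq(
  ("9161530335G71", "91615303", "1111", "1000", "35G71", -20000),
  ("9161530435G71", "91615304", "2222", "2000", "35G71", -2883),
  ("9161530235G71", "91615302", "3333", "3000", "35G71", 2000),
  ("9211530135G71", "92115301", "4444", "4000", "35G71", 8003)
).toDF("ACC_SECURITY", "ACCOUNT_NO", "LONG_IND", "SHORT_IND", "SECURITY_ID", "QUANTITY")

val df2 = Seq(
  ("3FA34789290X2", "3FA34789", "5555", "5000", "290X2", -20000),
  ("32934789290X2", "32934789", "6666", "6000", "290X2", -2883),
  ("00000019290X2", "00000019", "7777", "7000", "290X2", 2000),
  ("3S534789290X2", "3S534789", "8888", "8000", "290X2", 8003)
).toDF("ACC_SECURITY", "ACCOUNT_NO", "LONG_IND", "SHORT_IND", "SECURITY_ID", "QUANTITY")

// The two DataFrames stored in an array, as described in the question.
val dfArray = Array(df1, df2)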
What I tried:
How can I process each dataframe separately, and within each dataframe handle one row at a time? I tried the following:
def methodA(d1: DataFrame): Unit = {
  // requires import spark.implicits._ for the $"..." column syntax
  val securityIds = d1.select("SECURITY_ID").distinct.collect.flatMap(_.toSeq)
  val bySecurityArray = securityIds.map(securityId => d1.where($"SECURITY_ID" <=> securityId))
  for (i <- 0 until bySecurityArray.length) {
    val allocProcessDF = bySecurityArray(i).toDF()
    print("Number of partitions: " + allocProcessDF.rdd.getNumPartitions)
    methodB(allocProcessDF)
  }
}

def methodB(df: DataFrame): Unit = {
  df.foreachPartition(ds => {
    // Tried both the while loop and the foreach below... same result.
    // Option 1
    while (ds.hasNext) {
      allocProcess(ds.next())
    }
    // Option 2
    ds.foreach(row => allocProcess(row))
  })
}
What I tried to do: use foreachPartition on each DataFrame from bySecurityArray, and then process each row of the resulting partition iterator with foreach (inside foreachPartition).
But I only see the first dataframe (SECURITY_ID = 35G71) being processed, never the second one (290X2).
Error received:
19/09/23 08:57:38 ERROR util.Utils: Exception encountered
java.io.StreamCorruptedException: invalid type code: 30
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1601)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1371)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:376)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/09/23 08:57:38 ERROR util.Utils: Exception encountered
19/09/23 08:57:38 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 218.0 (TID 10452, CANTSHARE_URL, executor 6): java.io.StreamCorruptedException: invalid type code: 30
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1601)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
at java.lang.Thread.run(Thread.java:748)
19/09/23 08:57:38 INFO scheduler.DAGScheduler: ShuffleMapStage 218 (run at ThreadPoolExecutor.java:1149) failed in 0.120 s due to Job aborted due to stage failure: Task 9 in stage 218.0 failed 4 times, most recent failure: Lost task 9.3 in stage 218.0 (TID 10466, CANTSHARE_URL, executor 6): java.io.StreamCorruptedException: invalid type code: 30
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:376)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Solution
Spark does not preserve ordering: the data is distributed across partitions, and even within a partition the order is not guaranteed, since multiple tasks may be involved. To get one logical order, coalesce(1) followed by a sort(cols: _*) operation can be applied to the DataFrame; this yields a new DataFrame/Dataset with a single partition whose rows are sorted by the specified columns, ascending by default.
def methodA(d1: DataFrame): Unit = {
  val securityIds = d1.select("SECURITY_ID").distinct.collect.flatMap(_.toSeq)
  val bySecurityArray = securityIds.map(securityId => d1.where(d1("SECURITY_ID") === securityId))
  for (i <- 0 until bySecurityArray.length) {
    val allocOneDF = bySecurityArray(i).toDF()
    print("Number of partitions: " + allocOneDF.rdd.getNumPartitions)
    methodB(allocOneDF)
  }
}

def methodB(df: DataFrame): Unit = {
  df.coalesce(1).sort("LONG_IND", "SHORT_IND").foreach(row => {
    println(row)
    //allocProcess(row)
  })
}