scala - 窗口规格/功能执行最佳方式或应首选任何替代方式
问题描述
我正在使用 spark-sql-2.4.1v。在我使用窗口规范/功能的用例中,使用该rank()
功能查找最新记录。我必须找到有关某些分区键的最新记录并按insertion_date
.
它非常慢。这个窗口规范rank()
可以在生产级代码中使用吗?或者有没有推荐的替代方法?专门提高性能。
请指教。
我目前正在使用以下代码:
Dataset<Row> data = sqlContext.read.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", hosts)
.options(Map( "table" -> "source_table", "keyspace" -> "calc")).load()
.where(col("error").equalTo(lit(200)))
.filter(col("insert_date").gt(lit("2015-01-01")))
.filter(col("insert_date").lt(lit("2016-01-01")))
.where(col("id").equalTo(lit(mId)))
解释计划
== Physical Plan ==
*(1) Project [cast(cast(unix_timestamp(insert_date#199, yyyy-MM-dd, Some(America/New_York)) as timestamp) as date) AS insert_date#500, 3301 AS id#399, create_date#201, company_id#202, ... 76 more fields]
+- *(1) Filter (((((((cast(error#263 as int) = 200) && (cast(insert_date#199 as string) >= 2018-01-01)) && (cast(insert_date#199 as string) <= 2018-01-01)) && isnotnull(id#200)) && isnotnull(insert_date#199)) && isnotnull(error#263)) && (id#200 = 3301))
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@261d7ee2 [... 76 more fields] PushedFilters: [IsNotNull(id), IsNotNull(insert_date), IsNotNull(error), EqualTo(id,3301)], ReadSchema: struct<...
阅读阅读
+-----------+------+
|partitionId| count|
+-----------+------+
| 1829| 29|
| 1959| 16684|
| 496| 3795|
| 2659| 524|
| 1591| 87|
| 2811| 2436|
| 2235| 620|
| 2563| 252|
| 1721| 12|
| 737| 1695|
| 858| 182|
| 2580| 73106|
| 3179| 694|
| 1460| 13|
| 1990| 66|
| 1522| 951|
| 540| 11|
| 1127|823084|
| 2999| 9|
| 623| 6629|
+-----------+------+
only showing top 20 rows
20/05/19 06:32:53 WARN ReaderCassandra: Processed started : 1589864993863 ended : 1589869973496 timetaken : 4979 s
val ws = Window.partitionBy("id").orderBy(desc("insert_date"),desc("update_date"));
Dataset<Row> ranked_data = data.withColumn("rank",rank().over(ws))
.where($"rank".===(lit(1)))
.select("*")
解决方案
推荐阅读
- mysql - MySQL - 多行或 JSON
- swift - 如何修复“函数声明了一个不透明的返回类型,但在其主体中没有返回语句可以从中推断出基础类型”错误?
- java - Eclipse 中的 Egit 错误:更新跟踪参考 refs/remotes/origin/5100_ 失败
: 失踪不详 - scala - 如何在火花数据框中查找具有所有值的重复列?
- django - 覆盖 Django 序列化程序中的字段
- arrays - Groovy 检查数组包含字符串与文字字符串和连接字符串的工作方式不同
- batch-file - 如何使用 bat 在命令提示符下执行 2 个命令目录行。文件
- python - sqlalchemy:防止关系对象自动添加到会话中
- elasticsearch - 如何在 elasticsearch 上运行自定义 lucene 编解码器?
- android - 如何使网络浏览器可以访问android应用程序