Why does Spark run into an endless loop for the following operations on a very simple DataFrame connected via Cassandra?

Problem description

I have a very simple Cassandra table endless_loop in the keyspace test. The table looks like this:

id
1
2
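
For reference, here is a minimal sketch (not from the original question) of how such a keyspace and table could be created through the connector, assuming id is a plain int primary key and reusing the SparkSession spark from the code below:

import com.datastax.spark.connector.cql.CassandraConnector

// Minimal setup sketch; the int primary-key schema is an assumption,
// since the question only shows the ids 1 and 2.
CassandraConnector(spark.sparkContext.getConf).withSessionDo { session =>
  session.execute(
    "CREATE KEYSPACE IF NOT EXISTS test WITH replication = " +
      "{'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS test.endless_loop (id int PRIMARY KEY)")
  session.execute("INSERT INTO test.endless_loop (id) VALUES (1)")
  session.execute("INSERT INTO test.endless_loop (id) VALUES (2)")
}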

Why do I get an endless loop for the following code, which chains sort, filter, and union operations?

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._

object sparkUnderstandEndlessLoops {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Endless Loop")
      .config("spark.master", "local")
      .config("spark.cassandra.connection.host", "localhost")
      .config("spark.cassandra.connection.port", "9042")
      .getOrCreate()
    val sc = spark.sparkContext

    // Load table "endless_loop" from keyspace "test"
    val df = spark.read.cassandraFormat("endless_loop", "test").load()
    val df1 = df.sort("id")          // sort by id
    val df2 = df.filter(_ => false)  // predicate that drops every row
    val df3 = df1.union(df2)

    df3.show() // this action never completes
  }
}
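
For comparison (my addition, not part of the question), the same operator chain on an in-memory DataFrame completes normally on a plain local Spark 2.4.x session, which suggests the behavior is tied to the Cassandra source relation rather than to sort, filter, and union themselves:

import org.apache.spark.sql.SparkSession

object sparkUnionWithoutCassandra {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("No Cassandra")
      .config("spark.master", "local")
      .getOrCreate()
    import spark.implicits._

    // Same chain as above, but on a local two-row DataFrame
    val df = Seq(1, 2).toDF("id")
    val df1 = df.sort("id")
    val df2 = df.filter(_ => false)
    val df3 = df1.union(df2)

    df3.show() // prints the two rows and terminates
  }
}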

I am using the following sbt file:

scalaVersion := "2.12.11"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += ("org.apache.cassandra" % "cassandra-all" % "3.11.6")
  .exclude("net.jpountz.lz4", "lz4")
  .exclude("ch.qos.logback", "logback-classic")
  .exclude("ch.qos.logback", "logback-core")
  .exclude("org.slf4j", "log4j-over-slf4j")
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.3"

Edit:

The same error still occurs when the last lines are changed to:

val df = spark.read.cassandraFormat("endless_loop", "test").load()
val df1 = df.sort("id")
val df2 = df1.union(df)

df2.show()
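
As a diagnostic sketch (my addition, under the assumption that the optimizer loop involves the Cassandra source relation), the read can be detached from that relation by rebuilding the DataFrame from its RDD and schema, which replaces the Cassandra scan node in the logical plan with a plain local-RDD node:

// Diagnostic only: materialize the Cassandra read into a DataFrame whose
// logical plan no longer references the Cassandra source relation.
val df = spark.read.cassandraFormat("endless_loop", "test").load()
val detached = spark.createDataFrame(df.rdd, df.schema)

val df1 = detached.sort("id")
val df2 = df1.union(detached)

df2.show()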

Tags: dataframe, apache-spark, cassandra

Solution

