DataSourceReader and SupportsPushDownFilters

Problem description

In the custom data source reader I am implementing, I noticed that I never receive push-down filters for data types such as ShortType, ByteType, and BooleanType. I do receive filters for these types: IntegerType, LongType, FloatType, DoubleType, DateType, and TimestampType.

I receive the filters through my override of pushFilters, which is part of the SupportsPushDownFilters trait.
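For context, the shape of that contract can be sketched as follows. Since Spark is not assumed to be on the classpath here, `Filter`, `LessThan`, and `EqualTo` are minimal stand-ins for the classes in `org.apache.spark.sql.sources`, and `SketchReader` is a hypothetical reader; the method signatures mirror the Spark 2.3 `SupportsPushDownFilters` interface (`pushFilters` receives the candidate filters and returns the ones the source cannot evaluate, `pushedFilters` reports what was accepted).

```scala
// Minimal stand-ins for org.apache.spark.sql.sources.Filter and friends;
// in a real reader these come from Spark itself.
sealed trait Filter
case class LessThan(attribute: String, value: Any) extends Filter
case class EqualTo(attribute: String, value: Any) extends Filter

// Hypothetical reader sketching the SupportsPushDownFilters contract.
class SketchReader {
  private var pushed: Array[Filter] = Array.empty

  // Keep the filters this source can evaluate; return the rest so
  // Spark re-applies them itself after the scan.
  def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case LessThan(_, _) => true   // pretend this source handles < only
      case _              => false
    }
    pushed = supported
    unsupported
  }

  // Spark calls this to learn which filters were actually pushed.
  def pushedFilters(): Array[Filter] = pushed
}

val r = new SketchReader
val residual = r.pushFilters(Array(LessThan("col2", 500), EqualTo("col1", 3)))
println(r.pushedFilters().toList)  // List(LessThan(col2,500))
println(residual.toList)           // List(EqualTo(col1,3))
```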

Here is a session demonstrating the behavior. Note that I placed

filters.foreach { println }

at the top of my pushFilters override.

Spark context Web UI available at http://192.168.20.114:4040
Spark context available as 'sc' (master = local[4], app id = local-1533638354996).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.read.format("myds").
  option("port", 5000).
  option("host", "localhost").
  option("numPartitions", "1").
  option("function","spartanQuery").load()
df: org.apache.spark.sql.DataFrame = [col1: smallint, col2: bigint ... 1 more field]

scala> df.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(col1,ShortType,false), 
StructField(col2,LongType,false), StructField(col3,DoubleType,false))

scala> df.createOrReplaceTempView("dft")

scala> spark.sql("select * from dft").show
+----+----+-------------------+
|col1|col2|               col3|
+----+----+-------------------+
|   0|   0|0.36884380085393786|
|   1| 100| 0.5903338296338916|
|   2| 200|0.19292618241161108|
|   3| 300|0.27678317041136324|
|   4| 400|0.35814784304238856|
|   5| 500|0.19823945895768702|
|   6| 600| 0.4533605803735554|
|   7| 700|0.15147616644389927|
|   8| 800|0.09802114544436336|
|   9| 900|0.31370880361646414|
+----+----+-------------------+


scala> spark.sql("select * from dft where col1<5 and col2<500 and col3<.5").show
LessThan(col2,500)
LessThan(col3,0.5)
+----+----+-------------------+
|col1|col2|               col3|
+----+----+-------------------+
|   0|   0|0.36884380085393786|
|   2| 200|0.19292618241161108|
|   3| 300|0.27678317041136324|
|   4| 400|0.35814784304238856|
+----+----+-------------------+

Notice that there is no filter for col1 (ShortType). Is this a shortcoming in Spark, or am I doing something wrong?
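One mechanism consistent with this behavior: `5` is an Int literal, so the analyzer widens the ShortType column and rewrites `col1 < 5` as `cast(col1 as int) < 5`, and Spark's filter translation only turns a comparison between a bare column and a literal into a source `Filter`; a column wrapped in a `Cast` is left untranslated. For the LongType column the Int literal is widened instead (`cast(500 as bigint)` constant-folds to `500L`), so the column stays bare and the filter translates. The toy model below imitates that rule with stand-in expression classes (these are not Spark's Catalyst classes, and the rule is an assumption about Spark 2.3's `DataSourceStrategy.translateFilter`, not a verbatim copy of it):

```scala
// Toy expression tree imitating Catalyst shapes (stand-ins, not Spark classes).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Any) extends Expr
case class Cast(child: Expr, to: String) extends Expr
case class Lt(left: Expr, right: Expr) extends Expr

sealed trait Filter
case class LessThan(attribute: String, value: Any) extends Filter

// Only `bare attribute < literal` becomes a source Filter;
// anything wrapped in a Cast is skipped.
def translate(e: Expr): Option[Filter] = e match {
  case Lt(Attr(a), Lit(v)) => Some(LessThan(a, v))
  case _                   => None
}

// col1 is ShortType, so `col1 < 5` analyzes with a cast on the column:
val onShort = Lt(Cast(Attr("col1"), "int"), Lit(5))
// col2 is LongType, so the Int literal is widened and folded instead:
val onLong = Lt(Attr("col2"), Lit(500L))

println(translate(onShort))  // None -> nothing pushed for the ShortType column
println(translate(onLong))   // Some(LessThan(col2,500))
```

Under this model, comparing `col1` against a literal that is itself a Short (e.g. `col1 < 5S` via `lit(5.toShort)`) would avoid the cast on the column and allow the push-down.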

Tags: apache-spark
