Count operation does not work on an aggregated IgniteDataFrame

Problem description

I am working with Apache Spark and Apache Ignite. I have a Spark dataset that I write to Ignite using the following code:

dataset.write()
                .mode(SaveMode.Overwrite)
                .format(FORMAT_IGNITE())
                .option(OPTION_CONFIG_FILE(), "ignite-server-config.xml")
                .option(OPTION_TABLE(), "CUSTOM_VALUES")
                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "ID")
                .save();

I then read it back in order to perform a group-by operation, which will be pushed down to Ignite.

  Dataset igniteDataset = sparkSession.read()
                .format(FORMAT_IGNITE())
                .option(OPTION_CONFIG_FILE(), "ignite-server-config.xml")
                .option(OPTION_TABLE(), "CUSTOM_VALUES")
                .load();


        RelationalGroupedDataset idGroupedData = igniteDataset.groupBy(customized_id);

        Dataset<Row> result = idGroupedData.agg(count(id).as("count_id"),
                count(fid).as("count_custom_field_id"),
                count(type).as("count_customized_type"),
                count(val).as("count_value"), count(customized_id).as("groupCount"));

Now I want to get the number of rows returned by the group-by operation, so I call count() on the dataset: result.count();

When I do this, I get the following exception.

Caused by: org.h2.jdbc.JdbcSQLException: Syntax error in SQL statement "SELECT COUNT(1) AS COUNT FROM (SELECT  FROM CUSTOM_VALUES GROUP[*] BY CUSTOMIZED_ID) TABLE1 "; expected "., (, USE, AS, RIGHT, LEFT, FULL, INNER, JOIN, CROSS, NATURAL, ,, SELECT"; SQL statement:
SELECT COUNT(1) AS count FROM (SELECT  FROM CUSTOM_VALUES GROUP BY CUSTOMIZED_ID) table1 [42001-197]
    at org.h2.message.DbException.getJdbcSQLException(DbException.java:357)
    at org.h2.message.DbException.getSyntaxError(DbException.java:217)

Other functions, such as show() and collectAsList().size(), work.

What am I missing here?
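The statement in the exception has an empty select list in the inner query (`SELECT  FROM CUSTOM_VALUES GROUP BY ...`), which is why H2 rejects it. One plausible reading, not confirmed by the source, is that count() needs no columns from the aggregated result, so nothing is left to project when the query is rendered, whereas show() and collectAsList() do request columns. The sketch below only reproduces the shape of the failing string in plain Java; `renderGroupBy`, `renderGroupBySafe` and `wrapInCount` are hypothetical helpers for illustration and are not Ignite or Spark APIs.

```java
import java.util.Collections;
import java.util.List;

public class EmptyProjectionSketch {
    // Hypothetical helper: renders an inner GROUP BY query from the selected columns.
    static String renderGroupBy(List<String> selected, String table, String groupCol) {
        return "SELECT " + String.join(", ", selected)
                + " FROM " + table + " GROUP BY " + groupCol;
    }

    // Hypothetical guard: select a constant when no column is requested,
    // so the rendered statement stays syntactically well-formed.
    static String renderGroupBySafe(List<String> selected, String table, String groupCol) {
        List<String> cols = selected.isEmpty() ? Collections.singletonList("1") : selected;
        return renderGroupBy(cols, table, groupCol);
    }

    // Wraps an inner query the same way the statement in the exception is wrapped.
    static String wrapInCount(String inner) {
        return "SELECT COUNT(1) AS count FROM (" + inner + ") table1";
    }

    public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        // Empty projection: same text as the statement H2 rejected.
        System.out.println(wrapInCount(renderGroupBy(none, "CUSTOM_VALUES", "CUSTOMIZED_ID")));
        // Guarded projection: the inner query selects the constant 1 instead.
        System.out.println(wrapInCount(renderGroupBySafe(none, "CUSTOM_VALUES", "CUSTOMIZED_ID")));
    }
}
```

The first printed line matches the SQL text in the stack trace; the second shows what a well-formed inner query could look like under the assumption above.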

Tags: apache-spark, dataframe, apache-spark-sql, ignite

Solution


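I tested your example against GridGain Community Edition 8.7.5, the latest community release. It is GridGain's open-source version, based on the Ignite 2.7.0 sources plus a subset of additional fixes (https://www.gridgain.com/resources/download).

Here is the code: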

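import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.count;

public class Main {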
    public static void main(String[] args) {
        if (args.length < 1)
            throw new IllegalArgumentException("You should set the path to client configuration file.");

        String configPath = args[0];

        SparkSession session = SparkSession.builder()
                .enableHiveSupport()
                .getOrCreate();

        Dataset<Row> igniteDataset = session.read()
                .format(IgniteDataFrameSettings.FORMAT_IGNITE())                  //Data source
                .option(IgniteDataFrameSettings.OPTION_TABLE(), "Person")         //Table to read.
                .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), configPath) //Ignite config.
                .load();

        RelationalGroupedDataset idGroupedData = igniteDataset.groupBy("CITY_ID");

        Dataset<Row> result = idGroupedData.agg(count("id").as("count_id"),
                count("city_id").as("count_city_id"),
                count("name").as("count_name"),
                count("age").as("count_age"),
                count("company").as("count_company"));

        result.show();

        session.close();
    }
} 

Here are the Maven dependencies:

<dependencies>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>gridgain-core</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-core</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-spring</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-indexing</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-spark</artifactId>
        <version>8.7.5</version>
    </dependency>
</dependencies>

Here is the cache configuration:

    <property name="cacheConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="Person"/>
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="atomicityMode" value="ATOMIC"/>
                <property name="sqlSchema" value="PUBLIC"/>

                <property name="queryEntities">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryEntity">
                            <property name="keyType" value="PersonKey"/>
                            <property name="valueType" value="PersonValue"/>
                            <property name="tableName" value="Person"/>

                            <property name="keyFields">
                                <list>
                                    <value>id</value>
                                    <value>city_id</value>
                                </list>
                            </property>

                            <property name="fields">
                                <map>
                                    <entry key="id" value="java.lang.Integer"/>
                                    <entry key="city_id" value="java.lang.Integer"/>
                                    <entry key="name" value="java.lang.String"/>
                                    <entry key="age" value="java.lang.Integer"/>
                                    <entry key="company" value="java.lang.String"/>
                                </map>
                            </property>

                            <property name="aliases">
                                <map>
                                    <entry key="id" value="id"/>
                                    <entry key="city_id" value="city_id"/>
                                    <entry key="name" value="name"/>
                                    <entry key="age" value="age"/>
                                    <entry key="company" value="company"/>
                                </map>
                            </property>
                        </bean>
                    </list>
                </property>
            </bean>
        </list>
    </property>

Using Spark 2.3.0 (the only version supported by the ignite-spark dependency), I got the following result on my test data:

Data:

ID,CITY_ID,NAME,AGE,COMPANY,
4,1,Justin Bronte,23,bank,
3,1,Helen Richard,49,bank,

Result:

+-------+--------+-------------+----------+---------+-------------+
|CITY_ID|count_id|count_city_id|count_name|count_age|count_company|
+-------+--------+-------------+----------+---------+-------------+
|      1|       2|            2|         2|        2|            2|
+-------+--------+-------------+----------+---------+-------------+

This code can also be applied as-is to Ignite 2.7.0.
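As a cross-check of the result table, the same aggregation can be reproduced over the two sample rows in plain Java, with no Spark or Ignite involved. This is only an illustrative sketch: the `Person` class and the `countByCity` and `sampleRows` helpers are hypothetical stand-ins, not part of the tested project. The size of the resulting map is the number of groups, which is the value the question expects from result.count().

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupCountSketch {
    // Hypothetical stand-in for one row of the Person table (only the fields used here).
    static final class Person {
        final int id;
        final int cityId;
        final String name;

        Person(int id, int cityId, String name) {
            this.id = id;
            this.cityId = cityId;
            this.name = name;
        }
    }

    // Counts rows per CITY_ID. With no null values in the sample data this
    // equals what count("id") grouped by CITY_ID yields.
    static Map<Integer, Long> countByCity(List<Person> rows) {
        return rows.stream()
                .collect(Collectors.groupingBy(p -> p.cityId, TreeMap::new, Collectors.counting()));
    }

    // The two rows listed under "Data" above.
    static List<Person> sampleRows() {
        return Arrays.asList(
                new Person(4, 1, "Justin Bronte"),
                new Person(3, 1, "Helen Richard"));
    }

    public static void main(String[] args) {
        Map<Integer, Long> counts = countByCity(sampleRows());
        System.out.println(counts);        // prints {1=2}
        System.out.println(counts.size()); // prints 1 (number of groups)
    }
}
```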

