apache-spark - count() does not work on an aggregated IgniteDataFrame
Question
I am using Apache Spark together with Apache Ignite. I have a Spark dataset that I write to Ignite with the following code:
dataset.write()
    .mode(SaveMode.Overwrite)
    .format(FORMAT_IGNITE())
    .option(OPTION_CONFIG_FILE(), "ignite-server-config.xml")
    .option(OPTION_TABLE(), "CUSTOM_VALUES")
    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "ID")
    .save();
I then read it back to run a group-by operation, which is pushed down to Ignite:
Dataset<Row> igniteDataset = sparkSession.read()
    .format(FORMAT_IGNITE())
    .option(OPTION_CONFIG_FILE(), "ignite-server-config.xml")
    .option(OPTION_TABLE(), "CUSTOM_VALUES")
    .load();
RelationalGroupedDataset idGroupedData = igniteDataset.groupBy(customized_id);
Dataset<Row> result = idGroupedData.agg(count(id).as("count_id"),
    count(fid).as("count_custom_field_id"),
    count(type).as("count_customized_type"),
    count(val).as("count_value"), count(customized_id).as("groupCount"));
Now I want the number of rows returned by the groupBy operation, so I call count() on the dataset: result.count();
When I do this, I get the following exception:
Caused by: org.h2.jdbc.JdbcSQLException: Syntax error in SQL statement "SELECT COUNT(1) AS COUNT FROM (SELECT FROM CUSTOM_VALUES GROUP[*] BY CUSTOMIZED_ID) TABLE1 "; expected "., (, USE, AS, RIGHT, LEFT, FULL, INNER, JOIN, CROSS, NATURAL, ,, SELECT"; SQL statement:
SELECT COUNT(1) AS count FROM (SELECT FROM CUSTOM_VALUES GROUP BY CUSTOMIZED_ID) table1 [42001-197]
at org.h2.message.DbException.getJdbcSQLException(DbException.java:357)
at org.h2.message.DbException.getSyntaxError(DbException.java:217)
Other operations, such as show() and collectAsList().size(), work.
What am I missing here?
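Reading the statement in the exception, the inner SELECT has an empty column list. A row count needs no column values, so Spark typically asks the underlying relation for zero columns when count() is called, whereas show() and collectAsList() still need every aggregate column. The sketch below is plain Java, not Ignite or Spark code; the class EmptySelectSketch and its methods are hypothetical names used only for illustration. It assumes the pushed-down SQL is rendered by joining whichever columns are still required, and shows how an empty list would then yield exactly the statement that H2 rejects.

```java
import java.util.List;

public class EmptySelectSketch {
    // Hypothetical helper (an assumption, not Ignite's implementation):
    // renders the inner query from the columns the planner still requires.
    public static String renderInner(List<String> columns, String table, String groupBy) {
        StringBuilder sql = new StringBuilder("SELECT");
        if (!columns.isEmpty()) {
            sql.append(' ').append(String.join(", ", columns));
        }
        sql.append(" FROM ").append(table).append(" GROUP BY ").append(groupBy);
        return sql.toString();
    }

    // Wraps the inner query the same way the statement in the exception is wrapped.
    public static String wrapInCount(String inner) {
        return "SELECT COUNT(1) AS count FROM (" + inner + ") table1";
    }

    public static void main(String[] args) {
        // show(): the grouping column and the aggregates are still required.
        System.out.println(renderInner(
            List.of("CUSTOMIZED_ID", "COUNT(ID) AS count_id"),
            "CUSTOM_VALUES", "CUSTOMIZED_ID"));

        // count(): no column is required, so the select list ends up empty.
        System.out.println(wrapInCount(
            renderInner(List.of(), "CUSTOM_VALUES", "CUSTOMIZED_ID")));
    }
}
```

If this reading is right, it would also explain why collectAsList().size() returns the row count without error: that call keeps all aggregate columns in the select list.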
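Answer
I tested your example against GridGain Community Edition 8.7.5, the latest community release. It is the open-source version of GridGain, based on the Ignite 2.7.0 sources plus a subset of additional fixes (https://www.gridgain.com/resources/download).
Here is the code: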
import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.count;

public class Main {
    public static void main(String[] args) {
        if (args.length < 1)
            throw new IllegalArgumentException("You should set the path to client configuration file.");

        String configPath = args[0];

        SparkSession session = SparkSession.builder()
            .enableHiveSupport()
            .getOrCreate();

        Dataset<Row> igniteDataset = session.read()
            .format(IgniteDataFrameSettings.FORMAT_IGNITE()) // Data source.
            .option(IgniteDataFrameSettings.OPTION_TABLE(), "Person") // Table to read.
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), configPath) // Ignite config.
            .load();

        // Group by city and count the non-null values of each column.
        RelationalGroupedDataset idGroupedData = igniteDataset.groupBy("CITY_ID");
        Dataset<Row> result = idGroupedData.agg(count("id").as("count_id"),
            count("city_id").as("count_city_id"),
            count("name").as("count_name"),
            count("age").as("count_age"),
            count("company").as("count_company"));

        result.show();

        session.close();
    }
}
Here are the Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>gridgain-core</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-core</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-spring</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-indexing</artifactId>
        <version>8.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.gridgain</groupId>
        <artifactId>ignite-spark</artifactId>
        <version>8.7.5</version>
    </dependency>
</dependencies>
Here is the cache configuration:
<property name="cacheConfiguration">
    <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <property name="name" value="Person"/>
            <property name="cacheMode" value="PARTITIONED"/>
            <property name="atomicityMode" value="ATOMIC"/>
            <property name="sqlSchema" value="PUBLIC"/>
            <property name="queryEntities">
                <list>
                    <bean class="org.apache.ignite.cache.QueryEntity">
                        <property name="keyType" value="PersonKey"/>
                        <property name="valueType" value="PersonValue"/>
                        <property name="tableName" value="Person"/>
                        <property name="keyFields">
                            <list>
                                <value>id</value>
                                <value>city_id</value>
                            </list>
                        </property>
                        <property name="fields">
                            <map>
                                <entry key="id" value="java.lang.Integer"/>
                                <entry key="city_id" value="java.lang.Integer"/>
                                <entry key="name" value="java.lang.String"/>
                                <entry key="age" value="java.lang.Integer"/>
                                <entry key="company" value="java.lang.String"/>
                            </map>
                        </property>
                        <property name="aliases">
                            <map>
                                <entry key="id" value="id"/>
                                <entry key="city_id" value="city_id"/>
                                <entry key="name" value="name"/>
                                <entry key="age" value="age"/>
                                <entry key="company" value="company"/>
                            </map>
                        </property>
                    </bean>
                </list>
            </property>
        </bean>
    </list>
</property>
With Spark 2.3.0, the only Spark version that the ignite-spark dependency supports, I get the following result on my test data:
Data:
ID,CITY_ID,NAME,AGE,COMPANY,
4,1,Justin Bronte,23,bank,
3,1,Helen Richard,49,bank,
Result:
+-------+--------+-------------+----------+---------+-------------+
|CITY_ID|count_id|count_city_id|count_name|count_age|count_company|
+-------+--------+-------------+----------+---------+-------------+
|      1|       2|            2|         2|        2|            2|
+-------+--------+-------------+----------+---------+-------------+
This code also applies as-is to Ignite 2.7.0.