ignite - Apache Ignite + Spark Dataframes:客户端与服务器的疑问
问题描述
我一直在尝试整合 ignite 和 spark 。我的应用程序的目标是向/从 ignite 写入和读取 spark 数据帧。但是,我在使用较大的数据集(> 200 000 000 行)时遇到了几个问题。
我有一个在 YARN 上运行的 6 节点 Ignite 集群。它有 160Gb 的内存和 12 个内核。我正在尝试在 Ignite 缓存(分区 1 备份)中使用 spark(大约 20Gb 的原始文本数据)保存数据帧:
def main(args: Array[String]) {
val ignite = setupIgnite
closeAfter(ignite) { _ ⇒
implicit val spark: SparkSession = SparkSession.builder
.appName("Ignite Benchmark")
.getOrCreate()
val customer = readDF("csv", "|", Schemas.customerSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/customer")
val part = readDF("csv", "|", Schemas.partSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/part")
val supplier = readDF("csv", "|", Schemas.supplierSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/supplier")
val dateDim = readDF("csv", "|", Schemas.dateDimSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/date_dim")
val lineorder = readDF("csv", "|", Schemas.lineorderSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/lineorder")
writeDF(customer, "customer", List("custkey"), TEMPLATES.REPLICATED)
writeDF(part, "part", List("partkey"), TEMPLATES.REPLICATED)
writeDF(supplier, "supplier", List("suppkey"), TEMPLATES.REPLICATED)
writeDF(dateDim, "date_dim", List("datekey"), TEMPLATES.REPLICATED)
writeDF(lineorder.limit(200000000), "lineorder", List("orderkey, linenumber"), TEMPLATES.NO_BACKUP)
}
}
在某些时候,spark 应用程序检索到此错误:
class org.apache.ignite.internal.mem.IgniteOutOfMemoryException: Out of memory in data region [name=default, initSize=256.0 MiB, maxSize=12.6 GiB, persistenceEnabled=false] Try the following:
^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)
^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)
^-- Enable eviction or expiration policies
at org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl.allocatePage(PageMemoryNoStoreImpl.java:304)
at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.allocateDataPage(AbstractFreeList.java:463)
at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.insertDataRow(AbstractFreeList.java:501)
at org.apache.ignite.internal.processors.cache.persistence.RowStore.addRow(RowStore.java:97)
at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.createRow(IgniteCacheOffheapManagerImpl.java:1302)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4426)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4371)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.invokeClosure(BPlusTree.java:3083)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.access$6200(BPlusTree.java:2977)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1726)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invoke(BPlusTree.java:1610)
at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke(IgniteCacheOffheapManagerImpl.java:1249)
at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.invoke(IgniteCacheOffheapManagerImpl.java:352)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.storeValue(GridCacheMapEntry.java:3602)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.initialValue(GridCacheMapEntry.java:2774)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$IsolatedUpdater.receive(DataStreamerImpl.java:2125)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:140)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:400)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:305)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1556)
at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1184)
at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:125)
at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1091)
at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:511)
at java.lang.Thread.run(Thread.java:748)
我认为问题在于在 spark 会话之前启动了 ignite 服务器,就像在官方 ignite 示例中一样。该服务器开始缓存我正在写入 ignite 缓存的数据,并超过其默认区域大小最大值(12Gb,与我为 yarn 集群定义的 20GB 不同)。但是,我不明白示例和文档如何告诉我们在火花上下文(以及我假设的会话)之前创建一个点燃服务器。我知道,如果没有这个,一旦所有 spark 作业终止,应用程序就会挂起,但我不明白在 spark 应用程序上拥有一个服务器来开始缓存数据的逻辑。我对这个概念很困惑,现在我已经在 spark 中设置了这个 ignite 实例作为客户端。
这是一个奇怪的行为,因为我所有的 ignite 节点(在 YARN 上运行)都为默认区域定义了 20GB(我对其进行了更改并验证了它)。这表明错误必须来自在 Spark 上启动的 ignite 服务器(我认为它是在驱动程序上一个,每个工作人员一个),因为我没有更改 spark 应用程序的 ignite-config.xml 中的默认区域大小(默认为 12GB,如错误所示)。然而,这有意义吗?Spark 是否应该抛出这个错误作为它从/到 ignite 读取和写入数据的唯一目标?Spark 是否参与缓存任何数据,这是否意味着我应该在我的应用程序的 ignite-config.xml 中设置客户端模式,尽管官方示例没有使用客户端模式?
最好的问候,卡洛斯
解决方案
首先,Spark-Ignite 连接器已经以客户端模式连接。
我将假设您有足够的内存,但您可以按照容量规划指南中的示例来确定。
但是,我认为问题在于您对示例应用程序的关注过于紧密(!)。该示例——为了自成一体——包括服务器和 Spark 客户端。如果您已经拥有 Ignite 集群,则无需在 Spark 客户端中启动服务器。
这是一个来自真实应用程序的稍微简化的示例(在 Java 中,抱歉):
try (SparkSession spark = SparkSession
.builder()
.appName("AppName")
.master(sparkMaster)
.config("spark.executor.extraClassPath", igniteClassPath())
.getOrCreate()) {
// Get source DataFrame
DataSet<Row> results = ....
results.write()
.outputMode("append")
.format(IgniteDataFrameSettings.FORMAT_IGNITE())
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), igniteCfgFile)
.option(IgniteDataFrameSettings.OPTION_TABLE(), "Results")
.option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE(), true)
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "name")
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "backups=1")
.write();
}
我没有测试,但你应该明白了:你需要提供一个 Ignite 配置文件的 URL;它创建客户端以在幕后连接到该服务器。
推荐阅读
- scala - `java.lang.NoSuchMethodError:cats.FlatMap.map2`在运行时使用`.sequence`
- php - 更改语言时停留在当前页面
- django - 具有多字段的 Django 内联表单集
- node.js - npm:我将命令“npm ls -g -depth=0”输入错误为“npm ls -g -d=0”并且它已被执行
- pandas - 计算熊猫中的按行百分比
- python - Facebook/messenger 存档包含我无法解析的表情符号
- c - 为什么GCC初始化为0时不给静态变量赋值
- keras - keras.layers.concatenate 的输出形状
- c# - System.IO.FileNotFoundException 在 .net 框架项目中引用 .net 标准库时出错
- python - 如何通过打开文件来强制或更改我的终端以在 python 3 中运行 python 可执行文件?