OutOfMemoryError when bulk-indexing large data into Elasticsearch

Problem description

I'm trying to index a large amount of data into Elasticsearch. The data comes from a CSV file of about 41 GB, containing roughly 100 million rows. I'm using the Elasticsearch Python client for this task. The code looks more or less like this:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
import logging
import tqdm

logger = logging.getLogger(__name__)

es = Elasticsearch(
    hosts=[es_host],
    http_auth=('username', 'password'),
    timeout=120,
    max_retries=10,
    retry_on_timeout=True
)

progress = tqdm.tqdm(unit='docs')
successes = 0

logger.info(f'Indexing {file_path}')
for ok, action in streaming_bulk(
        client=es,
        max_chunk_bytes=15 * 1024 * 1024,
        actions=bulk_data_generator(index_name=index, file_path=file_path)
):
    progress.update(1)
    successes += ok
logger.info('Success!')

bulk_data_generator is a generator that reads the CSV line by line and yields request bodies for the Elasticsearch bulk API.
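For reference, a minimal sketch of such a generator, assuming the CSV has a header row and plain index actions (the real generator also produces scripted update actions, as the UpdateHelper frames in the stack trace below show):

import csv

def bulk_data_generator(index_name, file_path):
    # Stream the CSV row by row so the client never holds
    # the whole 41 GB file in memory.
    with open(file_path, newline='') as csv_file:
        for row in csv.DictReader(csv_file):
            # Plain index action; the bulk helpers serialize
            # '_source' as the document body.
            yield {
                '_op_type': 'index',
                '_index': index_name,
                '_source': dict(row),
            }

streaming_bulk then groups these actions into requests of at most chunk_size actions (500 by default) or max_chunk_bytes bytes, whichever limit is hit first, and sends them one request at a time.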

The code works fine for smaller CSV files (around 120 MB, roughly 100,000 rows). But with the large file I get an OutOfMemoryError. The Elasticsearch log contains some information about the garbage collector, like this:

{"type": "server", "timestamp": "2021-10-13T15:02:56,234Z", "level": "INFO", "component": "o.e.i.b.HierarchyCircuitBreakerService", "cluster.name": "docker-cluster", "node.name": "7476779d6cca", "message": "attempting to trigger G1GC due to high heap usage [8391218448]",
"cluster.uuid": "xMozIZtHRCS86sXTrngOpA", "node.id": "uGpM5oqjSgKEwVHnw6mH0A"  }
{"type": "server", "timestamp": "2021-10-13T15:02:56,405Z", "level": "INFO", "component": "o.e.i.b.HierarchyCircuitBreakerService", "cluster.name": "docker-cluster", "node.name": "7476779d6cca", "message": "GC did not bring memory usage down, before [8391218448], after [8
401704208], allocations [42], duration [171]", "cluster.uuid": "xMozIZtHRCS86sXTrngOpA", "node.id": "uGpM5oqjSgKEwVHnw6mH0A"  }
{"type": "server", "timestamp": "2021-10-13T15:02:58,158Z", "level": "INFO", "component": "o.e.m.j.JvmGcMonitorService", "cluster.name": "docker-cluster", "node.name": "7476779d6cca", "message": "[gc][16553] overhead, spent [265ms] collecting in the last [1s]", "cluster.u
uid": "xMozIZtHRCS86sXTrngOpA", "node.id": "uGpM5oqjSgKEwVHnw6mH0A"  }
{"type": "server", "timestamp": "2021-10-13T15:03:03,161Z", "level": "INFO", "component": "o.e.m.j.JvmGcMonitorService", "cluster.name": "docker-cluster", "node.name": "7476779d6cca", "message": "[gc][16558] overhead, spent [291ms] collecting in the last [1s]", "cluster.u
uid": "xMozIZtHRCS86sXTrngOpA", "node.id": "uGpM5oqjSgKEwVHnw6mH0A"  }
{"type": "server", "timestamp": "2021-10-13T15:03:04,250Z", "level": "INFO", "component": "o.e.m.j.JvmGcMonitorService", "cluster.name": "docker-cluster", "node.name": "7476779d6cca", "message": "[gc][16559] overhead, spent [346ms] collecting in the last [1s]", "cluster.u
uid": "xMozIZtHRCS86sXTrngOpA", "node.id": "uGpM5oqjSgKEwVHnw6mH0A"  }
{"type": "server", "timestamp": "2021-10-13T15:03:11,420Z", "level": "INFO", "component": "o.e.m.j.JvmGcMonitorService", "cluster.name": "docker-cluster", "node.name": "7476779d6cca", "message": "[gc][16566] overhead, spent [325ms] collecting in the last [1s]", "cluster.u
uid": "xMozIZtHRCS86sXTrngOpA", "node.id": "uGpM5oqjSgKEwVHnw6mH0A"  }
{"type": "server", "timestamp": "2021-10-13T15:03:17,432Z", "level": "WARN", "component": "o.e.m.j.JvmGcMonitorService", "cluster.name": "docker-cluster", "node.name": "7476779d6cca", "message": "[gc][16572] overhead, spent [531ms] collecting in the last [1s]", "cluster.u
uid": "xMozIZtHRCS86sXTrngOpA", "node.id": "uGpM5oqjSgKEwVHnw6mH0A"  }
{"type": "server", "timestamp": "2021-10-13T15:03:22,481Z", "level": "INFO", "component": "o.e.m.j.JvmGcMonitorService", "cluster.name": "docker-cluster", "node.name": "7476779d6cca", "message": "[gc][16577] overhead, spent [369ms] collecting in the last [1s]", "cluster.u
uid": "xMozIZtHRCS86sXTrngOpA", "node.id": "uGpM5oqjSgKEwVHnw6mH0A"  }

Then the exception looks like this:

{"type": "server", "timestamp": "2021-10-13T15:04:21,079Z", "level": "ERROR", "component": "o.e.b.ElasticsearchUncaughtExceptionHandler", "cluster.name": "docker-cluster", "node.name": "7476779d6cca", "message": "fatal error in thread [elasticsearch[7476779d6cca][write][T
#1]], exiting", "cluster.uuid": "xMozIZtHRCS86sXTrngOpA", "node.id": "uGpM5oqjSgKEwVHnw6mH0A" ,
"stacktrace": ["java.lang.OutOfMemoryError: Java heap space",
"at java.util.Arrays.copyOf(Arrays.java:3536) ~[?:?]",
"at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:100) ~[?:?]",
"at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:130) ~[?:?]",
"at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2137) ~[jackson-core-2.10.4.jar:2.10.4]",
"at com.fasterxml.jackson.core.json.UTF8JsonGenerator.writeString(UTF8JsonGenerator.java:506) ~[jackson-core-2.10.4.jar:2.10.4]",
"at org.elasticsearch.common.xcontent.json.JsonXContentGenerator.writeString(JsonXContentGenerator.java:271) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.xcontent.XContentBuilder.value(XContentBuilder.java:654) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.xcontent.XContentBuilder.lambda$static$14(XContentBuilder.java:95) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.xcontent.XContentBuilder$$Lambda$50/0x0000000800c2f100.write(Unknown Source) ~[?:?]",
"at org.elasticsearch.common.xcontent.XContentBuilder.unknownValue(XContentBuilder.java:811) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.xcontent.XContentBuilder.map(XContentBuilder.java:891) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.xcontent.XContentBuilder.unknownValue(XContentBuilder.java:818) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.xcontent.XContentBuilder.value(XContentBuilder.java:920) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.xcontent.XContentBuilder.unknownValue(XContentBuilder.java:820) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.xcontent.XContentBuilder.map(XContentBuilder.java:891) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.xcontent.XContentBuilder.map(XContentBuilder.java:866) ~[elasticsearch-x-content-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:443) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.update.UpdateHelper.prepareUpdateScriptRequest(UpdateHelper.java:233) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.update.UpdateHelper.prepare(UpdateHelper.java:82) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.update.UpdateHelper.prepare(UpdateHelper.java:63) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:220) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:158) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:203) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(TransportShardBulkAction.java:109) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(TransportShardBulkAction.java:74) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.action.support.replication.TransportWriteAction$1.doRun(TransportWriteAction.java:172) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:732) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26) ~[elasticsearch-7.14.0.jar:7.14.0]",
"at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]",
"at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]",
"at java.lang.Thread.run(Thread.java:831) [?:?]"] }

I'm running Elasticsearch 7.14 in a Docker container. Here is the docker-compose.yml file:

version: "3.7"
services:
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.14.0
    container_name: es01
    restart: always
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms8g -Xmx8g"
      - TAKE_FILE_OWNERSHIP=true
      - xpack.security.enabled=true
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    networks:
      - elastic

  kib01:
    image: docker.elastic.co/kibana/kibana:7.14.0
    container_name: kib01
    restart: always
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_URL: http://es01:9200
      ELASTICSEARCH_HOSTS: '["http://es01:9200"]'
      SERVER_PUBLICBASEURL: http://example.com:5601
      ELASTICSEARCH_USERNAME: kibana_system
      ELASTICSEARCH_PASSWORD: "${KIBANA_SYSTEM_PASSWORD}"
    networks:
      - elastic

networks:
  elastic:
    name: elastic

volumes:
  data01:

I know one solution is to add more resources (a larger heap, more nodes). But first I'd like to understand why this OutOfMemoryError happens, so I can find the best fix.

I'm only sending at most 15 MB per request, and Elasticsearch has an 8 GB heap. What am I doing wrong here?
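In case it helps with diagnosis, one way to watch heap pressure while the bulk job runs is to poll the nodes stats API with the same client (a quick sketch; "jvm" is a standard metric of that API):

# Poll JVM heap usage on all nodes via the nodes stats API.
stats = es.nodes.stats(metric='jvm')
for node in stats['nodes'].values():
    mem = node['jvm']['mem']
    print(f"{node['name']}: heap {mem['heap_used_percent']}% of "
          f"{mem['heap_max_in_bytes'] // (1024 * 1024)} MB")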

Tags: python, docker, elasticsearch, elasticsearch-py
