Converting an Elasticsearch JSON response to an Elasticsearch BulkResponse

Problem description

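Since Elasticsearch is removing _type from its API, I need to keep our product compatible with several ES versions.
To resolve the conflict between 2.x and 5.x on one side and 6.x and later, which do not support _type, on the other, I unified our API and wrote a different implementation of the requests sent to ES for each group. For versions that support _type, I use the ES low-level REST client to send raw DSL to ES; for those that do not, I call the high-level REST client. Unfortunately, when I implemented bulk(), I could not work out how to handle BulkResponse.
For 2.x and 5.x, my method wraps a JSONObject and is implemented as follows: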

    /**
     * @param bulkRequest requests to be sent
     * @param type target type, for es 2.x and 5.x, this field is set by user
     * @param refresh force es to refresh and create new lucene segment or not
     * @param options
     * @param headers
     * @return
     * @throws IOException
     */
    @Override
    public BulkResponse bulk(BulkRequest bulkRequest, String type, boolean refresh, RequestOptions options, Header... headers) throws IOException {

        String endpoint = connection.getHttpSchema() + "://" + connection.getConnectedHosts()
                + ":" + connection.getConnectedPort() + "/_bulk";
        HttpEntity entity = new NStringEntity(wrapBulkRequests(bulkRequest, type), ContentType.APPLICATION_JSON);
        Request request = new Request(POST, endpoint);
        request.setOptions(options);
        request.setEntity(entity);
        Response response = this.connection.getLowLevelClient().performRequest(request);
        HttpEntity httpEntity = response.getEntity();
        JSONObject res = JSON.parseObject(EntityUtils.toString(httpEntity, "utf-8"));
        LOG.debug("bulk response is {}", res);
        return new BulkResponse(res);
    }

    private String wrapBulkRequests(BulkRequest bulkRequest, String type) {
        StringBuilder bulkRequestBody = new StringBuilder();
        for (DocWriteRequest<?> request : bulkRequest.requests()) {
            String requestStr = request.toString();
            String targetIndex = request.index();
            String _id = request.id();
            String opType = request.opType().getLowercase();
            if (INDEX_OP_TYPE.equals(opType)) {
                if (_id == null || "".equals(_id)) {
                    bulkRequestBody.append(String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }\n", targetIndex, type));
                } else {
                    bulkRequestBody.append(String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\"  } }\n", targetIndex, type, _id));
                }
                bulkRequestBody.append(matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]") + "\n");
            } else if (CREATE_OP_TYPE.equals(opType)) {
                if (_id == null || "".equals(_id)) {
                    bulkRequestBody.append(String.format("{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }\n", targetIndex, type));
                } else {
                    bulkRequestBody.append(String.format("{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\"  } }\n", targetIndex, type, _id));
                }
                bulkRequestBody.append(matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]") + "\n");
            } else if (UPDATE_OP_TYPE.equals(opType)) {
                int retryOnConflict = ((UpdateRequest) request).retryOnConflict();
                if (retryOnConflict == 0) {
                    bulkRequestBody.append(String.format("{ \"update\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\"  } }\n", targetIndex, type, _id));
                } else {
                    bulkRequestBody.append(String.format("{ \"update\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\",  \"retry_on_conflict\" : %d} }\n", targetIndex, type, _id, retryOnConflict));
                }
                boolean isUpsert = ((UpdateRequest) request).docAsUpsert();
                bulkRequestBody.append(matchSourceFromUpdateRequest(requestStr, isUpsert));
            } else if (DELETE_OP_TYPE.equals(opType)) { // delete ops
                bulkRequestBody.append(String.format("{ \"delete\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\"  } }\n", targetIndex, type, _id));
            } else {
                LOG.error("bad bulk request op_type {}", opType);
            }
        }
        LOG.debug("bulk request body is {}", bulkRequestBody.toString());
        return bulkRequestBody.toString();
    }

    private String matchSourceFromUpdateRequest(String requestStr, boolean isUpsert) {
        if (isUpsert) {
            return "{ \"doc\" : " + matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]\\}\\]") + ", \"doc_as_upsert\" : true }\n";
        } else {
            return "{ \"doc\" : " + matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]\\}\\]") + " }\n";
        }
    }

    private String matchSourceFromIndexRequest(String requestStr, String regex) {
        Pattern p = Pattern.compile(regex);
        Matcher m = p.matcher(requestStr);
        if (m.find()) {
            LOG.debug("index bulk request source is {}", m.group(1));
            return m.group(1);
        }
        LOG.error("cannot match source from index bulk request, requestStr is {}", requestStr);
        return "";
    }
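The action lines above are assembled with String.format, so an index name, type, or id containing a double quote or backslash would produce invalid JSON. Below is a minimal, stdlib-only sketch of building one action line with escaping. The names BulkActionLines, escape, and actionLine are hypothetical and not part of any Elasticsearch client; it assumes index is never null, and that passing a null type is how a caller would omit _type for versions that no longer accept it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BulkActionLines {

    /** Escapes the characters that JSON requires to be escaped inside a string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters need the \ u XXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /**
     * Builds one newline-terminated bulk action line.
     * _type and _id are omitted when null or empty; retry_on_conflict when 0.
     */
    static String actionLine(String op, String index, String type, String id, int retryOnConflict) {
        Map<String, String> meta = new LinkedHashMap<>();
        meta.put("_index", index);
        if (type != null && !type.isEmpty()) meta.put("_type", type);
        if (id != null && !id.isEmpty()) meta.put("_id", id);

        StringBuilder sb = new StringBuilder("{\"").append(escape(op)).append("\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : meta.entrySet()) {
            if (!first) sb.append(',');
            sb.append('"').append(e.getKey()).append("\":\"")
              .append(escape(e.getValue())).append('"');
            first = false;
        }
        if (retryOnConflict > 0) {
            sb.append(",\"retry_on_conflict\":").append(retryOnConflict);
        }
        return sb.append("}}\n").toString();
    }

    public static void main(String[] args) {
        // Index request without an id, for a version that still takes _type.
        System.out.print(actionLine("index", "logs", "doc", null, 0));
        // Update request whose id contains a quote, with retries enabled.
        System.out.print(actionLine("update", "logs", "doc", "a\"b", 3));
    }
}
```

The same helper covers all four op types, which would remove the duplicated format strings in wrapBulkRequests; the document source line that follows each action line is still the caller's responsibility.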

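To stay compatible with 6.x and 7.x, I need to return a BulkResponse. For cases such as GetResponse, UpdateResponse, and IndexResponse, I can simply write my own wrapper classes that hold either the JSONObject or the real ES response, depending on the current ES version, because the responses of those commonly used APIs contain only primitive types. BulkResponse, however, contains many nested classes, such as BulkItemResponse. So I think I may have to construct a BulkResponse from the given JSONObject response.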
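The wrapper idea for the flat response types could look roughly like the sketch below. It is only an illustration under stated assumptions: VersionedResponse, read, and FakeIndexResponse are hypothetical names, a plain Map<String, Object> stands in for the parsed JSON (fastjson's JSONObject implements that interface), and FakeIndexResponse stands in for a real client response class so that the example needs no Elasticsearch dependency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

public class VersionedResponse<T> {

    /** Hypothetical stand-in for a typed client response such as IndexResponse. */
    public static class FakeIndexResponse {
        private final String id;
        public FakeIndexResponse(String id) { this.id = id; }
        public String getId() { return id; }
    }

    private final Map<String, Object> raw; // parsed JSON from the low-level client
    private final T typed;                 // response object from the high-level client

    private VersionedResponse(Map<String, Object> raw, T typed) {
        this.raw = raw;
        this.typed = typed;
    }

    public static <T> VersionedResponse<T> ofRaw(Map<String, Object> raw) {
        return new VersionedResponse<>(Objects.requireNonNull(raw), null);
    }

    public static <T> VersionedResponse<T> ofTyped(T typed) {
        return new VersionedResponse<>(null, Objects.requireNonNull(typed));
    }

    public boolean isRaw() {
        return raw != null;
    }

    /** Reads one value from the JSON map by key, or from the typed response via its getter. */
    public <R> R read(String jsonKey, Function<T, R> typedGetter, Class<R> type) {
        return isRaw() ? type.cast(raw.get(jsonKey)) : typedGetter.apply(typed);
    }

    public static void main(String[] args) {
        Map<String, Object> json = new HashMap<>();
        json.put("_id", "42");

        VersionedResponse<FakeIndexResponse> fromOld = VersionedResponse.ofRaw(json);
        VersionedResponse<FakeIndexResponse> fromNew =
                VersionedResponse.ofTyped(new FakeIndexResponse("43"));

        System.out.println(fromOld.read("_id", FakeIndexResponse::getId, String.class)); // 42
        System.out.println(fromNew.read("_id", FakeIndexResponse::getId, String.class)); // 43
    }
}
```

This works while every field is a flat value. It stops scaling once a response nests other response objects, which is the situation with BulkResponse and its BulkItemResponse entries.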
I searched a lot and found a similar question: How do you convert an Elasticsearch JSON String Response, with a Aggregation, to an Elasticsearch SearchResponse Object
That asker has a similar need: they are trying to convert JSON to a SearchResponse, while I am trying to convert JSON to a BulkResponse. My idea is to build something like a NamedXContentRegistry. But this part of ES is something of a black box that is not readily exposed to developers. I went through many docs and articles but could not learn how to write my own XContentRegistry. So I would like to ask someone with solid Elasticsearch knowledge to help me out of this mess.
Thanks!

Tags: java, elasticsearch

Solution

