java - 将 Elasticsearch JSON 响应转换为 Elasticsearch BulkResponse
问题描述
由于 Elasticsearch 正在从其 API 中移除 _type,所以我需要保持我们的产品在 ES 上跨不同版本的兼容性。
为了解决不支持_type的2.x、5.x和6.x及更高版本之间的冲突,我简单地统一了api,但对发送给ES的请求编写了不同的实现。对于支持 _type 的版本,我使用 ES 低休息客户端将原始 dsl 发送到 ES,而对于那些不支持的版本,我调用高休息客户端。不幸的是,当我实现 bulk() 时,我根本不知道应该如何处理 BulkResponse。
对于 2.x 和 5.x,我的方法返回是 JSONObject,实现如下:
/**
* @param bulkRequest requests to be sent
* @param type target type, for es 2.x and 5.x, this field is set by user
* @param refresh force es to refresh and create new lucene segment or not
* @param options
* @param headers
* @return
* @throws IOException
*/
@Override
public BulkResponse bulk(BulkRequest bulkRequest, String type, boolean refresh, RequestOptions options, Header... headers) throws IOException {
String endpoint = connection.getHttpSchema() + "://" + connection.getConnectedHosts()
+ ":" + connection.getConnectedPort() + "/_bulk";
HttpEntity entity = new NStringEntity(wrapBulkRequests(bulkRequest, type), ContentType.APPLICATION_JSON);
Request request = new Request(POST, endpoint);
request.setOptions(options);
request.setEntity(entity);
Response response = this.connection.getLowLevelClient().performRequest(request);
HttpEntity httpEntity = response.getEntity();
JSONObject res = JSON.parseObject(EntityUtils.toString(httpEntity, "utf-8"));
LOG.debug("bulk response is {}", res);
return new BulkResponse(res);
}
private String wrapBulkRequests(BulkRequest bulkRequest, String type) {
StringBuilder bulkRequestBody = new StringBuilder();
for (DocWriteRequest<?> request : bulkRequest.requests()) {
String requestStr = request.toString();
String targetIndex = request.index();
String _id = request.id();
String opType = request.opType().getLowercase();
if (INDEX_OP_TYPE.equals(opType)) {
if (_id == null || "".equals(_id)) {
bulkRequestBody.append(String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }\n", targetIndex, type));
} else {
bulkRequestBody.append(String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n", targetIndex, type, _id));
}
bulkRequestBody.append(matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]") + "\n");
} else if (CREATE_OP_TYPE.equals(opType)) {
if (_id == null || "".equals(_id)) {
bulkRequestBody.append(String.format("{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }\n", targetIndex, type));
} else {
bulkRequestBody.append(String.format("{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n", targetIndex, type, _id));
}
bulkRequestBody.append(matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]") + "\n");
} else if (UPDATE_OP_TYPE.equals(opType)) {
int retryOnConflict = ((UpdateRequest) request).retryOnConflict();
if (retryOnConflict == 0) {
bulkRequestBody.append(String.format("{ \"update\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n", targetIndex, type, _id));
} else {
bulkRequestBody.append(String.format("{ \"update\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\", \"retry_on_conflict\" : %d} }\n", targetIndex, type, _id, retryOnConflict));
}
boolean isUpsert = ((UpdateRequest) request).docAsUpsert();
bulkRequestBody.append(matchSourceFromUpdateRequest(requestStr, isUpsert));
} else if (DELETE_OP_TYPE.equals(opType)) { // delete ops
bulkRequestBody.append(String.format("{ \"delete\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n", targetIndex, type, _id));
} else {
LOG.error("bad bulk request op_type {}", opType);
}
}
LOG.debug("bulk request body is {}", bulkRequestBody.toString());
return bulkRequestBody.toString();
}
private String matchSourceFromUpdateRequest(String requestStr, boolean isUpsert) {
if (isUpsert) {
return "{ \"doc\" : " + matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]\\}\\]") + ", \"doc_as_upsert\" : true }\n";
} else {
return "{ \"doc\" : " + matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]\\}\\]") + " }\n";
}
}
private String matchSourceFromIndexRequest(String requestStr, String regex) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(requestStr);
if (m.find()) {
LOG.debug("index bulk request source is {}", m.group(1));
return m.group(1);
}
LOG.error("cannot match source from index bulk request, requestStr is {}", requestStr);
return "";
}
为了与 6.x 和 7.x 兼容,我需要返回一个BulkResponse
. 对于 , , 等情况GetResponse
,UpdateResponse
我IndexResponse
可以简单地编写自己的包装类来保存 JSONObject 或基于当前 es 版本的真实 ES 响应,因为上面常用的 api 返回类型的响应只是主要类型。但是对于 BulkResponse,它会返回很多嵌套类,例如BulkItemResponse
. 所以我想我可能必须在给定 JSONObject 响应的情况下构建 BulkResponse 。
我用谷歌搜索了很多,发现了一个类似的问题:How do you convert an Elasticsearch JSON String Response, with a Aggregation, to an Elasticsearch SearchResponse Object
This guy has similar request with me and his request is trying to convert json to SearchResponse while mine was试图将 json 转换为 BulkResponse. 我的想法是构建类似NamedXContentRegistry的东西。但是 ES 的这一部分有点像一个黑盒子,开发人员不容易暴露或看到。我搜索了很多文档、文章,但未能学会编写自己的 XContentRegistry。所以在这里我想请一个有扎实的 Elasticsearch 知识基础的人来帮助我摆脱这个烂摊子。
休谢谢!
解决方案
推荐阅读
- bioinformatics - 是否有任何 md 软件可以在具有 GPU 加速的单处理器上运行副本交换分子动力学(REMD)?
- vhdl - 如何使用这些规格生成 I²C 时钟?
- java - (Java) 如果其中一种情况包含 <、>、<= 或 >=,我如何进行切换
- flutter - 错误:const 列表文字中的值必须是常量。扑
- c++ - 使用谓词将类对象向量的子集创建为带有指针的向量?
- python - 更改 Matplotlib 条形图条形粗细 - 输出太小
- c# - 如何将一维数组转化为二维数组的具体规律?
- java - 将 Java 循环转换为 Kotlin
- c# - UWP 应用程序中是否可以使用“独立”浮动窗口?
- javascript - Javascript:如何计算总月数?