java - 用于在 ElasticSearch 中索引文档的 Java ThreadPoolExecutor
问题描述
我是 Java 新手ThreaPoolExecutor
,我编写了一些任务来索引弹性搜索中的文档。通过ThreaPoolExecutor
我正在执行该任务并且它工作正常。
但是,仍然不太确定我的方法。
请在下面找到我的代码
public class IndexApp {
public static void main(String[] args)
{
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
Map<String, Object> jsonMap ;
System.out.println("Indexing via Java Code ....");
Product prod1=new Product("1001", 123172l, "Product", "VG3000");
Product prod2=new Product("1002", 123172l, "Series", "Valves, VG3000");
Product prod3=new Product("1003", 3536633, "Series", "Activa RoofTop, VG3000 karthikeyan ");
Product prod4=new Product("1004", 123172l, "Product", "Activa RoofTop VG3000, 3000");
List<Product> objList=new ArrayList<Product>();
objList.add(prod1);
objList.add(prod2);
objList.add(prod3);
objList.add(prod4);
for(int i=0;i<objList.size();i++)
{
jsonMap = new HashMap<String, Object>();
jsonMap.put("id", objList.get(i).getId());
jsonMap.put("catalog_id", objList.get(i).getCatalog_id());
jsonMap.put("catalog_type", objList.get(i).getCatalog_type());
jsonMap.put("values", objList.get(i).getValues());
IndexTask task = new IndexTask(jsonMap);
executor.execute(task);
}
executor.shutdown();
}
}
public class IndexTask implements Runnable {
private final static String INDEX_NAME = "index_prod";
Product prod=new Product();
IndexRequest request;
Map<String, Object> jsonMap ;
public IndexTask(Map<String, Object> jsonMap ) {
this.jsonMap = jsonMap;
}
public Map<String, Object> getJsonMap() {
return jsonMap;
}
public void run() {
try {
Long duration = (long) (Math.random() * 10);
System.out.println("Executing : "+jsonMap.get("id")+" Sleep Duration : "+duration );
request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString() )
.source(jsonMap);
try {
IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
}
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
任何人都让我知道我的方法对于在弹性搜索中索引文档是有意义的。?
更新 2
请找到我修改后的代码。
而不是使用IndexRequest
我使用BulkRequest
public class ProdCatIndexTask implements Runnable {
private final static String INDEX_NAME = "productcatalog_index";
Product prod=new Product();
IndexRequest request;
Map<String, Object> jsonMap ;
BulkRequest bulkRequest = new BulkRequest();
public ProdCatIndexTask(Map<String, Object> jsonMap ) {
this.jsonMap = jsonMap;
}
public Map<String, Object> getJsonMap() {
return jsonMap;
}
public void run() {
try {
Long duration = (long) (Math.random() * 10);
System.out.println("Executing : "+jsonMap.get("id")+" Sleep Duration : "+duration );
/*request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString() )
.source(jsonMap);*/
bulkRequest.add( new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString()).source(jsonMap));
try {
//IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout
BulkResponse bulkResponse = SearchEngineClient.getInstance3().bulk(bulkRequest);
System.out.println("Triggered Bulk Request.....");
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
}
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
解决方案
如果您想以批量模式并行加载数据,我建议使用 ElasticSearch API BulkProcessor
。
这是https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html。
示例如何使用 bulkProcessor:
bulkProcessor.add(new IndexRequest("indexName", "type")
.source(toJson(Product), XContentType.JSON);
如果你想更快,你可以将副本的数量减少到 0 并让 ElasticSearch 生成 ID,因为如果你索引自己的 ID,每次 ElasticSearch 都会检查这个 ID 是否存在于 ElasticSearch 中。
关于如何提高加载性能的其他想法:
https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
推荐阅读
- java - Sonarqube 和 Jacoco Gradle 插件
- json - 将 json 数据转换为使用 ajax 请求响应组件
- redux - Redux 商店不更新组件道具
- html - css删除h标签中的空白
- php - 显示列的最后一个 id - ZendFramwork
- reactjs - 如何在具有某些条件的反应儿童之间添加元素?
- java - java-如何检查JSP页面中是否存在值?
- c# - aspnetboilerplate 自定义验证抛出异常
- javascript - Math.random 重复数字 JavaScript
- ansible - 如何使用 Ansible 检查特定文件系统的使用情况