首页 > 技术文章 > ES 6.7版本工具类(三)

colourness 2022-02-14 17:34 原文

(一)使用 Elasticsearch 官方 Java API(RestHighLevelClient)创建客户端连接
public class EsClientUtils {

    /**
     * Builds a new {@link RestHighLevelClient} using the native Elasticsearch REST API.
     *
     * <p>Per the original author's note, this currently targets a Huawei ES cluster that is
     * reached over HTTPS. Every call creates a fresh client; the caller is responsible for
     * closing it.
     *
     * <p>NOTE(review): the SSL context below trusts every certificate and hostname
     * verification is disabled, and the credentials are inline placeholders. Confirm this is
     * acceptable for the target environment.
     *
     * @return a newly constructed high-level REST client
     * @throws Exception if the SSL context cannot be built
     */
    public static RestHighLevelClient getClient()throws Exception {
        // Basic-auth credentials, offered for any authentication scope.
        final CredentialsProvider basicAuth = new BasicCredentialsProvider();
        basicAuth.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials("user_name", "password"));

        // Trust-all TLS setup: no truststore, every certificate accepted, no hostname check.
        final SSLContext trustAllContext = SSLContexts.custom()
                .loadTrustMaterial(null, new TrustAllStrategy())
                .build();
        final SSLIOSessionStrategy tlsStrategy =
                new SSLIOSessionStrategy(trustAllContext, NoopHostnameVerifier.INSTANCE);

        // Cluster nodes the client may talk to.
        final HttpHost[] nodes = {
                new HttpHost("127.0.0.1", 24100, "https"),
                new HttpHost("127.0.0.2", 24100, "https"),
                new HttpHost("127.0.0.3", 24100, "https")
        };

        RestClientBuilder builder = RestClient.builder(nodes);

        // Connect / socket timeouts applied to every request.
        builder.setRequestConfigCallback(requestConfig ->
                requestConfig.setConnectTimeout(50000).setSocketTimeout(150000));

        builder.setHttpClientConfigCallback(httpClient -> {
            httpClient.setDefaultCredentialsProvider(basicAuth);
            // disableAuthCaching() is deliberately not called: per the original author it
            // caused errors against the Huawei ES cluster.
            httpClient.setSSLStrategy(tlsStrategy);
            return httpClient;
        });

        builder.setMaxRetryTimeoutMillis(60000);
        return new RestHighLevelClient(builder);
    }
}

 

(二)操作 ES 的工具类(索引创建、mapping 维护、数据写入与查询)
public class EsBaseUtils {
    /**
     * Demo entry point: obtains a client from {@code EsClientUtils}, runs one of the helper
     * operations against the {@code lichao_test} index, and closes the client afterwards.
     *
     * @param args unused
     * @throws Exception if the client cannot be created
     */
    public static void main(String[] args) throws Exception{
        // Check whether a mapping field exists
        RestHighLevelClient client = EsClientUtils.getClient();
        log.info("client="+ JSONObject.toJSONString(client));
        try{
            String index = "lichao_test";
            // Create the index
            //createIndexWithShardNum(client,index);
            // Add the mapping
            //putMapping(client,index);
            //bulkInsert(client,index);
            // Insert data
            //insert(client,index);
            // Query cluster information
            /*MainResponse response = client.info(RequestOptions.DEFAULT);
            ClusterName clusterName = response.getClusterName();
            System.out.println("clusterName:"+  clusterName );*/
            // Query the index
            /*GetIndexRequest indexRequest = new GetIndexRequest(index);
            GetIndexResponse getResponse = client.indices().get(indexRequest, RequestOptions.DEFAULT);
            System.out.println("getResponse.getAliases() = "+JSONObject.toJSONString(getResponse.getAliases()));
            System.out.println("getResponse.getMappings() = "+JSONObject.toJSONString(getResponse.getMappings()));
            System.out.println("getResponse.getIndices() = "+JSONObject.toJSONString(getResponse.getIndices()));*/
            // Query data
           /* SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.matchAllQuery());
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            log.info("**** searchResponse ="+JSONObject.toJSONString(searchResponse));*/
            // Fetch the mapping of the index
            getMapping(client,index);
        }catch (Exception ex){
            // Hand the exception to the logger so the stack trace ends up in the log output
            // instead of on stderr via printStackTrace().
            log.error("!!!!创建es客户端异常 ex=", ex);
        }finally {
            if(Objects.nonNull(client)){
                try{
                    client.close();
                }catch (Exception ex){
                    // Closing is best-effort, but keep the cause visible.
                    log.error("!!!!关闭es客户端异常", ex);
                }
            }
        }
    }
    /**
     * Indexes a single document into {@code indexName} under the {@code _doc} type.
     *
     * <p>No document id is set on the request. Failures are logged (with stack trace) and
     * not rethrown.
     *
     * <p>The client is borrowed from the caller and is NOT closed here; the caller owns its
     * lifecycle, so the same client can be reused for further calls.
     *
     * @param restClient ES client (left open)
     * @param indexName name of the target index
     * @param jsonMap field name to value map forming the document source,
     *                e.g. keys {@code id}, {@code name}, {@code sex}
     */
    private static void insert(RestHighLevelClient restClient,String indexName,Map<String, Object> jsonMap){
        try{
            IndexRequest indexRequest = new IndexRequest(indexName)
                    .type("_doc")
                    .source(jsonMap);
            IndexResponse indexResponse = restClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("indexResponse="+ JSONObject.toJSONString(indexResponse));
        } catch (Exception ex){
            // Pass the throwable as well so the stack trace is not lost.
            log.error("插入数据异常ex="+ex, ex);
        }
    }
    /**
     * Indexes a batch of documents into {@code indexName} (type {@code _doc}) with a single
     * bulk request.
     *
     * <p>A {@code null} or empty list is a no-op. Request-level exceptions are logged (with
     * stack trace) and not rethrown. The bulk response is also checked for per-item
     * failures, since those are reported in the response rather than thrown.
     *
     * <p>The client is borrowed from the caller and is NOT closed here; the caller owns its
     * lifecycle.
     *
     * @param restClient ES client (left open)
     * @param indexName name of the target index
     * @param jsonMapList documents to index, each a field name to value map
     */
    private static void bulkInsert(RestHighLevelClient restClient,String indexName, List<Map<String, Object>> jsonMapList){
        // Nothing to send: avoid an NPE on null and a request with no actions on empty.
        if (jsonMapList == null || jsonMapList.isEmpty()) {
            log.info("bulkInsert skipped, no documents supplied for index=" + indexName);
            return;
        }
        BulkRequest request = new BulkRequest();
        for (Map<String, Object> document : jsonMapList) {
            request.add(new IndexRequest(indexName).type("_doc").source(document));
        }
        try{
            // Individual item failures do not raise an exception; they are only flagged on
            // the response, so check explicitly.
            if (restClient.bulk(request, RequestOptions.DEFAULT).hasFailures()) {
                log.error("bulkInsert reported item failures, index=" + indexName
                        + ", documents=" + jsonMapList.size());
            }
        } catch (Exception ex){
            // Pass the throwable as well so the stack trace is not lost.
            log.error("插入数据异常ex="+ex, ex);
        }
    }
    /**
     * Creates an index with explicit shard and replica counts (synchronous call).
     *
     * @param restClient ES client
     * @param indexName name of the index to create
     * @param numberOfShards value for {@code index.number_of_shards}
     *                       (the original note mentions 5 as the default)
     * @param numberOfReplicas value for {@code index.number_of_replicas};
     *                         the original note advises 0 when there is only one machine
     * @throws IOException if the create-index request fails
     */
    private static void createIndexWithShardNum(RestHighLevelClient restClient,String indexName,int numberOfShards,int numberOfReplicas) throws IOException {
        CreateIndexRequest createRequest = new CreateIndexRequest(indexName);

        Settings.Builder indexSettings = Settings.builder();
        indexSettings.put("index.number_of_shards", numberOfShards);
        indexSettings.put("index.number_of_replicas", numberOfReplicas);
        createRequest.settings(indexSettings);

        // Blocking call: returns once the cluster has answered.
        CreateIndexResponse response =
                restClient.indices().create(createRequest, RequestOptions.DEFAULT);
        log.info("****** createIndexResponse = "+JSONObject.toJSONString(response));
    }
    /**
     * Adds a mapping to an index (synchronous call).
     *
     * <p>Builds a body of the form
     * {@code {"properties": {"<field>": {"type": <value>}, ...}}} from {@code fieldMap}.
     *
     * @param restClient ES client
     * @param indexName name of the index to update
     * @param fieldMap field name to mapping type, e.g. {@code "id"} to {@code "text"},
     *                 {@code "age"} to {@code "long"}
     * @throws IOException if building the body or executing the request fails
     */
    private static void putMapping(RestHighLevelClient restClient,String indexName,Map<String,Object> fieldMap) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.startObject("properties");
            {
                // Typed entry instead of a raw Map.Entry: no cast needed for the key.
                for (Map.Entry<String, Object> field : fieldMap.entrySet()) {
                    builder.startObject(field.getKey());
                    {
                        builder.field("type", field.getValue());
                    }
                    builder.endObject();
                }
            }
            builder.endObject();
        }
        builder.endObject();
        PutMappingRequest mappingRequest = new PutMappingRequest(indexName);
        mappingRequest.source(builder);
        // Blocking call: returns once the cluster has answered.
        AcknowledgedResponse putMappingResponse = restClient.indices().putMapping(mappingRequest, RequestOptions.DEFAULT);
        log.info("******* putMappingResponse = "+JSONObject.toJSONString(putMappingResponse));
    }
    
    /**
     * Exploratory helper: requests the mappings of the fields {@code name} and {@code text}
     * on the given index and logs each level of the response, ending with the full name and
     * source of the {@code name} field. (The original author noted the use case is unclear.)
     *
     * <p>If the response contains no entry for the index, or no entry for the {@code name}
     * field, a warning is logged and the method returns instead of failing with a
     * {@code NullPointerException}.
     *
     * @param restClient ES client
     * @param indexName name of the index to inspect
     * @throws IOException if the request fails
     */
    private static void getFieldMapping(RestHighLevelClient restClient,String indexName) throws IOException {
        GetFieldMappingsRequest request = new GetFieldMappingsRequest();
        request.indices(indexName);
        request.fields("name", "text");
        GetFieldMappingsResponse response =
                restClient.indices().getFieldMapping(request, RequestOptions.DEFAULT);
        log.info("GetFieldMappingsResponse = "+JSONObject.toJSONString(response));
        final Map<String, Map<String, GetFieldMappingsResponse.FieldMappingMetaData>> mappings =
                response.mappings();
        log.info("mappings = "+JSONObject.toJSONString(mappings));
        final Map<String, GetFieldMappingsResponse.FieldMappingMetaData> fieldMappings =
                mappings.get(indexName);
        log.info("FieldMappingMetaData = "+JSONObject.toJSONString(fieldMappings));
        if (fieldMappings == null) {
            // Index absent from the response: nothing further to inspect.
            log.warn("getFieldMapping: no field mappings returned for index=" + indexName);
            return;
        }
        final GetFieldMappingsResponse.FieldMappingMetaData metaData =
                fieldMappings.get("name");
        log.info("metaData = "+JSONObject.toJSONString(metaData));
        if (metaData == null) {
            // The index has no mapping for the "name" field.
            log.warn("getFieldMapping: field 'name' has no mapping in index=" + indexName);
            return;
        }

        final String fullName = metaData.fullName();
        log.info("fullName = "+fullName);
        final Map<String, Object> source = metaData.sourceAsMap();
        log.info("source = "+source);
    }
    
    /**
     * Fetches the mapping of a single index.
     *
     * <p>The request uses {@code IndicesOptions.lenientExpandOpen()}, so the response may
     * contain no entry under {@code indexName}. In that case a warning is logged and an
     * empty map is returned rather than failing with a {@code NullPointerException}.
     *
     * @param restClient ES client
     * @param indexName name of the index whose mapping is wanted
     * @return the mapping source as a map, e.g.
     *         {@code {"properties":{"name":{"type":"text"},"sex":{"type":"text"},"id":{"type":"text"}}}};
     *         an empty map when the response has no mapping for {@code indexName}
     * @throws IOException if the request fails
     */
    private static Map<String, Object> getMapping(RestHighLevelClient restClient,String indexName) throws IOException {
        GetMappingsRequest request = new GetMappingsRequest();
        request.indices(indexName);
        // Options controlling how the index name is expanded / resolved
        request.indicesOptions ( IndicesOptions.lenientExpandOpen ());
        GetMappingsResponse getMappingResponse = restClient.indices().getMapping(request, RequestOptions.DEFAULT);
        // Mappings of every index in the response, keyed by index name
        Map<String, MappingMetaData> allMappings = getMappingResponse.mappings();
        MappingMetaData indexMapping = allMappings.get(indexName);
        if (indexMapping == null) {
            // No entry under this exact name in the response.
            log.warn("getMapping: no mapping returned for index=" + indexName);
            return new java.util.HashMap<>();
        }
        Map<String, Object> mapping = indexMapping.sourceAsMap();
        //{"properties":{"name":{"type":"text"},"sex":{"type":"text"},"id":{"type":"text"}}}
        log.info("Mapping 内容="+JSONObject.toJSONString(mapping));
        return mapping;
    }
    
    /**
     * Checks whether an ES index exists.
     *
     * @param restClient ES client
     * @param indexName name of the index to look up
     * @return {@code true} if the index exists, {@code false} otherwise
     * @throws IOException if the request fails
     */
    private static boolean existIndex(RestHighLevelClient restClient,String indexName) throws IOException {
        final GetIndexRequest lookup = new GetIndexRequest(indexName);
        return restClient.indices().exists(lookup, RequestOptions.DEFAULT);
    }
}

 

 

推荐阅读