ElasticSearch(四)Java操作ElasticSearch

编程

    今天本来想使用Spring Boot来连接ES的,但是想了想,还是决定用ES官方的Java High Level REST Client,这样可以更好地帮助我理解Spring中ES的原理。

    首先,我们在POM文件中引入(Jar包的版本要和ES的版本一致)

<properties>

<elasticsearch.version>7.6.0</elasticsearch.version>

</properties>

<dependencies>

<dependency>

<groupId>org.elasticsearch.client</groupId>

<artifactId>elasticsearch-rest-high-level-client</artifactId>

<version>${elasticsearch.version}</version>

</dependency>

<dependency>

<groupId>org.elasticsearch</groupId>

<artifactId>elasticsearch</artifactId>

<version>${elasticsearch.version}</version>

</dependency>

</dependencies>

    然后我写了一个Utils类,大家可以看一下里面的注释,或者直接拿过去用我觉得更合适一些,也可以去我deGitHub上下载这个包模块 https://github.com/Raindtop/LDM/tree/master/ldm-common/src/main/java/com/raindrop/common/Utils

package com.raindrop.common.Utils;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.sun.org.apache.xpath.internal.operations.Bool;

import io.netty.util.internal.StringUtil;

import org.apache.http.HttpHost;

import org.apache.lucene.search.TotalHits;

import org.elasticsearch.ElasticsearchException;

import org.elasticsearch.action.ActionListener;

import org.elasticsearch.action.DocWriteResponse;

import org.elasticsearch.action.admin.indices.alias.Alias;

import org.elasticsearch.action.delete.DeleteRequest;

import org.elasticsearch.action.delete.DeleteResponse;

import org.elasticsearch.action.get.GetRequest;

import org.elasticsearch.action.get.GetResponse;

import org.elasticsearch.action.index.IndexRequest;

import org.elasticsearch.action.index.IndexResponse;

import org.elasticsearch.action.search.SearchRequest;

import org.elasticsearch.action.search.SearchResponse;

import org.elasticsearch.action.search.ShardSearchFailure;

import org.elasticsearch.action.support.replication.ReplicationResponse;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.client.indices.CreateIndexRequest;

import org.elasticsearch.client.indices.CreateIndexResponse;

import org.elasticsearch.common.Strings;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.unit.TimeValue;

import org.elasticsearch.common.xcontent.XContentType;

import org.elasticsearch.index.query.MatchQueryBuilder;

import org.elasticsearch.index.query.QueryBuilder;

import org.elasticsearch.index.query.QueryBuilders;

import org.elasticsearch.rest.RestStatus;

import org.elasticsearch.search.SearchHit;

import org.elasticsearch.search.SearchHits;

import org.elasticsearch.search.aggregations.AggregationBuilders;

import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

import org.elasticsearch.search.builder.SearchSourceBuilder;

import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;

import org.elasticsearch.search.sort.FieldSortBuilder;

import org.elasticsearch.search.sort.ScoreSortBuilder;

import org.elasticsearch.search.sort.SortOrder;

import org.elasticsearch.search.suggest.SuggestBuilder;

import org.elasticsearch.search.suggest.SuggestBuilders;

import org.elasticsearch.search.suggest.SuggestionBuilder;

import org.springframework.util.StringUtils;

import java.io.IOException;

import java.util.*;

import java.util.concurrent.TimeUnit;

/*

* @Description TODO ElasticSearch工具类

* @Author zhang heson

* @Date 10:49 2020/3/29

**/

public class ElasticSearchUtils {

private static RestHighLevelClient client = null;

/*

* @Description TODO 获取ES的服务器

* @param: []

* @return: org.elasticsearch.client.RestHighLevelClient

* @auther: zhang hesong

* @date: 10:52 2020/3/29

*/

public RestHighLevelClient getClient() {

if (client == null) {

synchronized (ElasticSearchUtils.class) {

if (client == null) {

client = new RestHighLevelClient(

RestClient.builder(

new HttpHost("localhost", 9200, "http") //ES服务器的配置信息

// , new HttpHost("localhost", 9200, "http") 多服务器配置

)

);

}

}

}

return client;

}

/*

* @Description TODO 创建索引

* @param: [indexName, alias, mappings, setting]

* @return: boolean

* @auther: zhang hesong

* @date: 11:43 2020/3/29

*/

public boolean createIndex(String indexName, String alias, JSONObject mappings, Settings setting){

boolean result = true;

try(RestHighLevelClient client = getClient();){

//创建索引名字

CreateIndexRequest request = new CreateIndexRequest(indexName);

//设置索引的settings

if (Optional.ofNullable(setting).isPresent()) {

request.settings(setting);

}

//设置索引的mapping

if (Optional.ofNullable(mappings).isPresent()) {

request.mapping(mappings.toJSONString(), XContentType.JSON);

}

//设置索引的别名

if (!StringUtil.isNullOrEmpty(alias)) {

request.alias(new Alias(alias));

}

//发送请求至ES----同步

CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);

boolean acknowledged = response.isAcknowledged();

boolean shardAsknowledged = response.isShardsAcknowledged();

System.out.println("请求状态:" + (acknowledged && shardAsknowledged));

//发送请求至ES----异步(监听器形式)

// ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {

// @Override

// public void onResponse(CreateIndexResponse createIndexResponse) {

//

// boolean acknowledged = createIndexResponse.isAcknowledged();

// boolean shardAsknowledged = createIndexResponse.isShardsAcknowledged();

// System.out.println("请求状态:" + (acknowledged && shardAsknowledged));

// }

//

// @Override

// public void onFailure(Exception e) {

// System.out.println("缩影创建异常");

// }

// };

// client.indices().createAsync(request, RequestOptions.DEFAULT, listener);

} catch (Exception e) {

e.printStackTrace();

result=false;

}

return result;

}

/*

* @Description TODO 创建索引测试

* @param: [args]

* @return: void

* @auther: zhang hesong

* @date: 11:47 2020/3/29

*/

// public static void main(String[] args) {

// ElasticSearchUtils elasticSearchUtils = new ElasticSearchUtils();

// Settings settings = Settings.builder()

// .put("index.number_of_shards", 3)

// .put("index.number_of_replicas", 2).build();

// System.out.println(elasticSearchUtils.createIndex("test", null, null, settings));

// }

/*

* @Description TODO 向索引中添加数据或者修改数据

* @param: [index, id, data]

* @return: boolean

* @auther: zhang hesong

* @date: 15:43 2020/3/29

*/

public boolean addDocument(String index, String id, Map<String, Object> data){

boolean result = true;

try(RestHighLevelClient client = getClient();){

//设置插入的索引名和主键

IndexRequest request = new IndexRequest(index);

if (!StringUtil.isNullOrEmpty(id)){

request.id(id);

}

//设置类型,不推荐使用

// request.type("_doc");

request.source(data);

IndexResponse response = null;

try{

response = client.index(request, RequestOptions.DEFAULT);

}catch (ElasticsearchException e){

//判断是否存在版本冲突

if(e.status() == RestStatus.CONFLICT){

System.out.println("ES版本与Client-Jar版本不一致");

}

System.out.println("索引异常:" + e);

}

//处理返回的数据逻辑——同步

if(response != null){

String res_index = response.getIndex();

String res_type = response.getType();

String res_id = response.getId();

long version = response.getVersion();

System.out.println("index: " + res_index + ";

" +

"type: " + res_type + ";

" +

"id: " + res_id + ";

" +

"version: " + version + ";

");

if(response.getResult() == DocWriteResponse.Result.CREATED){

System.out.println("新增文档成功");

}else if (response.getResult() == DocWriteResponse.Result.UPDATED){

System.out.println("更新文档成功");

}

//分片处理信息

ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();

if(shardInfo.getTotal() != shardInfo.getSuccessful()){

}

//如果有分片副本失败,可以获取失败原因信息

if(shardInfo.getFailed() > 0){

for(ReplicationResponse.ShardInfo.Failure failure: shardInfo.getFailures()){

String reason = failure.reason();

System.out.println("副本失败原因: " + reason);

}

}

}

//处理返回的数据逻辑——同步

// ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {

// @Override

// public void onResponse(IndexResponse indexResponse) {

//

// }

//

// @Override

// public void onFailure(Exception e) {

//

// }

// };

// client.indexAsync(request, RequestOptions.DEFAULT, listener);

}catch (IOException e){

e.printStackTrace();

result = false;

}

return result;

}

/*

* @Description TODO 向索引中添加数据或者修改数据测试

* @param: [args]

* @return: void

* @auther: zhang hesong

* @date: 15:52 2020/3/29

*/

// public static void main(String[] args) {

// //add

// ElasticSearchUtils elasticSearchUtils = new ElasticSearchUtils();

// Map<String, Object> data = new HashMap<>();

// data.put("name", "Jack");

// data.put("age", 18);

// data.put("birthday", "2020-03-29");

//

// elasticSearchUtils.addDocument("test", null, data);

// //update

//// ElasticSearchUtils elasticSearchUtils = new ElasticSearchUtils();

//// Map<String, Object> data = new HashMap<>();

//// data.put("name", "Jack2");

//// data.put("age", 18);

//// data.put("birthday", "2020-03-29");

////

//// elasticSearchUtils.addDocument("test", "phdUJXEB7FKJ1xYIRsxH", data);

// }

/*

* @Description TODO 删除数据

* @param: [index, id]

* @return: boolean

* @auther: zhang hesong

* @date: 16:28 2020/3/29

*/

public boolean deleteDocument(String index, String id){

boolean result = true;

try(RestHighLevelClient client = getClient();){

DeleteRequest request = new DeleteRequest(index);

request.id(id);

DeleteResponse response = null;

try{

response = client.delete(request, RequestOptions.DEFAULT);

}catch (ElasticsearchException e){

//判断是否存在版本冲突

if(e.status() == RestStatus.CONFLICT){

System.out.println("ES版本与Client-Jar版本不一致");

}

System.out.println("索引异常:" + e);

}

if(Optional.ofNullable(response).isPresent()){

System.out.println("数据删除成功,ID:" + id);

}

//异步请求的写法和上面差不多,我就不重复写了

}catch (IOException e){

e.printStackTrace();

result = false;

}

return result;

}

/*

* @Description TODO 删除数据测试

* @param: [args]

* @return: void

* @auther: zhang hesong

* @date: 16:29 2020/3/29

*/

// public static void main(String[] args) {

// ElasticSearchUtils elasticSearchUtils = new ElasticSearchUtils();

// elasticSearchUtils.deleteDocument("test", "phdUJXEB7FKJ1xYIRsxH");

// }

/*

* @Description TODO 获取单条数据

* @param: [index, id, includes, excludes]

* @return: java.util.Map<java.lang.String,java.lang.Object>

* @auther: zhang hesong

* @date: 16:45 2020/3/29

*/

public Map<String, Object> getDocument(String index, String id, String[] includes, String[] excludes){

Map<String, Object> result = null;

try(RestHighLevelClient client = getClient();) {

GetRequest request = new GetRequest(index);

request.id(id);

if(includes == null || includes.length == 0){

includes = Strings.EMPTY_ARRAY;

}

if(excludes == null || excludes.length == 0){

excludes = Strings.EMPTY_ARRAY;

}

FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);

request.fetchSourceContext(fetchSourceContext);

GetResponse response = client.get(request, RequestOptions.DEFAULT);

if(response != null){

String res_index = response.getIndex();

String res_id = response.getId();

if(response.isExists()){

long version = response.getVersion();

result = response.getSourceAsMap();

//获取字节串

// byte[] result = response.getSourceAsBytes();

//获取字节串

// String result = response.getSourceAsString();

}else{

System.out.println("无此数据");

}

}else{

System.out.println("无此数据");

}

//异步请求的写法和上面差不多,我就不重复写了

}catch (IOException e){

e.printStackTrace();

}

return result;

}

/*

* @Description TODO 获取单个数据的测试

* @param: [args]

* @return: void

* @auther: zhang hesong

* @date: 18:11 2020/3/29

*/

// public static void main(String[] args) {

// ElasticSearchUtils elasticSearchUtils = new ElasticSearchUtils();

// Map<String, Object> map = elasticSearchUtils.getDocument("test", "pRdNJXEB7FKJ1xYI9MzX", null, new String[]{"name", "age"});

// System.out.println(JSON.toJSONString(map));

// }

/*

* @Description TODO 文档搜索功能关于QueryBuilder建议查看源码

* @param: [index, queryBuilder, includes, excludes, pageNum, pageSize]

* @return: java.util.List<java.util.Map<java.lang.String,java.lang.Object>>

* @auther: zhang hesong

* @date: 19:13 2020/3/29

*/

public List<Map<String, Object>> searchDocument(String index, QueryBuilder queryBuilder,

String[] includes, String[] excludes,

Integer pageNum, Integer pageSize){

List<Map<String, Object>> result = new ArrayList<>();

try(RestHighLevelClient client = getClient();){

SearchRequest searchRequest = new SearchRequest(index);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

//设置查询体

searchSourceBuilder.query(queryBuilder == null ? QueryBuilders.matchAllQuery() : queryBuilder);

//设置页码

searchSourceBuilder.from(pageNum == null ? 0 : pageNum);

//设置每页显示数量

searchSourceBuilder.size(pageSize == null ? 10 : pageSize);

//设置查询超时时间

searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

//根据评分进行排序

// searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));

//根据主键进行排序

searchSourceBuilder.sort(new FieldSortBuilder("age").order(SortOrder.ASC));

if(includes == null || includes.length == 0){

includes = Strings.EMPTY_ARRAY;

}

if(excludes == null || excludes.length == 0){

excludes = Strings.EMPTY_ARRAY;

}

searchSourceBuilder.fetchSource(includes, excludes);

// 高亮设置

// HighlightBuilder highlightBuilder = new HighlightBuilder();

// HighlightBuilder.Field highlightTitle =

// new HighlightBuilder.Field("title");

// highlightTitle.highlighterType("unified");

// highlightBuilder.field(highlightTitle);

// HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");

// highlightBuilder.field(highlightUser);

// searchSourceBuilder.highlighter(highlightBuilder);

//加入聚合

// TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")

// .field("company.keyword");

// aggregation.subAggregation(AggregationBuilders.avg("average_age")

// .field("age"));

// searchSourceBuilder.aggregation(aggregation);

//做查询建议

// SuggestionBuilder termSuggestionBuilder =

// SuggestBuilders.termSuggestion("user").text("kmichy");

// SuggestBuilder suggestBuilder = new SuggestBuilder();

// suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);

// searchSourceBuilder.suggest(suggestBuilder);

searchRequest.source(searchSourceBuilder);

SearchResponse response = client.search(searchRequest,RequestOptions.DEFAULT);

//结果状态信息

RestStatus status = response.status();

TimeValue took = response.getTook();

Boolean terminatedEarly = response.isTerminatedEarly();

Boolean timeOut = response.isTimedOut();

//分片搜索情况

int totalShards = response.getTotalShards();

int successfulShards = response.getSuccessfulShards();

int failedShards = response.getFailedShards();

for (ShardSearchFailure failure : response.getShardFailures()) {

// 搜索失败处理

}

//处理搜索命中文档结果

SearchHits hits = response.getHits();

//搜索命中文档总数以及最高分

TotalHits totalHits = hits.getTotalHits();

float maxScore = hits.getMaxScore();

SearchHit[] searchHits = hits.getHits();

for (SearchHit hit: searchHits){

String hit_index = hit.getIndex();

String hit_id = hit.getId();

float hit_score = hit.getScore();

Map<String, Object> result_map = hit.getSourceAsMap();

//获取字节串

// byte[] result_byte = hit.getSourceAsBytes();

//获取字节串

// String result_string = hit.getSourceAsString();

result.add(result_map);

}

//异步请求的写法和上面差不多,我就不重复写了

}catch (IOException e){

e.printStackTrace();

}

return result;

}

/*

* @Description TODO 文档搜索功能测试

* @param: [args]

* @return: void

* @auther: zhang hesong

* @date: 19:14 2020/3/29

*/

// public static void main(String[] args) {

// ElasticSearchUtils elasticSearchUtils = new ElasticSearchUtils();

// QueryBuilder queryBuilder = new MatchQueryBuilder("name", "Jack");

// List<Map<String, Object>> list = elasticSearchUtils.searchDocument("test", queryBuilder, new String[]{"name"} , null, null, null);

//

// System.out.println(JSON.toJSONString(list));

// }

}

 

以上是 ElasticSearch(四)Java操作ElasticSearch 的全部内容, 来源链接: utcz.com/z/514893.html

回到顶部