
前面说到elasticsearch 7.16集群安装,本文介绍通过springboot 2.6.2集成es的java api对其进行操作。
基于vue和springboot接口,这边做了一个查询界面,有兴趣的同学可以看看:
查询地址:https://search.lrting.top
目前支持的搜索关键词:hudi、hive、flink
首先看一下pom文件
pom.xml
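<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>springboot-elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-elasticsearch</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <hutool.version>5.7.8</hutool.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>7.16.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>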
Elasticsearch升级到7.16之后,已经废弃了High-Level REST Client,官方推荐改用新的Java API Client(elasticsearch-java,底层基于Low-Level REST Client),所以某些接口发生了变化,下面列出基于新客户端的一些基本操作:
从application.properties文件读取Elasticsearch配置信息
server.port=8899
spring.application.name=qa-search
elasticsearch.hosts=10.0.2.9:9200,10.0.2.78:9200,10.0.2.211:9200
elasticsearch.username=elastic
elasticsearch.password=elastic
elasticsearch.connection.timeout=10000
elasticsearch.socket.timeout=10000
elasticsearch.connection.request.timeout=10000
配置类
ElasticSearchConfig.java
package com.zh.ch.springboot.elasticsearch.config;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.zh.ch.springboot.elasticsearch.service.ElasticsearchServiceImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
 * Spring configuration that builds the {@link ElasticsearchClient} bean from the
 * {@code elasticsearch.*} properties (host list, credentials and timeouts).
 */
@Configuration
public class ElasticSearchConfig {

    // Log under this class's own name rather than the service implementation's.
    private static final Log logger = LogFactory.getLog(ElasticSearchConfig.class);

    private static final String HTTP_PREFIX = "http://";
    private static final String HTTPS_PREFIX = "https://";

    /** Comma-separated list of {@code host:port} entries, optionally prefixed with a scheme. */
    @Value("${elasticsearch.hosts}")
    public String elasticsearchHost;

    /** User name for basic authentication; authentication is skipped when empty. */
    @Value("${elasticsearch.username}")
    public String elasticsearchUsername;

    /** Password for basic authentication; authentication is skipped when empty. */
    @Value("${elasticsearch.password}")
    public String elasticsearchPassword;

    /** Value passed to {@code setConnectTimeout} on the request config. */
    @Value("${elasticsearch.connection.timeout}")
    public int elasticsearchConnectionTimeout;

    /** Value passed to {@code setSocketTimeout} on the request config. */
    @Value("${elasticsearch.socket.timeout}")
    public int elasticsearchSocketTimeout;

    /**
     * Value passed to {@code setConnectionRequestTimeout} on the request config.
     * The field name (with its {@code get} prefix) is kept as-is because the field is public.
     */
    @Value("${elasticsearch.connection.request.timeout}")
    public int getElasticsearchConnectionRequestTimeout;

    /**
     * Builds the low-level {@link RestClient} (timeouts, failure listener, optional
     * credentials) and wraps it in an {@link ElasticsearchClient} using a Jackson mapper.
     *
     * @return the client bean
     */
    @Bean
    public ElasticsearchClient elasticsearchClient() {
        RestClient restClient = RestClient.builder(getESHttpHosts())
                .setRequestConfigCallback(requestConfigBuilder -> {
                    // Apply the configured timeouts.
                    requestConfigBuilder.setConnectTimeout(elasticsearchConnectionTimeout);
                    requestConfigBuilder.setSocketTimeout(elasticsearchSocketTimeout);
                    requestConfigBuilder.setConnectionRequestTimeout(getElasticsearchConnectionRequestTimeout);
                    return requestConfigBuilder;
                })
                .setFailureListener(new RestClient.FailureListener() {
                    // Called when a node fails; a good place to hook in alerting.
                    @Override
                    public void onFailure(Node node) {
                        logger.error(node);
                    }
                })
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    httpClientBuilder.disableAuthCaching();
                    // Attach credentials when configured.
                    return getHttpAsyncClientBuilder(httpClientBuilder);
                })
                .build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }

    /**
     * Parses {@link #elasticsearchHost} into {@link HttpHost} instances.
     *
     * <p>Each entry must look like {@code 127.0.0.1:9200}. A leading {@code http://} or
     * {@code https://} is honoured as the scheme (previously it was stripped and the scheme
     * was always forced to {@code http}); entries without a scheme default to {@code http}.
     *
     * @return one {@link HttpHost} per configured entry
     * @throws IllegalArgumentException if an entry has no {@code :port} part
     * @throws NumberFormatException if the port is not an integer
     */
    private HttpHost[] getESHttpHosts() {
        String[] hosts = elasticsearchHost.split(",");
        HttpHost[] httpHosts = new HttpHost[hosts.length];
        for (int i = 0; i < hosts.length; i++) {
            // Tolerate spaces around the commas in the property value.
            String host = hosts[i].trim();
            String scheme = "http";
            if (host.startsWith(HTTPS_PREFIX)) {
                scheme = "https";
                host = host.substring(HTTPS_PREFIX.length());
            } else if (host.startsWith(HTTP_PREFIX)) {
                host = host.substring(HTTP_PREFIX.length());
            }
            Assert.isTrue(host.contains(":"), String.format("your host %s format error , Please refer to [ 127.0.0.1:9200 ] ", host));
            String[] hostAndPort = host.split(":");
            httpHosts[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), scheme);
        }
        return httpHosts;
    }

    /**
     * Registers basic-auth credentials on the HTTP client builder when both user name and
     * password are configured; otherwise returns the builder untouched.
     *
     * @param httpClientBuilder builder supplied by the REST client
     * @return the same builder, possibly with a credentials provider set
     */
    private HttpAsyncClientBuilder getHttpAsyncClientBuilder(HttpAsyncClientBuilder httpClientBuilder) {
        // StringUtils.isEmpty is deprecated in Spring 5.3; hasLength is the String equivalent.
        if (!StringUtils.hasLength(elasticsearchUsername) || !StringUtils.hasLength(elasticsearchPassword)) {
            return httpClientBuilder;
        }
        // Credentials for Elasticsearch (commonly the "elastic" user).
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticsearchUsername, elasticsearchPassword));
        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        return httpClientBuilder;
    }
}
接口类
ElasticsearchService.java
package com.zh.ch.springboot.elasticsearch.service;

import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;

import java.util.List;
import java.util.Map;

/**
 * Basic index and document operations against Elasticsearch.
 *
 * <p>The generic type parameters were missing from the published listing (so {@code T} was
 * undeclared); they are restored here. Method names are kept exactly as published.
 */
public interface ElasticsearchService {

    /**
     * @param index index name
     * @return whether the index exists
     */
    boolean existsIndex(String index);

    /**
     * Creates an index with one alias, a shard count and field mappings.
     *
     * @param index       index name
     * @param aliasename  alias to attach to the index
     * @param numOfShards number of primary shards
     * @param properties  field name to mapping property
     * @return whether the creation was acknowledged
     */
    boolean createIndex(String index, String aliasename, int numOfShards, Map<String, Property> properties);

    /**
     * @param indexList names of the indices to delete
     * @return whether the deletion was acknowledged
     */
    boolean deleteIndex(List<String> indexList);

    /**
     * @param index index name
     * @param id    document id
     * @param clazz type the document source is mapped to
     * @param <T>   document type
     * @return whether the document was found
     */
    <T> boolean existsdocument(String index, String id, Class<T> clazz);

    /**
     * Indexes a document under an explicit id.
     *
     * @param index index name
     * @param id    document id
     * @param qa    document to store
     * @param <T>   document type
     * @return the index response
     */
    <T> IndexResponse saveOrUpdatedocument(String index, String id, T qa);

    /**
     * Indexes a document without specifying an id.
     *
     * @param index index name
     * @param qa    document to store
     * @param <T>   document type
     * @return the index response
     */
    <T> IndexResponse saveOrUpdatedocument(String index, T qa);

    /**
     * @param index index name
     * @param id    document id
     * @param clazz type the document source is mapped to
     * @param <T>   document type
     * @return the document source
     */
    <T> T getById(String index, String id, Class<T> clazz);

    /**
     * @param index  index name
     * @param idList ids of the documents to fetch
     * @param clazz  type the document sources are mapped to
     * @param <T>    document type
     * @return the document sources, one per requested id
     */
    <T> List<T> getByIdList(String index, List<String> idList, Class<T> clazz);

    /**
     * Runs a paged search over an index.
     *
     * @param index    index name
     * @param pageNo   page selector supplied by the caller
     * @param pageSize maximum number of hits to return
     * @param clazz    type the hits are mapped to
     * @param <T>      document type
     * @return the hits metadata of the search response
     */
    <T> HitsMetadata<T> searchByPages(String index, Integer pageNo, Integer pageSize, Class<T> clazz);

    /**
     * @param index index name
     * @param id    document id
     * @return whether the document was deleted
     */
    boolean deleteById(String index, String id);
}
实现类
ElasticsearchServiceImpl.java
package com.zh.ch.springboot.elasticsearch.service; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.mapping.Property; import co.elastic.clients.elasticsearch._types.mapping.TypeMapping; import co.elastic.clients.elasticsearch._types.query_dsl.FieldAndFormat; import co.elastic.clients.elasticsearch._types.query_dsl.Query; import co.elastic.clients.elasticsearch._types.query_dsl.QueryStringQuery; import co.elastic.clients.elasticsearch.core.*; import co.elastic.clients.elasticsearch.core.search.Highlight; import co.elastic.clients.elasticsearch.core.search.HighlightField; import co.elastic.clients.elasticsearch.core.search.Hitsmetadata; import co.elastic.clients.elasticsearch.indices.*; import co.elastic.clients.elasticsearch.indices.ExistsRequest; import co.elastic.clients.transport.endpoints.BooleanResponse; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.*; @Component public class ElasticsearchServiceImplimplements ElasticsearchService { private final Log logger = LogFactory.getLog(ElasticsearchServiceImpl.class); private ElasticsearchClient client; @Autowired public void setClient(ElasticsearchClient client) { this.client = client; } public boolean existsIndex(String index) { try { ExistsRequest existsRequest = new ExistsRequest.Builder().index(index).build(); BooleanResponse response = client.indices().exists(existsRequest); return response.value(); } catch (IOException e) { logger.error("There is an error while getting index", e); } return false; } @Override public boolean createIndex(String indexName, String aliasesName, int numOfShards, Map properties) { try { TypeMapping typeMapping = new TypeMapping.Builder().properties(properties).build(); IndexSettings indexSettings = new 
IndexSettings.Builder().numberOfShards(String.valueOf(numOfShards)).build(); CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder() .index(indexName) .aliases(aliasesName, new Alias.Builder().isWriteIndex(true).build()) .mappings(typeMapping) .settings(indexSettings) .build(); CreateIndexResponse response = client.indices().create(createIndexRequest); return response.acknowledged(); } catch (IOException e) { logger.error("There is an error while creating index", e); } return false; } @Override public boolean deleteIndex(List indexList) { try { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(indexList).build(); DeleteIndexResponse response = client.indices().delete(deleteIndexRequest); return response.acknowledged(); } catch (IOException e) { logger.error("There is an error while deleting index", e); } return false; } @Override public boolean existsdocument(String index, String id, Class clazz) { try { GetRequest getRequest = new GetRequest.Builder().index(index).id(id).build(); GetResponse getResponse = client.get(getRequest, clazz); return getResponse.found(); } catch (IOException e) { logger.error("There is an error while judging if the document exists", e); } return false; } @Override public IndexResponse saveOrUpdatedocument(String index, String id, T t) { try { IndexRequest indexRequest = new IndexRequest.Builder ().index(index).id(id).document(t).build(); return client.index(indexRequest); } catch (IOException e) { logger.error("There is an error while saving the document", e); } return null; } @Override public IndexResponse saveOrUpdatedocument(String index, T t) { try { IndexRequest indexRequest = new IndexRequest.Builder ().index(index).document(t).build(); return client.index(indexRequest); } catch (IOException e) { logger.error("There is an error while saving the document", e); } return null; } @Override public T getById(String index, String id, Class clazz) { try { GetRequest getRequest = new 
GetRequest.Builder().index(index).id(id).build(); GetResponse getResponse = client.get(getRequest, clazz); return getResponse.source(); } catch (IOException e) { logger.error("There is an error while getting the document", e); } return null; } @Override public List getByIdList(String index, List idList, Class clazz) { try { List tList = new ArrayList<>(idList.size()); for (String id : idList) { tList.add(client.get(new GetRequest.Builder().index(index).id(id).build(), clazz).source()); } return tList; } catch (IOException e) { logger.error("There is an error while getting the document list", e); } return null; } @Override public Hitsmetadata searchByPages(String index, Integer pageNo, Integer pageSize, Class clazz) { try { SearchRequest searchRequest = new SearchRequest.Builder().index(Collections.singletonList(index)).from(pageNo).size(pageSize).build(); SearchResponse searchResponse = client.search(searchRequest, clazz); return searchResponse.hits(); } catch (IOException e) { logger.error("There is an error while searching by pages", e); } return null; } public boolean deleteById(String index, String id) { try { DeleteRequest deleteRequest = new DeleteRequest.Builder().index(index).id(id).build(); DeleteResponse deleteResponse = client.delete(deleteRequest); return "deleted".equals(deleteResponse.result().jsonValue()); } catch (IOException e) { logger.error("There is an error while deleting id document", e); } return false; } }
测试类
package com.zh.ch.springboot.elasticsearch.service;
import co.elastic.clients.elasticsearch._types.mapping.DateProperty;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import com.alibaba.fastjson.JSON;
import com.zh.ch.springboot.elasticsearch.SpringbootElasticsearchApplication;
import com.zh.ch.springboot.elasticsearch.bean.QA;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.*;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootElasticsearchApplication.class)
class ElasticsearchServiceImplTest {
private ElasticsearchServiceImpl elasticsearchService;
@Autowired
public void setElasticsearchService(ElasticsearchServiceImpl elasticsearchService) {
this.elasticsearchService = elasticsearchService;
}
@Test
void existsIndex() {
String index = "es_index_test_1";
boolean existsIndexFlag = elasticsearchService.existsIndex(index);
System.out.printf("%s 是否存在 %b%n", index, existsIndexFlag);
}
@Test
void createIndex() {
String index = "es_index_test_1";
String indexAliasesName = "es_index_test_1_aliases";
Map map = new HashMap<>();
map.put("id", new Property(new DateProperty.Builder().index(true).store(true).build()));
boolean createIndexFlag = elasticsearchService.createIndex(index, indexAliasesName, 12, map);
System.out.printf("创建索引, index:%s , createIndexFlag:%b%n", index, createIndexFlag);
}
@Test
void deleteIndex() {
List indexList = new ArrayList<>();
indexList.add("es_index_test_1");
boolean deleteIndexFlag = elasticsearchService.deleteIndex(indexList);
System.out.printf("删除 %s 索引是否成功 %b", indexList, deleteIndexFlag);
}
@Test
void existsdocument() {
String index = "bigdata";
String id = "1";
boolean existsdocumentFlag = elasticsearchService.existsdocument(index, id, QA.class);
System.out.printf("文档 index为 %s, id为 %s 是否存在于es中: %b",index, id, existsdocumentFlag);
}
@Test
void saveOrUpdatedocument() {
QA qa = new QA();
qa.setType_name("flink");
qa.setTitle("# Checkpoint 做恢复的过程中出现Savepoint failed with error "Checkpoint expired before completing"的问题");
qa.setContent("该问题字面意思看是由于flink在做cp落地hdfs的时候,出现超时失败的问题n" +
"n" +
"tn" +
"tpublic static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;n" +
"可以看到是超时失败的问题(默认超时10min失败)。n" +
"n" +
"## 1.原因分析与排查:n" +
"第一种情况:自身设置了超时时间(自身做持久化的内存也不大的情况)n" +
"n" +
"//例如:仅仅间隔6sec就做持久化n" +
"env.getCheckpointConfig.setCheckpointTimeout( 6 * 1000) //6sec内完成checkpointn" +
"" +
"如上图所示:查看Flink-web-ui的DashBoard中看到checkpoint栏目下的history中各个失败的checkpoint快照,然后查看失败时候,各个算子中使用时间,总有一些大部分完成的算子,但是另外一部分算子做checkpoint时候出现失败的情况。此时要做的是查看这部分算子的计算处理速度慢的原因。n" +
"n" +
"参考这个:[](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepoint-failed-with-error-quot-Checkpoint-expired-before-completing-quot-td24177.html)n" +
"n" +
"n" +
"2.开启并发增长个别处理慢的算子的处理能力;n" +
"n" +
"3.检查代码中是否存在计算速度特别慢的 *** 作(如读写磁盘、数据库、网络传输、创建大对象等耗时 *** 作)n" +
"n" +
"部分检查点成功问题(刚开始成功,过了几个检查点之后持久化失败的问题,参考https://blog.csdn.net/fct2001140269/article/details/88715808)n");
SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
qa.setDt(f.format(new Date()));
qa.setUser_id(1);
IndexResponse indexResponse = elasticsearchService.saveOrUpdatedocument("bigdata", qa);
System.out.printf("插入书籍是否成功 %s", indexResponse.result());
}
@Test
void getById() {
String index = "bigdata";
QA qaList = elasticsearchService.getById(index, "1", QA.class);
System.out.println(JSON.toJSonString(qaList));
}
@Test
void getByIdList() {
String index = "bigdata";
List idList = new ArrayList<>();
idList.add("1");
idList.add("2");
List qaList = elasticsearchService.getByIdList(index, idList, QA.class);
for (QA qa : qaList) {
System.out.println(JSON.toJSonString(qa));
}
}
@Test
void searchByPages() {
String index = "bigdata";
Integer pageNo = 0;
Integer pageSize = 10;
Hitsmetadata qaList = elasticsearchService.searchByPages(index, pageNo, pageSize, QA.class);
System.out.println(qaList.hits().size());
}
@Test
void searchByQuery() {
String queryString = "大数据";
Hitsmetadata qaList = elasticsearchService.searchByQuery(queryString, QA.class);
for (Hit hit : qaList.hits()) {
System.out.println(hit.highlight());
}
}
@Test
void deleteById() {
String index = "bigdata";
String id = "ee00B34BwyhfTnq-1xYe";
boolean deleteByIdFlag = elasticsearchService.deleteById(index, id);
System.out.println(deleteByIdFlag);
}
}
完整代码示例:
https://git.lrting.top/xiaozhch5/springboot-elasticsearch.git
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)