Elasticsearch升级数据迁移批量流式操作
一、简介
最近需要升级Elasticsearch,准备从5.2.2直接升级到最新的7.4.2。
之所以敢这么大胆,一个重要原因是这部分业务的Elasticsearch集群数据量不大,不到200G。
了解了一下elasticdump,最后放弃了,决定自己写代码来迁移。
由于高版本的REST client与低版本的transport client之间存在兼容问题,最终决定:读数据使用transport client连接旧集群,写数据则自己封装bulk请求,通过HTTP方式发送到新集群执行。
二、bulk请求
bulk可以包含索引添加(index)、创建(create)、删除(delete)、更新(update)四种操作。
index和create的区别是:index操作总会执行,若id已存在,则覆盖原有文档并将版本号加1;而create操作在id已存在时不会写入,该条操作会返回版本冲突(version_conflict_engine_exception)错误。
一般建议使用index操作;即使是更新,如果要改动的字段较多,也可以考虑用index整体覆盖来代替update。
bulk的语法格式如下。
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
每个操作的第一行是动作行,指明要执行的操作(index、create、delete、update)以及元数据:必须包含索引名称(除非已在URL中指定,例如 POST test/_bulk),delete和update操作还必须指定_id。
接下来一行是文档的具体内容(delete操作没有这一行)。4个操作中update稍有不同,要更新的部分文档需要用doc包裹起来。
有2点需要注意:
- 请求体的最后一行也必须以换行符(\n)结尾(并不是要额外多一行空行)
- Content-Type应设置为application/x-ndjson
三、流式bulk操作
import java.io.IOException;/**
* elasticsearch bulk 请求
* https://www.elastic.co/guide/en/elasticsearch/reference/7.4/docs-bulk.html
*/
public class BulkBuilder {
private static final String INDEX_TEMPLATE_WITH_ID = "{ "%s" : { "_index" : "%s", "_id" : "%s" } }";
private static final String INDEX_TEMPLATE = "{ "%s" : { "_index" : "%s"} }";
private static final String DOC_TEMPLATE = "{ "doc" : %s }";
private static final String INDEX = "index";
private static final String CREATE = "create";
private static final String DELETE = "delete";
private static final String UPDATE = "update";
/**
* 换行、允许是
或者
*/
private static final String NEWLINE = "
";
/**
* bulk请求的content-type
*/
public static final String CONTENT_TYPE = "application/x-ndjson";
String BULK_URL = "http://%s/_bulk";
private StringBuffer body;
private BulkBuilder(){
body = new StringBuffer();
}
public static BulkBuilder getBuilder(){
return new BulkBuilder();
}
public BulkBuilder index(String index, String id, String docJson){
String head = String.format(INDEX_TEMPLATE_WITH_ID, INDEX, index, id);
body.append(head).append(NEWLINE);
body.append(docJson).append(NEWLINE);
return this;
}
public BulkBuilder index(String index, String docJson){
String head = String.format(INDEX_TEMPLATE, INDEX, index);
body.append(head).append(NEWLINE);
body.append(docJson).append(NEWLINE);
return this;
}
public BulkBuilder update(String index, String id, String docJson){
String head = String.format(INDEX_TEMPLATE_WITH_ID, UPDATE, index, id);
body.append(head).append(NEWLINE);
String updateBody = String.format(DOC_TEMPLATE, docJson);
body.append(updateBody).append(NEWLINE);
return this;
}
public BulkBuilder create(String index, String id, String docJson){
String head = String.format(INDEX_TEMPLATE_WITH_ID, CREATE, index, id);
body.append(head).append(NEWLINE);
body.append(docJson).append(NEWLINE);
return this;
}
public BulkBuilder create(String index, String docJson){
String head = String.format(INDEX_TEMPLATE, CREATE, index);
body.append(head).append(NEWLINE);
body.append(docJson).append(NEWLINE);
return this;
}
public BulkBuilder delete(String index, String id){
String head = String.format(INDEX_TEMPLATE_WITH_ID, DELETE, index, id);
body.append(head).append(NEWLINE);
return this;
}
public String build(){
return body.toString();
}
public String postBulk(String host) throws IOException {
String url = String.format(BULK_URL, host);
if(body.length() > 15){//body为空的时候不执行请求
return HttpUtil.postContent(url,body.toString(),CONTENT_TYPE);
}else {
return "";
}
}
}
上面的HttpUtil可以使用后面给出的HTTP工具类,也可以换成Spring的RestTemplate或JDK自带的HttpURLConnection。无论用哪种方式,请求体都要按UTF-8编码发送,否则中文等非ASCII字符会变成乱码。
使用起来也非常方便,下面给一个测试:
@Testvoid update() throws IOException {
BulkBuilder builder = BulkBuilder.getBuilder();
JSONObject jsonObject = new JSONObject();
jsonObject.put("name","youyou");
jsonObject.put("age",20);
builder.index("test","1",jsonObject.toString());
JSONObject update = new JSONObject();
update.put("age","200");
builder.update("test","1",update.toJSONString());
String rs = builder.postBulk("127.0.0.1:9200");
System.out.println(rs);
}
四、http工具
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
public class HttpUtil {
private static Logger logger = LoggerFactory.getLogger(HttpUtil.class);
private static final CloseableHttpClient commonHttpClient ;
static {
HttpClientBuilder httpClientBuilder = HttpClients.custom();
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
requestConfigBuilder.setSocketTimeout(3000);
requestConfigBuilder.setConnectTimeout(3000);
requestConfigBuilder.setConnectionRequestTimeout(3000);
httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager();
ConnectionConfig.Builder connectionConfigBuilder = ConnectionConfig.custom();
ConnectionConfig connectionConfig = connectionConfigBuilder.setCharset(Charset.defaultCharset()).setBufferSize(4096).build();
httpClientBuilder.setDefaultConnectionConfig(connectionConfig);
httpClientBuilder.setUserAgent("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0");
httpClientBuilder.setConnectionManager(poolingHttpClientConnectionManager);
commonHttpClient = httpClientBuilder.build();
}
public static String getContent(String url) throws IOException {
HttpGet method = new HttpGet(url);
CloseableHttpResponse response = commonHttpClient.execute(method);
return EntityUtils.toString(response.getEntity());
}
public static String postContent(String url,String body) throws IOException {
HttpPost method = new HttpPost(url);
method.setEntity(new StringEntity(body, ContentType.DEFAULT_TEXT));
CloseableHttpResponse response = commonHttpClient.execute(method);
return EntityUtils.toString(response.getEntity());
}
public static String postJsonContent(String url,String body) throws IOException {
HttpPost method = new HttpPost(url);
method.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
CloseableHttpResponse response = commonHttpClient.execute(method);
return EntityUtils.toString(response.getEntity());
}
public static String putJsonContent(String url,String body) throws IOException {
HttpPut method = new HttpPut(url);
method.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
CloseableHttpResponse response = commonHttpClient.execute(method);
return EntityUtils.toString(response.getEntity());
}
public static String delete(String url) throws IOException {
HttpDelete method = new HttpDelete(url);
CloseableHttpResponse response = commonHttpClient.execute(method);
return EntityUtils.toString(response.getEntity());
}
public static String postContent(String url,String body,String contentType) throws IOException {
HttpPost method = new HttpPost(url);
method.setEntity(new StringEntity(body, ContentType.create(contentType)));
CloseableHttpResponse response = commonHttpClient.execute(method);
return EntityUtils.toString(response.getEntity());
}
}
以上是 Elasticsearch升级数据迁移批量流式操作 的全部内容, 来源链接: utcz.com/z/511364.html