Elasticsearch升级数据迁移批量流式操作

编程

一、简介

最近需要升级Elasticsearch,准备从5.2.2升级到最新的7.4.2。

之所以敢这么大胆,一个重要原因是这部分业务的Elasticsearch集群数据量不大,不到200G。

了解了一下elasticdump,最后放弃了,决定自己写代码来迁移。

由于高版本的REST client与低版本的transport client之间存在兼容问题,最后决定:读数据使用transport client;向高版本集群写数据时,则自己封装请求,通过HTTP方式执行。

二、bulk请求

bulk可以包含索引添加(index)、创建(create)、删除(delete)、更新(update)四种操作。

index和create的区别是:index操作一定会被执行,id已存在时会覆盖原文档并增加文档的版本号;而create操作在id已经存在时不会执行,该条操作会返回版本冲突错误。

一般建议使用index操作;即使更新的内容比较多,也可以考虑用index来代替update(前提是手头有完整的文档,因为index会整体替换原文档)。

bulk的语法格式如下。

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

每个操作的第一行是要执行的动作(index、create、delete、update)及其元数据,其中必须包含索引名称;对于delete和update操作,id也是必需的。

接下来一行是文档的具体内容(delete操作没有这一行)。4个操作中update稍有不同,文档内容需要用doc包裹起来。

有2点需要注意:

  1. 请求体的最后一行也必须以换行符(\n)结尾,看起来就像末尾多了一个空行
  2. content-type必须是application/x-ndjson

三、流式bulk操作

import java.io.IOException;

/**

* elasticsearch bulk 请求

* https://www.elastic.co/guide/en/elasticsearch/reference/7.4/docs-bulk.html

*/

public class BulkBuilder {

private static final String INDEX_TEMPLATE_WITH_ID = "{ "%s" : { "_index" : "%s", "_id" : "%s" } }";

private static final String INDEX_TEMPLATE = "{ "%s" : { "_index" : "%s"} }";

private static final String DOC_TEMPLATE = "{ "doc" : %s }";

private static final String INDEX = "index";

private static final String CREATE = "create";

private static final String DELETE = "delete";

private static final String UPDATE = "update";

/**

* 换行、允许是

或者

*/

private static final String NEWLINE = "

";

/**

* bulk请求的content-type

*/

public static final String CONTENT_TYPE = "application/x-ndjson";

String BULK_URL = "http://%s/_bulk";

private StringBuffer body;

private BulkBuilder(){

body = new StringBuffer();

}

public static BulkBuilder getBuilder(){

return new BulkBuilder();

}

public BulkBuilder index(String index, String id, String docJson){

String head = String.format(INDEX_TEMPLATE_WITH_ID, INDEX, index, id);

body.append(head).append(NEWLINE);

body.append(docJson).append(NEWLINE);

return this;

}

public BulkBuilder index(String index, String docJson){

String head = String.format(INDEX_TEMPLATE, INDEX, index);

body.append(head).append(NEWLINE);

body.append(docJson).append(NEWLINE);

return this;

}

public BulkBuilder update(String index, String id, String docJson){

String head = String.format(INDEX_TEMPLATE_WITH_ID, UPDATE, index, id);

body.append(head).append(NEWLINE);

String updateBody = String.format(DOC_TEMPLATE, docJson);

body.append(updateBody).append(NEWLINE);

return this;

}

public BulkBuilder create(String index, String id, String docJson){

String head = String.format(INDEX_TEMPLATE_WITH_ID, CREATE, index, id);

body.append(head).append(NEWLINE);

body.append(docJson).append(NEWLINE);

return this;

}

public BulkBuilder create(String index, String docJson){

String head = String.format(INDEX_TEMPLATE, CREATE, index);

body.append(head).append(NEWLINE);

body.append(docJson).append(NEWLINE);

return this;

}

public BulkBuilder delete(String index, String id){

String head = String.format(INDEX_TEMPLATE_WITH_ID, DELETE, index, id);

body.append(head).append(NEWLINE);

return this;

}

public String build(){

return body.toString();

}

public String postBulk(String host) throws IOException {

String url = String.format(BULK_URL, host);

if(body.length() > 15){//body为空的时候不执行请求

return HttpUtil.postContent(url,body.toString(),CONTENT_TYPE);

}else {

return "";

}

}

}

上面的HttpUtil可以使用后面给出的HTTP工具类,也可以换成Spring的RestTemplate,或者JDK自带的HttpURLConnection。

使用起来也非常方便,下面给一个测试:

/**
 * Integration check against a local Elasticsearch node. In a single bulk request it indexes
 * document "1" into index "test", then partially updates that document's "age" field.
 *
 * <p>Requires a node listening on 127.0.0.1:9200.
 */
@Test
void update() throws IOException {
    BulkBuilder builder = BulkBuilder.getBuilder();

    JSONObject doc = new JSONObject();
    doc.put("name", "youyou");
    doc.put("age", 20);
    builder.index("test", "1", doc.toJSONString());

    // Partial document: only "age" changes. Use the same numeric type as the indexed value.
    JSONObject partial = new JSONObject();
    partial.put("age", 200);
    builder.update("test", "1", partial.toJSONString());

    String rs = builder.postBulk("127.0.0.1:9200");
    System.out.println(rs);

    // The bulk API reports per-item failures through the top-level "errors" flag, so the body
    // must be inspected; otherwise this test would pass even if every operation failed.
    if (!rs.contains("\"errors\":false")) {
        throw new AssertionError("bulk request reported errors: " + rs);
    }
}

四、http工具

import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * Static helpers around a single shared, pooled Apache {@link CloseableHttpClient} for simple
 * GET/POST/PUT/DELETE calls that return the response body as a string.
 *
 * <p>Request bodies are always encoded as UTF-8. Response bodies are decoded with the charset
 * declared by the server, falling back to UTF-8. Responses are always closed. The HTTP status
 * code is not checked: error responses are returned as their body text.
 */
public class HttpUtil {

    private static final Logger logger = LoggerFactory.getLogger(HttpUtil.class);

    /** text/plain with an explicit UTF-8 charset (ContentType.DEFAULT_TEXT is ISO-8859-1). */
    private static final ContentType TEXT_PLAIN_UTF8 = ContentType.create("text/plain", StandardCharsets.UTF_8);

    private static final CloseableHttpClient commonHttpClient;

    static {
        // NOTE(review): a 3s socket timeout may be short for large bulk requests - confirm.
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(3000)
                .setConnectTimeout(3000)
                .setConnectionRequestTimeout(3000)
                .build();

        ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setCharset(StandardCharsets.UTF_8)
                .setBufferSize(4096)
                .build();

        // HttpClientBuilder applies its default ConnectionConfig only to a connection manager it
        // creates itself. Since a manager is supplied explicitly, configure the manager directly.
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setDefaultConnectionConfig(connectionConfig);

        commonHttpClient = HttpClients.custom()
                .setDefaultRequestConfig(requestConfig)
                .setUserAgent("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0")
                .setConnectionManager(connectionManager)
                .build();
    }

    /**
     * Sends a GET request.
     *
     * @param url absolute request URL
     * @return the response body, or "" if the response has no entity
     * @throws IOException if the request fails or the body cannot be read
     */
    public static String getContent(String url) throws IOException {
        return execute(new HttpGet(url));
    }

    /**
     * POSTs {@code body} as {@code text/plain; charset=UTF-8}.
     *
     * @param url  absolute request URL
     * @param body request body
     * @return the response body, or "" if the response has no entity
     * @throws IOException if the request fails or the body cannot be read
     */
    public static String postContent(String url, String body) throws IOException {
        HttpPost method = new HttpPost(url);
        method.setEntity(new StringEntity(body, TEXT_PLAIN_UTF8));
        return execute(method);
    }

    /**
     * POSTs {@code body} as {@code application/json} (UTF-8).
     *
     * @param url  absolute request URL
     * @param body JSON request body
     * @return the response body, or "" if the response has no entity
     * @throws IOException if the request fails or the body cannot be read
     */
    public static String postJsonContent(String url, String body) throws IOException {
        HttpPost method = new HttpPost(url);
        method.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
        return execute(method);
    }

    /**
     * PUTs {@code body} as {@code application/json} (UTF-8).
     *
     * @param url  absolute request URL
     * @param body JSON request body
     * @return the response body, or "" if the response has no entity
     * @throws IOException if the request fails or the body cannot be read
     */
    public static String putJsonContent(String url, String body) throws IOException {
        HttpPut method = new HttpPut(url);
        method.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
        return execute(method);
    }

    /**
     * Sends a DELETE request.
     *
     * @param url absolute request URL
     * @return the response body, or "" if the response has no entity
     * @throws IOException if the request fails or the body cannot be read
     */
    public static String delete(String url) throws IOException {
        return execute(new HttpDelete(url));
    }

    /**
     * POSTs {@code body} with the given MIME type, encoded as UTF-8. The charset is added
     * explicitly because {@code ContentType.create(mimeType)} alone makes StringEntity fall back
     * to ISO-8859-1, which replaces non-Latin characters (e.g. Chinese text) with '?'.
     *
     * @param url         absolute request URL
     * @param body        request body
     * @param contentType bare MIME type without parameters, e.g. {@code application/x-ndjson}
     * @return the response body, or "" if the response has no entity
     * @throws IOException if the request fails or the body cannot be read
     */
    public static String postContent(String url, String body, String contentType) throws IOException {
        HttpPost method = new HttpPost(url);
        method.setEntity(new StringEntity(body, ContentType.create(contentType, StandardCharsets.UTF_8)));
        return execute(method);
    }

    /**
     * Executes {@code request} and reads the whole response body. The response is closed even if
     * reading fails, so its pooled connection is always released.
     */
    private static String execute(HttpUriRequest request) throws IOException {
        try (CloseableHttpResponse response = commonHttpClient.execute(request)) {
            if (response.getEntity() == null) {
                return "";
            }
            return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
        }
    }
}

以上是 Elasticsearch升级数据迁移批量流式操作 的全部内容, 来源链接: utcz.com/z/511364.html

回到顶部