Reading a large file into Elasticsearch for analysis with Java (a step-by-step guide to processing huge CSV files in Java)
The task: quickly analyze a 2 GB CSV file.
The approach: use Java to read the file line by line, bulk-import the data into Elasticsearch,
and then use ES's powerful aggregations to analyze it. The whole job took about an hour.
package com.example.demo;

import com.alibaba.fastjson.JSON;
import com.example.demo.entity.Entity;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;

/**
 * Reads a large CSV file line by line and imports it into Elasticsearch.
 *
 * @author lhb
 * @date 2021/11/11
 * @since 1.0.0
 */
@SpringBootTest
public class ImportTest {

    @Autowired
    @Qualifier("client")
    private RestHighLevelClient restHighLevelClient;

    @Test
    void insert() {
        // The CSV file is 2 GB: 630,000 rows, a dozen or so columns.
        // Note the escaped backslashes required in a Java string literal.
        String filePath = "D:\\file\\20211111.csv";
        LineIterator it = null;
        try {
            it = FileUtils.lineIterator(new File(filePath), "UTF-8");
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        try {
            while (it.hasNext()) {
                String line = it.nextLine();
                // Each column in the CSV is separated by ",", so splitting on it
                // yields the individual fields (this breaks on quoted commas).
                String[] strArray = line.split(",");
                // The values carry long runs of whitespace, so trim them
                String name = strArray[6].trim();
                String code = strArray[8].trim();
                String num = strArray[11].trim();
                System.out.println(code + "==" + num);
                if (Objects.equals("xxx", code)) {
                    // skip the header row ("xxx" stands in for the real header value)
                    continue;
                }
                Entity entity = new Entity();
                entity.setCode(code);
                entity.setNum(Long.parseLong(num));
                entity.setName(name);
                entity.setCreateTime(new Date());
                String index = "index20211111";
                singleInsert2(index, entity);
            }
        } finally {
            LineIterator.closeQuietly(it);
        }
    }

    @Test
    void batchInsert() {
        String filePath = "D:\\express\\20211111.csv";
        LineIterator it = null;
        try {
            it = FileUtils.lineIterator(new File(filePath), "UTF-8");
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        try {
            int i = 0;
            List<Entity> entities = new ArrayList<>();
            String index = "index20211111";
            while (it.hasNext()) {
                String line = it.nextLine();
                String[] strArray = line.split(",");
                String code = strArray[6].trim();
                String name = strArray[8].trim();
                String num = strArray[11].trim();
                System.out.println(code + "==" + num);
                if (Objects.equals("xxx", code)) {
                    // skip the header row
                    continue;
                }
                Entity entity = new Entity();
                entity.setCode(code);
                entity.setName(name);
                try {
                    entity.setNum(Long.parseLong(num));
                } catch (NumberFormatException e) {
                    e.printStackTrace();
                    System.out.println("bad row: " + code + "==" + num);
                }
                entity.setCreateTime(new Date());
                // collect rows and flush a bulk request every 10,000
                entities.add(entity);
                i++;
                if (i % 10000 == 0) {
                    System.out.println("i = " + i);
                    try {
                        batchInsert2(index, entities);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    // clear the batch that was just written
                    entities.clear();
                    i = 0;
                }
            }
            // the final batch is usually smaller than 10,000 rows, so flush the remainder
            if (!entities.isEmpty()) {
                try {
                    batchInsert2(index, entities);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            LineIterator.closeQuietly(it);
        }
    }

    /**
     * Bulk insert: very fast.
     *
     * @param index    target index name
     * @param entities rows to index
     * @throws IOException on client failure
     */
    public void batchInsert2(String index, List<Entity> entities) throws IOException {
        BulkRequest bulkRequest = new BulkRequest(index);
        System.out.println("entities.size = " + entities.size());
        for (Entity org : entities) {
            IndexRequest request = new IndexRequest();
            request.source(JSON.toJSONString(org), XContentType.JSON);
            bulkRequest.add(request);
        }
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    }

    /**
     * Single-document insert: far too slow at this data volume.
     *
     * @param index  target index name
     * @param entity row to index
     */
    public void singleInsert2(String index, Entity entity) {
        IndexRequest request = new IndexRequest(index);
        request.source(JSON.toJSONString(entity), XContentType.JSON);
        try {
            IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
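The test class above autowires a RestHighLevelClient bean named "client", but the post never shows that configuration. For completeness, here is a minimal sketch of what such a bean might look like; the host and port ("localhost", 9200) are placeholders for your own cluster, not values from the original article:

package com.example.demo.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsClientConfig {

    /**
     * Minimal high-level client bean matching @Qualifier("client").
     * "localhost:9200" is a placeholder; point it at your cluster.
     */
    @Bean(name = "client", destroyMethod = "close")
    public RestHighLevelClient client() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }
}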
The entity class; define whatever fields your data needs:
package com.example.demo.entity;

import lombok.Data;

import java.util.Date;

/**
 * @author lhb
 * @date 2021/11/11
 * @since 1.0.0
 */
@Data
public class Entity {
    /**
     * code
     */
    private String code;
    /**
     * name
     */
    private String name;
    /**
     * count
     */
    private Long num;
    private Date createTime;
}
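One caveat about the parsing in the import code above: line.split(",") breaks as soon as a quoted field contains a comma. If your export can contain quoted values, a real CSV parser is safer. A minimal sketch using Apache Commons CSV; this library is an assumption on my part, not something the original post uses:

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

public class CsvReadSketch {
    public static void main(String[] args) throws IOException {
        // Streams records one at a time, so the 2 GB file never sits in memory at once
        try (Reader reader = new InputStreamReader(
                new FileInputStream("D:\\file\\20211111.csv"), StandardCharsets.UTF_8);
             CSVParser parser = CSVFormat.DEFAULT.parse(reader)) {
            for (CSVRecord record : parser) {
                // Quoted fields with embedded commas are handled correctly here
                String name = record.get(6).trim();
                String code = record.get(8).trim();
                String num = record.get(11).trim();
                System.out.println(code + "==" + num);
            }
        }
    }
}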
Create the index mapping, then insert the data:
PUT index20211111
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "code":       { "type": "keyword" },
      "name":       { "type": "keyword" },
      "num":        { "type": "long" },
      "createTime": { "type": "date" }
    }
  }
}
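If you would rather create the index from Java than from the Kibana console, the same settings and mapping can be sent through the high-level client. A sketch of a method you could add to the ImportTest class above (assuming the ES 7.x client already in use):

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;

public void createIndex() throws IOException {
    CreateIndexRequest request = new CreateIndexRequest("index20211111");
    request.settings(Settings.builder()
            .put("index.number_of_shards", 1)
            .put("index.number_of_replicas", 1));
    // Same field mapping as the Kibana PUT above
    request.mapping(
            "{\"properties\":{"
                    + "\"code\":{\"type\":\"keyword\"},"
                    + "\"name\":{\"type\":\"keyword\"},"
                    + "\"num\":{\"type\":\"long\"},"
                    + "\"createTime\":{\"type\":\"date\"}}}",
            XContentType.JSON);
    restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
}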
Now analyze the data:
GET index20211111/_count

# returns the 630,000 documents we imported
{
  "count" : 630000,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
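The same sanity check can be run from Java: the high-level client exposes a count API. A minimal sketch, again as a method on the test class above:

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;

import java.io.IOException;

public long countDocs() throws IOException {
    CountRequest countRequest = new CountRequest("index20211111");
    CountResponse countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
    // should return 630000 once the import has finished
    return countResponse.getCount();
}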
GET index20211111/_search
{
  "query": {
    "constant_score": {
      "filter": {
        "terms": {
          "code": ["2222", "1111", "3333"]
        }
      }
    }
  },
  "size": 1,
  "track_total_hits": true,
  "aggs": {
    "per_code": {
      "terms": {
        "field": "code",
        "size": 200
      },
      "aggs": {
        "num": {
          "sum": {
            "field": "num"
          }
        }
      }
    },
    "sum_num": {
      "sum": {
        "field": "num"
      }
    }
  }
}
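The same filtered per-code sum can also be issued from Java through the high-level client. A sketch of how the query above might be built with AggregationBuilders (index and field names as in this article):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;

public void sumPerCode() throws IOException {
    SearchSourceBuilder source = new SearchSourceBuilder()
            .size(0)
            .trackTotalHits(true)
            // constant_score + terms filter, as in the Kibana query above
            .query(QueryBuilders.constantScoreQuery(
                    QueryBuilders.termsQuery("code", "2222", "1111", "3333")))
            // bucket by code, then sum num inside each bucket
            .aggregation(AggregationBuilders.terms("per_code").field("code").size(200)
                    .subAggregation(AggregationBuilders.sum("num").field("num")))
            // overall total across all matching documents
            .aggregation(AggregationBuilders.sum("sum_num").field("num"));

    SearchRequest request = new SearchRequest("index20211111").source(source);
    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);

    Terms perCode = response.getAggregations().get("per_code");
    for (Terms.Bucket bucket : perCode.getBuckets()) {
        Sum sum = bucket.getAggregations().get("num");
        System.out.println(bucket.getKeyAsString() + " -> " + sum.getValue());
    }
}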