java读取大文件内容到Elasticsearch分析(手把手教你java处理超大csv文件)

database


现在需要快速分析一个2g的csv文件;

基于掌握的知识,使用java按行读取文件,批量导入数据到Elasticsearch

然后利用es强大的聚合能力分析数据,1个小时搞定!

package com.example.demo;

import com.alibaba.fastjson.JSON;

import com.example.demo.entity.Entity;

import org.apache.commons.io.FileUtils;

import org.apache.commons.io.LineIterator;

import org.elasticsearch.action.bulk.BulkRequest;

import org.elasticsearch.action.index.IndexRequest;

import org.elasticsearch.action.index.IndexResponse;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.common.xcontent.XContentType;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;

import java.io.IOException;

import java.util.ArrayList;

import java.util.Date;

import java.util.List;

import java.util.Objects;

/**

* 读取大文件

* csv格式

*

* @author lhb

* @date 2021/11/11

* @since 1.0.0

*/

@SpringBootTest

publicclass ImportTest {

@Autowired

@Qualifier("client")

private RestHighLevelClient restHighLevelClient;

@Test

void insert() {

     //csv文件2G,63W条数据,十多个字段

String filePath = "D:\file\20211111.csv";

LineIterator it = null;

try {

it = FileUtils.lineIterator(new File(filePath), "UTF-8");

} catch (IOException e) {

e.printStackTrace();

}

try {

while (it.hasNext()) {

String line = it.nextLine();

//System.out.println("line = " + line);

//文件是CSV文件,CSV文件中的每一列是用","隔开的,这样就可以得到每一列的元素

String[] strArray = line.split(",");

//有很长的空格,trim一下

String name = strArray[6].trim();

String code = strArray[8].trim();

String num = strArray[11].trim();

System.out.println(code + "==" + num);

Entity entity = new Entity();

entity.setCode(code);

if (Objects.equals("xxx", code)) {

//跳过表头

continue;

}

entity.setNum(Long.parseLong(num));

entity.setName(name);

entity.setCreateTime(new Date());

String index = "index20211111";

singleInsert2(index, entity);

}

} finally {

LineIterator.closeQuietly(it);

}

}

@Test

void batchInsert() {

String filePath = "D:\express\20211111.csv";

LineIterator it = null;

try {

it = FileUtils.lineIterator(new File(filePath), "UTF-8");

} catch (IOException e) {

e.printStackTrace();

}

try {

int i = 0;

List<Entity> entities = new ArrayList<>();

while (it.hasNext()) {

String line = it.nextLine();

//System.out.println("line = " + line);

String[] strArray = line.split(",");

String code = strArray[6].trim();

String name = strArray[8].trim();

String num = strArray[11].trim();

System.out.println(code + "==" + num);

if (Objects.equals("xxx", code)) {

//跳过表头

continue;

}

Entity entity = new Entity();

entity.setCode(code);

entity.setName(name);

try {

entity.setNum(Long.parseLong(num));

} catch (NumberFormatException e) {

e.printStackTrace();

System.out.println("出错的数据" + code + "==" + num);

}

entity.setCreateTime(new Date());

String index = "index20211111";

//批量插入

entities.add(entity);

i++;
          //如果最后一次批量插入不足10000条数据,需要再此根据实际条数特殊处理

if (i % 10000 == 0) {

System.out.println("i = " + i);

try {

batchInsert2(index, entities);

} catch (IOException e) {

e.printStackTrace();

}

//清空已经处理过的list

entities.clear();

i = 0;

}

}

} finally {

LineIterator.closeQuietly(it);

}

}

/**

* 批量速度杠杠的

*

* @param index

* @param entities

* @throws IOException

*/

publicvoid batchInsert2(String index, List<Entity> entities) throws IOException {

BulkRequest bulkRequest = new BulkRequest(index);

System.out.println("entities.sz = " + entities.size());

for (Entity org : entities) {

IndexRequest request = new IndexRequest();

request.source(JSON.toJSONString(org), XContentType.JSON);

bulkRequest.add(request);

}

restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

}

/**

* 数据量大,超级慢

*

* @param index

* @param entity

*/

publicvoid singleInsert2(String index, Entity entity) {

IndexRequest request = new IndexRequest(index);

request.source(JSON.toJSONString(entity), XContentType.JSON);

try {

IndexResponse index1 = restHighLevelClient.index(request, RequestOptions.DEFAULT);

} catch (IOException e) {

e.printStackTrace();

}

}

}

实体类(字段可根据实际需要自行定义):

package com.example.demo.entity;

import lombok.Data;

import java.util.Date;

/**

* @author lhb

* @date 2021/11/11

* @since 1.0.0

*/

@Data

publicclass Entity {

/**

* 编码

*/

private String code;

/**

* 名字

*/

private String name;

/**

* 数量

*/

private Long num;

private Date createTime;

}

先创建索引映射(索引名必须与代码中写入的索引 index20211111 一致),然后再插入数据:

PUT index20211111

{

"settings": {

"number_of_shards": 1,

"number_of_replicas": 1

},

"mappings": {

"properties": {

"code": {

"type": "keyword"

},

"name": {

"type": "keyword"

},

"num": {

"type": "long"

},

"createTime": {

"type": "date"

}

}

}

}

开始分析数据:

GET index20211111/_count

{}

 

#返回63w数据

{
"count" : 630000,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
}
}

GET index20211111/_search

{

"query": {

"constant_score": {

"filter": {

"terms": {

"code": [

2222,

1111,

3333

]

}

}

}

},

"size": 1,

"track_total_hits": true,

"aggs": {

"per_code": {

"terms": {

"field": "code",

"size": 200

},

"aggs": {

"num": {

"sum": {

"field": "num"

}

}

}

},

"sum_num": {

"sum": {

"field": "num"

}

}

}

}

 

以上是 java读取大文件内容到Elasticsearch分析(手把手教你java处理超大csv文件) 的全部内容, 来源链接: utcz.com/z/536071.html

回到顶部