HBase Filters

Filters

Let's look at a few commonly used filters:

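| Filter | Description |
|---|---|
| RowFilter | Selects rows by RowKey |
| FilterList | Combines other filters |
| ValueFilter | Selects cells whose value matches |
| PrefixFilter | Selects rows whose RowKey has a given prefix |
| QualifierFilter | Selects columns by column name |
| ColumnPrefixFilter | Selects columns whose name has a given prefix |
| SingleColumnValueFilter | Selects rows by the value of a specified column |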

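HBase provides many filters, but only a few are used regularly. The most common are:

  1. RowFilter, which filters by RowKey
  2. PrefixFilter, which filters by RowKey prefix
  3. SingleColumnValueFilter, which filters by the value of a specified column
  4. FilterList, which combines other filters

The figure above shows some of the filters that HBase provides.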

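Column name filter

As the figure above shows, the column name filter QualifierFilter selects the columns with a given name across all column families. The figure retrieves every column named value.

Column name prefix filter

As the figure above shows, the column name prefix filter ColumnPrefixFilter selects, across all column families, the columns whose name starts with a given prefix. The figure retrieves every column whose name starts with level.

Column family filter

As the figure above shows, the column family filter FamilyFilter selects the data of a given column family. The figure retrieves all columns in the addr column family.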
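To make the three cell-level filters above concrete, here is a minimal plain-JDK sketch. It is not the HBase API: the `Cell` class, the `keep` helper, and the sample values are hypothetical, and the family and column names are borrowed from the weather example later in this article. Each predicate only mimics which cells the corresponding filter would keep when used with CompareOperator.EQUAL and a BinaryComparator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Plain-JDK model of cell-level filtering; not the HBase API. */
final class CellFilterSketch {

    /** Hypothetical stand-in for an HBase cell: family, qualifier, value. */
    static final class Cell {
        final String family;
        final String qualifier;
        final String value;

        Cell(String family, String qualifier, String value) {
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }
    }

    /** Keeps only the cells accepted by the predicate, preserving order. */
    static List<Cell> keep(List<Cell> cells, Predicate<Cell> accept) {
        List<Cell> kept = new ArrayList<>();
        for (Cell cell : cells) {
            if (accept.test(cell)) {
                kept.add(cell);
            }
        }
        return kept;
    }

    /** One invented row that follows the column layout of the weather table. */
    static List<Cell> sampleRow() {
        List<Cell> row = new ArrayList<>();
        row.add(new Cell("addr", "province", "SC"));
        row.add(new Cell("addr", "city", "CD"));
        row.add(new Cell("temperature", "level_temperature", "21"));
        row.add(new Cell("temperature", "value", "temperature_21"));
        row.add(new Cell("wind_force", "level_wind_force", "3"));
        row.add(new Cell("wind_force", "value", "wind_force_3"));
        row.add(new Cell("pm25", "level_pm25", "35"));
        row.add(new Cell("pm25", "value", "pm2535"));
        return row;
    }

    public static void main(String[] args) {
        List<Cell> row = sampleRow();
        // Like QualifierFilter(EQUAL, "value"): the same column name in every family.
        System.out.println(keep(row, c -> c.qualifier.equals("value")).size());
        // Like ColumnPrefixFilter("level"): column names starting with "level".
        System.out.println(keep(row, c -> c.qualifier.startsWith("level")).size());
        // Like FamilyFilter(EQUAL, "addr"): every column of one family.
        System.out.println(keep(row, c -> c.family.equals("addr")).size());
    }
}
```

All three filters decide cell by cell, so a row can come back with only some of its columns.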

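Row Key filter

As the figure above shows, the Row Key filter RowFilter selects the data of a given RowKey. The figure retrieves all columns of all column families in the row whose RowKey is 20200107161557.

Row Key prefix filter

As the figure above shows, the Row Key prefix filter PrefixFilter selects the rows whose RowKey starts with a given prefix. The figure retrieves all columns of all column families in the rows whose RowKey starts with 2020.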
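Row keys are byte arrays kept in lexicographic order, which is why the examples later in this article zero-pad their keys with String.format("%03d", i). The following plain-JDK sketch, which does not use the HBase API and whose class and helper names are hypothetical, mimics a RowFilter with GREATER and a PrefixFilter over the keys "000" to "099".

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Plain-JDK model of row key matching; not the HBase API. */
final class RowKeySketch {

    /** Lexicographic comparison of unsigned bytes. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    /** True when key starts with all bytes of prefix. */
    static boolean hasPrefix(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Row keys "000" to "099", zero-padded to three digits. */
    static List<String> sampleKeys() {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            keys.add(String.format("%03d", i));
        }
        return keys;
    }

    /** Mimics RowFilter(GREATER, reference): counts keys sorting after reference. */
    static long countGreaterThan(String reference) {
        return sampleKeys().stream()
                .filter(k -> compareBytes(utf8(k), utf8(reference)) > 0)
                .count();
    }

    /** Mimics PrefixFilter(prefix): counts keys starting with prefix. */
    static long countWithPrefix(String prefix) {
        return sampleKeys().stream()
                .filter(k -> hasPrefix(utf8(k), utf8(prefix)))
                .count();
    }

    public static void main(String[] args) {
        System.out.println(countGreaterThan("050"));
        System.out.println(countWithPrefix("05"));
        // Without padding, "9" sorts after "10" because '9' > '1'.
        System.out.println(compareBytes(utf8("9"), utf8("10")) > 0);
    }
}
```

The last line of `main` shows why unpadded numeric keys are a poor fit for range-style row filters.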

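Value filters

There are three main value filters:

  1. ValueFilter
  2. SingleColumnValueFilter
  3. SingleColumnValueExcludeFilter

They differ as follows: ValueFilter tests the value of every column and keeps only the matching cells; SingleColumnValueFilter tests the value of one specified column and keeps or drops the whole row; SingleColumnValueExcludeFilter applies the same test as SingleColumnValueFilter but leaves the tested column out of the returned row.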
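The difference between the three value filters is easier to see on a toy row. The sketch below is plain JDK code, not the HBase API: a row is modelled as a map from "family:column" to a string value, and the class, method names, and sample values are hypothetical. The `filterIfMissing` flag mirrors setFilterIfMissing, which decides whether a row that lacks the tested column is dropped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-JDK model of the three value filters; not the HBase API. */
final class ValueFilterSketch {

    /** Like ValueFilter with a substring test: keeps only the matching cells. */
    static Map<String, String> valueFilter(Map<String, String> row, String needle) {
        Map<String, String> kept = new LinkedHashMap<>();
        for (Map.Entry<String, String> cell : row.entrySet()) {
            if (cell.getValue().contains(needle)) {
                kept.put(cell.getKey(), cell.getValue());
            }
        }
        return kept;
    }

    /** Like SingleColumnValueFilter(EQUAL): returns the whole row or nothing. */
    static Map<String, String> singleColumnValue(
            Map<String, String> row, String column, String expected, boolean filterIfMissing) {
        String actual = row.get(column);
        boolean pass = (actual == null) ? !filterIfMissing : actual.equals(expected);
        return pass ? new LinkedHashMap<>(row) : new LinkedHashMap<>();
    }

    /** Like SingleColumnValueExcludeFilter: same test, tested column not returned. */
    static Map<String, String> singleColumnValueExclude(
            Map<String, String> row, String column, String expected, boolean filterIfMissing) {
        Map<String, String> kept = singleColumnValue(row, column, expected, filterIfMissing);
        kept.remove(column);
        return kept;
    }

    /** One invented row with three cells. */
    static Map<String, String> sampleRow() {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("addr:province", "SC");
        row.put("addr:city", "CD");
        row.put("temperature:value", "temperature_50");
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> row = sampleRow();
        System.out.println(valueFilter(row, "50"));
        System.out.println(singleColumnValue(row, "addr:province", "SC", true));
        System.out.println(singleColumnValueExclude(row, "addr:province", "SC", true));
    }
}
```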

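Combining filters

FilterList adds no matching condition of its own; it combines other filters, requiring either all of them (MUST_PASS_ALL) or at least one of them (MUST_PASS_ONE) to pass.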
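The two combination modes behave like logical AND and OR over the member filters. Here is a minimal plain-JDK sketch of that idea, using java.util.function.Predicate in place of HBase filters. The local `Operator` enum mirrors the names of FilterList.Operator, while `UserRow`, `combine`, and `accepts` are hypothetical; the two conditions (sex equal to 0, total at least 1100) are the ones used in the user-table example later in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Plain-JDK model of FilterList combination; not the HBase API. */
final class FilterListSketch {

    /** Local mirror of the two FilterList.Operator names. */
    enum Operator { MUST_PASS_ALL, MUST_PASS_ONE }

    /** Combines row predicates with AND (MUST_PASS_ALL) or OR (MUST_PASS_ONE). */
    static <T> Predicate<T> combine(Operator op, List<Predicate<T>> filters) {
        if (op == Operator.MUST_PASS_ALL) {
            return row -> filters.stream().allMatch(f -> f.test(row));
        }
        return row -> filters.stream().anyMatch(f -> f.test(row));
    }

    /** Hypothetical row holding the two columns that are tested. */
    static final class UserRow {
        final int sex;
        final int total;

        UserRow(int sex, int total) {
            this.sex = sex;
            this.total = total;
        }
    }

    /** Applies "sex == 0" and "total >= 1100" under the given operator. */
    static boolean accepts(Operator op, int sex, int total) {
        Predicate<UserRow> sexIsZero = row -> row.sex == 0;
        Predicate<UserRow> totalAtLeast1100 = row -> row.total >= 1100;
        List<Predicate<UserRow>> filters = Arrays.asList(sexIsZero, totalAtLeast1100);
        return combine(op, filters).test(new UserRow(sex, total));
    }

    public static void main(String[] args) {
        // A row that satisfies only the first condition.
        System.out.println(accepts(Operator.MUST_PASS_ALL, 0, 1000));
        System.out.println(accepts(Operator.MUST_PASS_ONE, 0, 1000));
    }
}
```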

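Comparison operators

Comparisons are expressed with a comparison operator:

| Enum value | Meaning |
|---|---|
| LESS | Less than |
| EQUAL | Equal to |
| NO_OP | Excludes everything |
| GREATER | Greater than |
| NOT_EQUAL | Not equal to |
| LESS_OR_EQUAL | Less than or equal to |
| GREATER_OR_EQUAL | Greater than or equal to |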

The corresponding Java class is org.apache.hadoop.hbase.CompareOperator.
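As a rough illustration of how an operator turns a comparison result into a keep-or-drop decision, the sketch below uses a local enum `Op` that only mirrors the names listed above; it is not org.apache.hadoop.hbase.CompareOperator itself. The helper names are hypothetical, and String.compareTo stands in for byte comparison, which is only equivalent for plain ASCII values.

```java
/** Plain-JDK model of comparison operators; not the HBase API. */
final class CompareOperatorSketch {

    /** Local mirror of the operator names. */
    enum Op { LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER, NO_OP }

    /**
     * Decides whether a stored value is kept.
     * cmp is negative, zero, or positive when the stored value sorts
     * before, equal to, or after the reference value given to the filter.
     */
    static boolean matches(Op op, int cmp) {
        switch (op) {
            case LESS:
                return cmp < 0;
            case LESS_OR_EQUAL:
                return cmp <= 0;
            case EQUAL:
                return cmp == 0;
            case NOT_EQUAL:
                return cmp != 0;
            case GREATER_OR_EQUAL:
                return cmp >= 0;
            case GREATER:
                return cmp > 0;
            case NO_OP:
                return false; // excludes everything
            default:
                throw new IllegalArgumentException("unknown operator: " + op);
        }
    }

    /** Convenience overload for ASCII strings. */
    static boolean matches(Op op, String stored, String reference) {
        return matches(op, stored.compareTo(reference));
    }

    public static void main(String[] args) {
        // Stored row key "051" against reference "050".
        for (Op op : Op.values()) {
            System.out.println(op + " -> " + matches(op, "051", "050"));
        }
    }
}
```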

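Comparators

| Comparator | Description |
|---|---|
| BitComparator | Bitwise match |
| NullComparator | Matches null (empty) values |
| BinaryComparator | Matches the complete byte array |
| SubstringComparator | Substring match |
| RegexStringComparator | Regular expression match; supports only EQUAL and NOT_EQUAL |
| BinaryPrefixComparator | Matches a byte array prefix |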

The most frequently used comparators are the substring match SubstringComparator and the byte array prefix match BinaryPrefixComparator.
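The matching rules of the most common comparators can be approximated with plain JDK calls. The sketch below is not the HBase API and all names in it are hypothetical. It assumes, to the best of my understanding of the HBase implementation, that SubstringComparator ignores case and that RegexStringComparator looks for a match anywhere in the value rather than requiring the whole value to match; treat both as assumptions to verify against your HBase version.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;
import java.util.regex.Pattern;

/** Plain-JDK approximation of comparator matching rules; not the HBase API. */
final class ComparatorSketch {

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Like BinaryComparator with EQUAL: the complete byte arrays must be identical. */
    static boolean binaryEquals(String stored, String expected) {
        return Arrays.equals(utf8(stored), utf8(expected));
    }

    /** Like BinaryPrefixComparator with EQUAL: only the leading bytes are compared. */
    static boolean binaryPrefix(String stored, String prefix) {
        byte[] value = utf8(stored);
        byte[] head = utf8(prefix);
        return value.length >= head.length
                && Arrays.equals(Arrays.copyOf(value, head.length), head);
    }

    /** Like SubstringComparator with EQUAL (assumed case-insensitive). */
    static boolean substring(String stored, String needle) {
        return stored.toLowerCase(Locale.ROOT).contains(needle.toLowerCase(Locale.ROOT));
    }

    /** Like RegexStringComparator with EQUAL (assumed to use find, not full match). */
    static boolean regex(String stored, String pattern) {
        return Pattern.compile(pattern).matcher(stored).find();
    }

    public static void main(String[] args) {
        System.out.println(binaryEquals("SC", "SC"));
        System.out.println(binaryPrefix("20200107161557", "2020"));
        System.out.println(substring("wind_force_50", "50"));
        System.out.println(regex("temperature_50", "_5[0-9]$"));
    }
}
```

Substring and regular expression matching only answer "match" or "no match", which is why they make sense only with EQUAL and NOT_EQUAL.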

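Note: HBase stores byte arrays, so these comparators compare bytes, not the numeric values the bytes encode.

To compare by value, HBase ships two numeric comparators, LongComparator and BigDecimalComparator (both in the package org.apache.hadoop.hbase.filter). Comparing other types such as Integer or double requires a custom Comparator. For custom HBase Comparators, see: HBase自定义Comparator过滤数值 (filtering numeric values with a custom HBase Comparator).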
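To see why byte order and numeric order can disagree, consider a sketch that encodes an int as 4 big-endian bytes, the same layout Bytes.toBytes(int) produces, and compares the encodings as unsigned bytes. This is plain JDK code rather than an HBase comparator; the class and method names are hypothetical, and Arrays.compareUnsigned requires Java 9 or later.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Plain-JDK illustration of byte order versus numeric order; not the HBase API. */
final class NumericCompareSketch {

    /** Encodes an int as 4 big-endian bytes (ByteBuffer's default byte order). */
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Sign of the byte-wise, unsigned, lexicographic comparison of the encodings. */
    static int byteOrder(int a, int b) {
        return Integer.signum(Arrays.compareUnsigned(encode(a), encode(b)));
    }

    /** Sign of the numeric comparison after decoding, as a value comparator would do. */
    static int numericOrder(byte[] a, byte[] b) {
        int x = ByteBuffer.wrap(a).getInt();
        int y = ByteBuffer.wrap(b).getInt();
        return Integer.signum(Integer.compare(x, y));
    }

    public static void main(String[] args) {
        // Non-negative values of equal width: both orders agree.
        System.out.println(byteOrder(1000, 1100));
        // A negative value has its high bit set, so its bytes sort after any positive value.
        System.out.println(byteOrder(-1, 1));
        System.out.println(numericOrder(encode(-1), encode(1)));
    }
}
```

This suggests why a byte-wise comparison against Bytes.toBytes(1100) can still behave as expected when every stored value is a non-negative int of the same width, and why a value-aware comparator is needed once negative numbers or mixed widths are involved.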

Common filter examples

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

public class HBaseFilterTest {

    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "127.0.0.1:2181";
    private static final String TABLE_NAME_STR = "user";
    private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
    private static final Configuration configuration = HBaseConfiguration.create();

    private static Connection connection;
    private static Admin admin;
    private static Table table;

    // Open one shared connection for all tests; exit if it cannot be created.
    static {
        configuration.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            table = connection.getTable(TABLE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Test
    public void init() throws Exception {
        if (admin.tableExists(TABLE_NAME)) {
            admin.disableTable(TABLE_NAME); // a table must be disabled before it can be deleted
            admin.deleteTable(TABLE_NAME);  // delete the table
        }
        createTable();
        puts();
    }

    private void createTable() throws Exception {
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
        ColumnFamilyDescriptor profile = ColumnFamilyDescriptorBuilder.of("profile");
        ColumnFamilyDescriptor consume = ColumnFamilyDescriptorBuilder.of("consume");
        tableDescriptorBuilder.setColumnFamily(profile);
        tableDescriptorBuilder.setColumnFamily(consume);
        TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
        admin.createTable(tableDescriptor); // create the table
    }

    private void puts() throws Exception {
        Table table = connection.getTable(TABLE_NAME);
        table.put(getPuts());
    }

    // Builds 100 rows with keys "000" to "099" and random column values.
    private static List<Put> getPuts() {
        LinkedList<Put> puts = new LinkedList<>();
        LocalDateTime localDate = LocalDateTime.now();
        DateTimeFormatter pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        Random random = new Random();

        byte[] profileFamily = Bytes.toBytes("profile");
        byte[] sexesColumn = Bytes.toBytes("sex");
        byte[] birthdayColumn = Bytes.toBytes("birthday");
        byte[] consumeFamily = Bytes.toBytes("consume");
        byte[] totalColumn = Bytes.toBytes("total");

        for (int i = 0; i < 100; i++) {
            byte[] rowKey = Bytes.toBytes(String.format("%03d", i));
            Put put = new Put(rowKey);

            int sex = random.nextInt(2);
            String birthday = localDate.plusDays(random.nextInt(100)).format(pattern);
            int total = random.nextInt(1000) + 1000;

            // sex and total are stored as 4-byte ints, birthday as a string.
            put.addColumn(profileFamily, sexesColumn, Bytes.toBytes(sex));
            put.addColumn(profileFamily, birthdayColumn, Bytes.toBytes(birthday));
            put.addColumn(consumeFamily, totalColumn, Bytes.toBytes(total));
            puts.add(put);
        }
        return puts;
    }

    /**
     * Matches rows by RowKey prefix.
     * @throws IOException
     */
    @Test
    public void rowKeyPrefixFilter() throws IOException {
        Scan scan = new Scan();
        PrefixFilter prefixFilter = new PrefixFilter("05".getBytes());
        scan.setFilter(prefixFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }

    /**
     * Matches rows by RowKey.
     * @throws IOException
     */
    @Test
    public void rowFilter() throws IOException {
        Scan scan = new Scan();
        // Keeps rows whose RowKey sorts after "050".
        Filter rowFilter = new RowFilter(CompareOperator.GREATER, new BinaryComparator("050".getBytes()));
        scan.setFilter(rowFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }

    /**
     * Selects rows by the value of a given column in a given column family.
     * @throws IOException
     */
    @Test
    public void singleColumnValueFilter() throws IOException {
        Scan scan = new Scan();
        // Alternative comparators; to use one, pass it in place of the raw byte[] value below.
        // BinaryComparator comparator = new BinaryComparator(Bytes.toBytes(0));
        // BitComparator comparator = new BitComparator(Bytes.toBytes(1), BitComparator.BitwiseOp.XOR);
        BinaryPrefixComparator comparator = new BinaryPrefixComparator(Bytes.toBytes(0));
        byte[] family = "profile".getBytes();
        byte[] column = "sex".getBytes();
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                family,
                column,
                CompareOperator.EQUAL,
                // comparator);
                Bytes.toBytes(0));
        // Skip rows that do not contain the tested column at all.
        singleColumnValueFilter.setFilterIfMissing(true);
        scan.setFilter(singleColumnValueFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        printResultScanner(resultScanner);
    }

    /**
     * Combined filters.
     * @throws IOException
     */
    @Test
    public void filterListTest() throws IOException {
        // FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); // at least one condition must pass
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); // all conditions must pass

        // Condition 1: profile:sex == 0
        byte[] family = "profile".getBytes();
        byte[] column = "sex".getBytes();
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                family,
                column,
                CompareOperator.EQUAL,
                Bytes.toBytes(0));
        filterList.addFilter(singleColumnValueFilter);

        // Condition 2: consume:total >= 1100
        byte[] consumeFamily = Bytes.toBytes("consume");
        byte[] totalColumn = Bytes.toBytes("total");
        singleColumnValueFilter = new SingleColumnValueFilter(
                consumeFamily,
                totalColumn,
                CompareOperator.GREATER_OR_EQUAL,
                Bytes.toBytes(1100));
        filterList.addFilter(singleColumnValueFilter);

        Scan scan = new Scan();
        scan.setFilter(filterList);
        ResultScanner resultScanner = table.getScanner(scan);
        printResultScanner(resultScanner);
    }

    // Batch Get of two rows without any filter.
    @Test
    public void testGet() throws IOException {
        LinkedList<Get> gets = new LinkedList<>();
        Get get = new Get(Bytes.toBytes("050"));
        gets.add(get);
        get = new Get(Bytes.toBytes("051"));
        gets.add(get);
        Result[] results = table.get(gets);
        printResult(results);
    }

    // Batch Get in which only the second Get ("051") carries a filter.
    @Test
    public void testGetFilter() throws IOException {
        LinkedList<Get> gets = new LinkedList<>();
        Get get = new Get(Bytes.toBytes("050"));
        gets.add(get);
        get = new Get(Bytes.toBytes("051"));
        byte[] family = "profile".getBytes();
        byte[] column = "sex".getBytes();
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(family, column, CompareOperator.EQUAL, Bytes.toBytes(0));
        get.setFilter(singleColumnValueFilter);
        gets.add(get);
        Result[] results = table.get(gets);
        printResult(results);
    }

    // Full scan without a filter.
    @Test
    public void printAll() throws IOException {
        Scan s = new Scan();
        ResultScanner resultScanner = table.getScanner(s);
        printResultScanner(resultScanner);
    }

    private static void printResultScanner(ResultScanner resultScanner) {
        byte[] profileFamily = Bytes.toBytes("profile");
        byte[] sexesColumn = Bytes.toBytes("sex");
        byte[] birthdayColumn = Bytes.toBytes("birthday");
        byte[] consumeFamily = Bytes.toBytes("consume");
        byte[] totalColumn = Bytes.toBytes("total");
        for (Result result : resultScanner) {
            byte[] sex = result.getValue(profileFamily, sexesColumn);
            byte[] birthday = result.getValue(profileFamily, birthdayColumn);
            byte[] total = result.getValue(consumeFamily, totalColumn);
            System.out.println(String.format("rowkey:%s,sex:%d,birthday:%s,total:%d",
                    Bytes.toString(result.getRow()), Bytes.toInt(sex), Bytes.toString(birthday), Bytes.toInt(total)
            ));
        }
    }

    private static void printResult(Result[] datas) {
        byte[] profileFamily = Bytes.toBytes("profile");
        byte[] sexesColumn = Bytes.toBytes("sex");
        byte[] birthdayColumn = Bytes.toBytes("birthday");
        byte[] consumeFamily = Bytes.toBytes("consume");
        byte[] totalColumn = Bytes.toBytes("total");
        for (Result result : datas) {
            // Debug output: prints the array references (or null), not the decoded values.
            byte[] sex = result.getValue(profileFamily, sexesColumn);
            System.out.println(sex);
            byte[] birthday = result.getValue(profileFamily, birthdayColumn);
            System.out.println(birthday);
            byte[] total = result.getValue(consumeFamily, totalColumn);
            System.out.println(total);
            byte[] row = result.getRow();
            System.out.println(row);
            // An empty Result (for example, a Get whose filter rejected the row) has no row key.
            if (row == null) {
                continue;
            }
            System.out.println(String.format("rowkey:%s,sex:%d,birthday:%s,total:%d",
                    Bytes.toString(row), Bytes.toInt(sex), Bytes.toString(birthday), Bytes.toInt(total)
            ));
        }
    }
}

More test examples

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

public class HBaseFilterTest {

    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "127.0.0.1:2181";
    private static final String TABLE_NAME_STR = "weather";
    private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
    private static final Configuration configuration = HBaseConfiguration.create();

    private static Connection connection;
    private static Admin admin;
    private static Table table;

    static {
        configuration.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            table = connection.getTable(TABLE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Test
    public void init() throws Exception {
        createTable();
        puts();
    }

    private void createTable() throws Exception {
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
        ColumnFamilyDescriptor addr = ColumnFamilyDescriptorBuilder.of("addr");               // columns: province, city
        ColumnFamilyDescriptor temperature = ColumnFamilyDescriptorBuilder.of("temperature"); // columns: level_temperature, value
        ColumnFamilyDescriptor windForce = ColumnFamilyDescriptorBuilder.of("wind_force");    // columns: level_wind_force, value
        ColumnFamilyDescriptor pm25 = ColumnFamilyDescriptorBuilder.of("pm25");               // columns: level_pm25, value
        tableDescriptorBuilder.setColumnFamily(addr);
        tableDescriptorBuilder.setColumnFamily(temperature);
        tableDescriptorBuilder.setColumnFamily(windForce);
        tableDescriptorBuilder.setColumnFamily(pm25);
        TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
        admin.createTable(tableDescriptor);
    }

    private void puts() throws Exception {
        Table table = connection.getTable(TABLE_NAME);
        table.put(getPuts());
    }

    // Builds 61 rows keyed by a yyyyMMddHHmmss timestamp, with random column values.
    private static List<Put> getPuts() {
        LinkedList<Put> puts = new LinkedList<>();
        LocalDateTime localDate = LocalDateTime.now();
        DateTimeFormatter pattern = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
        Random random = new Random();
        String[] provinces = {"SC", "XJ"};
        String[] cities = {"CD", "BJ", "SH", "CQ"};
        int level;
        String value;
        for (int i = 0; i <= 60; i++) {
            // plusDays(i) is applied to the running date, so the gap between rows grows with i.
            localDate = localDate.plusDays(i);
            byte[] rowKey = Bytes.toBytes(localDate.format(pattern));
            Put put = new Put(rowKey);

            String province = provinces[random.nextInt(provinces.length)];
            String city = cities[random.nextInt(cities.length)];
            put.addColumn(Bytes.toBytes("addr"), Bytes.toBytes("province"), Bytes.toBytes(province));
            put.addColumn(Bytes.toBytes("addr"), Bytes.toBytes("city"), Bytes.toBytes(city));

            level = random.nextInt(100);
            value = "temperature_" + level;
            put.addColumn(Bytes.toBytes("temperature"), Bytes.toBytes("level_temperature"), Bytes.toBytes(level));
            put.addColumn(Bytes.toBytes("temperature"), Bytes.toBytes("value"), Bytes.toBytes(value));

            level = random.nextInt(100);
            value = "wind_force_" + level;
            put.addColumn(Bytes.toBytes("wind_force"), Bytes.toBytes("level_wind_force"), Bytes.toBytes(level));
            put.addColumn(Bytes.toBytes("wind_force"), Bytes.toBytes("value"), Bytes.toBytes(value));

            level = random.nextInt(100);
            value = "pm25" + level;
            put.addColumn(Bytes.toBytes("pm25"), Bytes.toBytes("level_pm25"), Bytes.toBytes(level));
            put.addColumn(Bytes.toBytes("pm25"), Bytes.toBytes("value"), Bytes.toBytes(value));

            puts.add(put);
        }
        return puts;
    }

    // Columns named "value" in every column family.
    @Test
    public void qualifierFilter() throws Exception {
        Scan scan = new Scan();
        BinaryComparator qualifierComparator = new BinaryComparator("value".getBytes());
        Filter qualifierFilter = new QualifierFilter(CompareOperator.EQUAL, qualifierComparator);
        scan.setFilter(qualifierFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }

    // Columns whose name starts with "level" in every column family.
    @Test
    public void columnPrefixFilter() throws IOException {
        Scan scan = new Scan();
        ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter("level".getBytes());
        scan.setFilter(columnPrefixFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }

    // All columns of the addr column family.
    @Test
    public void familyFilter() throws IOException {
        Scan scan = new Scan();
        Filter familyFilter = new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator("addr".getBytes()));
        scan.setFilter(familyFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }

    // Rows whose RowKey starts with "2020".
    @Test
    public void rowKeyPrefixFilter() throws IOException {
        Scan scan = new Scan();
        PrefixFilter prefixFilter = new PrefixFilter("2020".getBytes());
        scan.setFilter(prefixFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }

    // Rows whose RowKey sorts after "20200107161557".
    @Test
    public void rowFilter() throws IOException {
        Scan scan = new Scan();
        Filter rowFilter = new RowFilter(CompareOperator.GREATER, new BinaryComparator("20200107161557".getBytes()));
        scan.setFilter(rowFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }

    // Rows where addr:province equals "SC"; the tested column itself is not returned.
    @Test
    public void singleColumnValueExcludeFilter() throws IOException {
        Scan scan = new Scan();
        BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes("SC"));
        SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(
                "addr".getBytes(),     // family
                "province".getBytes(), // column
                CompareOperator.EQUAL,
                binaryComparator);
        singleColumnValueExcludeFilter.setFilterIfMissing(true);
        scan.setFilter(singleColumnValueExcludeFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }

    // Rows where addr:province contains "XJ".
    @Test
    public void singleColumnValueFilter() throws IOException {
        Scan scan = new Scan();
        SubstringComparator substringComparator = new SubstringComparator("XJ");
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "addr".getBytes(),
                "province".getBytes(),
                CompareOperator.EQUAL,
                substringComparator);
        singleColumnValueFilter.setFilterIfMissing(true);
        scan.setFilter(singleColumnValueFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("province"))));
        }
    }

    // Cells in any column whose value contains "50"; columns that do not match print as null.
    @Test
    public void valueFilter() throws IOException {
        Scan scan = new Scan();
        Filter valueFilter = new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("50"));
        scan.setFilter(valueFilter);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("temperature"), Bytes.toBytes("value"))));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("wind_force"), Bytes.toBytes("value"))));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("pm25"), Bytes.toBytes("value"))));
        }
    }

    @Test
    public void filterListTest() throws IOException {
        // FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); // at least one condition must pass
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); // all conditions must pass

        // Condition 1: addr:province == "SC"
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "addr".getBytes(),
                "province".getBytes(),
                CompareOperator.EQUAL,
                Bytes.toBytes("SC"));
        filterList.addFilter(singleColumnValueFilter);

        // Condition 2: addr:city == "SH"
        singleColumnValueFilter = new SingleColumnValueFilter(
                "addr".getBytes(),
                "city".getBytes(),
                CompareOperator.EQUAL,
                Bytes.toBytes("SH"));
        filterList.addFilter(singleColumnValueFilter);

        Scan scan = new Scan();
        scan.setFilter(filterList);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("province"))));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("city"))));
        }
    }
}

That is everything on HBase filters. Source: utcz.com/z/534955.html
