Filtering Numeric Values in HBase with a Custom Comparator


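Introduction

Writing a custom HBase Comparator requires protoc, the Protocol Buffers compiler, which can be downloaded from the links below:

Latest protoc releases

protoc 2.5 release

The complete project can be downloaded from the link below:

HBase custom Comparator example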

proto file format

enum Sex {
  MALE = 1;
  FEMALE = 2;
}

message Bean {
  required int32 id = 1;
  required string name = 2;
  optional Sex sex = 3 [default = MALE];
}

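The enum keyword defines an enumeration type, and message defines a message type, which protoc compiles into a class.

  1. required: the field must appear exactly once
  2. optional: the field may appear zero times or once
  3. repeated: the field may be repeated any number of times

Anatomy of a field definition:

optional      Sex           sex           = 3             [default = MALE];

field rule    field type    field name    unique number   optional settings (here: the default value)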
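To make the role of the unique identifier (the field number) concrete: in the protobuf wire format, each field is preceded by a tag that combines the field number with a wire type. The sketch below is not part of the original project, and the class and method names are hypothetical. It computes the tag as (fieldNumber << 3) | wireType and reproduces the `case 10` and `case 18` labels that appear in the generated parsing code at the end of this article, where `value = 1` and `fieldType = 2` are both length-delimited fields (wire type 2).

```java
/** Hypothetical helper illustrating how a protobuf field tag is derived. */
final class WireTagSketch {
    /** Wire type used for length-delimited fields such as bytes and string. */
    static final int WIRETYPE_LENGTH_DELIMITED = 2;

    /** Combines a field number and a wire type into the tag written before the field. */
    static int tag(int fieldNumber, int wireType) {
        return (fieldNumber << 3) | wireType;
    }

    public static void main(String[] args) {
        System.out.println(tag(1, WIRETYPE_LENGTH_DELIMITED)); // prints 10 (required bytes value = 1)
        System.out.println(tag(2, WIRETYPE_LENGTH_DELIMITED)); // prints 18 (required string fieldType = 2)
    }
}
```

This is also why a field number should not be changed once data has been serialized with it: the number, not the field name, is what identifies the field on the wire.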

Default values by type:

  1. int: 0
  2. string: the empty string
  3. bool: false
  4. enum: the first value of the enumeration
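The field rules and defaults above can be sketched in plain Java. The class below is a hypothetical, hand-written stand-in for the Bean message (it is not protoc output). It mimics the presence-bit bookkeeping visible in the generated code at the end of this article: each setter for a required field flips a bit, `isInitialized()` returns true only once every required field has been set, and the optional `sex` field falls back to its declared default.

```java
/** Hand-written stand-in for a generated Bean builder; all names are illustrative. */
final class BeanBuilderSketch {
    enum Sex { MALE, FEMALE }

    private int bitField;          // bit 0: id has been set, bit 1: name has been set
    private int id;
    private String name = "";
    private Sex sex = Sex.MALE;    // optional field with [default = MALE]

    BeanBuilderSketch setId(int id) {
        bitField |= 0x1;
        this.id = id;
        return this;
    }

    BeanBuilderSketch setName(String name) {
        if (name == null) throw new NullPointerException("name");
        bitField |= 0x2;
        this.name = name;
        return this;
    }

    BeanBuilderSketch setSex(Sex sex) {
        if (sex == null) throw new NullPointerException("sex");
        this.sex = sex;
        return this;
    }

    Sex getSex() { return sex; }

    /** True only when both required fields (id and name) are present. */
    boolean isInitialized() { return (bitField & 0x3) == 0x3; }

    public static void main(String[] args) {
        BeanBuilderSketch bean = new BeanBuilderSketch().setId(1);
        System.out.println(bean.isInitialized()); // false: name is still missing
        bean.setName("Tom");
        System.out.println(bean.isInitialized()); // true: both required fields are set
        System.out.println(bean.getSex());        // MALE: the optional field keeps its default
    }
}
```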

Defining a proto file

syntax = "proto2";

option java_package = "org.curitis.comparator";
option java_outer_classname = "HBaseFilterNumberComparatorProto";
option java_generic_services = true;
option optimize_for = SPEED;

message HBaseFilterNumberComparator {
  required bytes value = 1;
  required string fieldType = 2;
}

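The first line declares which version of the proto syntax the file uses, proto2 or proto3, depending on your needs. I am using HBase 2.0.0, which depends on Protocol Buffers 2.5, so proto2 is used here.

To find out which Protocol Buffers version your HBase depends on, run the following command in the corresponding project:

mvn dependency:tree >> tree.txt

Then search the output file tree.txt for protobuf.

java_package sets the Java package of the generated code, and java_outer_classname sets the name of the generated outer class. If it is not specified, the name is derived from the name of the proto schema file.

message defines the entity to be serialized.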

The protoc command

protoc.exe ./HBaseFilterNumberComparatorProto.proto --java_out=./

Compiling is simple: specify the proto schema file and the output directory.

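The custom comparator

The HBaseFilterNumberComparatorProto class generated by protoc is fairly long, so it is listed at the end. Here we go straight to the custom comparator.

A custom HBase comparator usually extends the ByteArrayComparable class. For implementation details, refer to comparators that HBase already provides in the org.apache.hadoop.hbase.filter package, such as LongComparator and BigDecimalComparator.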

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Custom comparator that decodes cell values as numbers before comparing them.
 */
public class FilterNumberComparator extends ByteArrayComparable {

    private String fieldType;
    private byte[] data;

    /**
     * @param value     the encoded value to compare cell values against
     * @param fieldType the numeric type of the value: int, integer, float or double
     */
    public FilterNumberComparator(byte[] value, String fieldType) {
        super(value);
        this.fieldType = fieldType.toLowerCase();
        this.data = value;
    }

    // Serializes the comparator so that it can be sent to the server
    @Override
    public byte[] toByteArray() {
        HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator.Builder builder =
                HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator.newBuilder();
        builder.setValue(ByteString.copyFrom(this.data));
        builder.setFieldType(this.fieldType);
        return builder.build().toByteArray();
    }

    // Static method used to deserialize the comparator
    public static FilterNumberComparator parseFrom(final byte[] bytes) throws DeserializationException {
        HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator proto;
        try {
            proto = HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator.parseFrom(bytes);
        } catch (InvalidProtocolBufferException e) {
            throw new DeserializationException(e);
        }
        return new FilterNumberComparator(proto.getValue().toByteArray(), proto.getFieldType());
    }

    @Override
    public int compareTo(byte[] bytes, int offset, int length) {
        // Decode the cell value according to fieldType, then compare numerically
        if (fieldType.equalsIgnoreCase("int") || fieldType.equalsIgnoreCase("integer")) {
            int that = Bytes.toInt(bytes, offset, length);
            return Integer.compare(Bytes.toInt(data), that);
        } else if (fieldType.equalsIgnoreCase("float")) {
            float that = Bytes.toFloat(bytes, offset);
            return Float.compare(Bytes.toFloat(data), that);
        } else if (fieldType.equalsIgnoreCase("double")) {
            double that = Bytes.toDouble(bytes, offset);
            return Double.compare(Bytes.toDouble(data), that);
        }
        // Unsupported fieldType: fall back to a fixed result
        return 1;
    }
}

The actual business logic lives in the compareTo method. The implementation above supports comparing integer, float and double values.
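Why decode before comparing? The sketch below is a standalone illustration, not code from the project. It uses java.nio.ByteBuffer as a stand-in for HBase's Bytes utility, on the assumption that both encode a double as 8 big-endian IEEE 754 bytes; the class and method names are hypothetical. It shows that a plain byte-by-byte comparison orders -5.0 after 3.0, while decoding first yields the expected numeric order.

```java
import java.nio.ByteBuffer;

/** Illustrative only: ByteBuffer stands in for HBase's Bytes utility. */
final class DecodeBeforeCompareSketch {
    /** Big-endian IEEE 754 encoding of a double (8 bytes). */
    static byte[] encode(double d) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(d).array();
    }

    /** Lexicographic comparison of unsigned bytes, which is what a plain binary comparison does. */
    static int compareRaw(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /** Decodes both operands first, then compares them numerically. */
    static int compareDecoded(byte[] a, byte[] b) {
        return Double.compare(ByteBuffer.wrap(a).getDouble(), ByteBuffer.wrap(b).getDouble());
    }

    public static void main(String[] args) {
        byte[] negative = encode(-5.0);
        byte[] positive = encode(3.0);
        // The sign bit is the highest bit of the first byte, so -5.0 sorts last as raw bytes.
        System.out.println(compareRaw(negative, positive) > 0);     // true
        System.out.println(compareDecoded(negative, positive) < 0); // true
    }
}
```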

So how is this Comparator actually used?

First, add the jar that contains FilterNumberComparator as a dependency.

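import java.io.IOException;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

@Test
public void singleColumnValueFilter() throws IOException {
    Scan scan = new Scan();
    // 1100.0 is a double literal, so the value is encoded as a double
    FilterNumberComparator comparator = new FilterNumberComparator(Bytes.toBytes(1100.0), "double");
    byte[] family = Bytes.toBytes("consume");
    byte[] column = Bytes.toBytes("total");
    SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
            family,
            column,
            CompareOperator.GREATER,
            comparator);
    // Skip rows that do not contain the column at all
    singleColumnValueFilter.setFilterIfMissing(true);
    scan.setFilter(singleColumnValueFilter);
    // table and printResultScanner are defined elsewhere in the test class
    try (ResultScanner resultScanner = table.getScanner(scan)) {
        printResultScanner(resultScanner);
    }
}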

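The scan above keeps the rows whose total column in the consume column family is greater than 1100. Be careful to write 1100.0 rather than 1100: the stored values are doubles, and with 1100 the call resolves to Bytes.toBytes(int), which encodes the value as an integer instead.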
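The effect of the literal's type can be seen without an HBase cluster. The sketch below is a standalone illustration with hypothetical names. Its two toBytes overloads only mirror the idea of the int and double overloads of Bytes.toBytes, and are implemented with java.nio.ByteBuffer so that the example has no HBase dependency.

```java
import java.nio.ByteBuffer;

/** Illustrative only: shows how Java overload resolution picks the encoding. */
final class LiteralWidthSketch {
    /** Stand-in for the int overload: 4 bytes. */
    static byte[] toBytes(int v) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(v).array();
    }

    /** Stand-in for the double overload: 8 bytes. */
    static byte[] toBytes(double v) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(v).array();
    }

    public static void main(String[] args) {
        System.out.println(toBytes(1100).length);   // 4: the int overload is chosen
        System.out.println(toBytes(1100.0).length); // 8: the double overload is chosen
    }
}
```

A 4-byte value cannot be decoded as a double, so passing the wrong literal would make the "double" branch of the comparator fail or compare meaningless data.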

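Runtime issues

The custom FilterNumberComparator must be packaged as a separate jar and placed in HBase's lib directory, for example /usr/local/hbase-2.0.0/lib. In a cluster, make sure the jar is copied to every node.

Then restart HBase: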

sudo ./stop-hbase.sh

sudo ./start-hbase.sh
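Why does the jar have to be on the server side at all? As far as I understand, the client sends only the comparator's class name together with the bytes produced by toByteArray(), and the server rebuilds the object by looking up the class and calling its static parseFrom(byte[]) method reflectively. The sketch below is a simplified illustration of that mechanism under this assumption. It is not HBase's actual code, and DemoComparator and rebuild are hypothetical names.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

/** Simplified illustration of rebuilding a comparator from its class name and bytes. */
final class ReflectiveParseSketch {

    /** Stand-in for a custom comparator: serializes itself and offers a static parseFrom. */
    public static final class DemoComparator {
        private final String fieldType;

        public DemoComparator(String fieldType) {
            this.fieldType = fieldType;
        }

        public String getFieldType() {
            return fieldType;
        }

        public byte[] toByteArray() {
            return fieldType.getBytes(StandardCharsets.UTF_8);
        }

        public static DemoComparator parseFrom(byte[] bytes) {
            return new DemoComparator(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    /** Receiving side: load the class by name, then invoke its static parseFrom(byte[]). */
    static Object rebuild(String className, byte[] payload) {
        try {
            Class<?> type = Class.forName(className); // fails if the class is not on the classpath
            Method parseFrom = type.getMethod("parseFrom", byte[].class);
            return parseFrom.invoke(null, (Object) payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot rebuild " + className, e);
        }
    }

    public static void main(String[] args) {
        DemoComparator sent = new DemoComparator("double");
        Object received = rebuild(DemoComparator.class.getName(), sent.toByteArray());
        System.out.println(((DemoComparator) received).getFieldType()); // double
    }
}
```

If the class cannot be found on the receiving side, the lookup fails before any comparison happens, which matches the requirement to deploy the jar on every node.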

HBaseFilterNumberComparatorProto

// Generated by the protocol buffer compiler.  DO NOT EDIT!

// source: HBaseFilterNumberComparatorProto.proto

package org.curitis.comparator;

public final class HBaseFilterNumberComparatorProto {

private HBaseFilterNumberComparatorProto() {}

public static void registerAllExtensions(

com.google.protobuf.ExtensionRegistry registry) {

}

public interface HBaseFilterNumberComparatorOrBuilder

extends com.google.protobuf.MessageOrBuilder {

// required bytes value = 1;

/**

* <code>required bytes value = 1;</code>

*/

boolean hasValue();

/**

* <code>required bytes value = 1;</code>

*/

com.google.protobuf.ByteString getValue();

// required string fieldType = 2;

/**

* <code>required string fieldType = 2;</code>

*/

boolean hasFieldType();

/**

* <code>required string fieldType = 2;</code>

*/

String getFieldType();

/**

* <code>required string fieldType = 2;</code>

*/

com.google.protobuf.ByteString

getFieldTypeBytes();

}

/**

* Protobuf type {@code HBaseFilterNumberComparator}

*/

public static final class HBaseFilterNumberComparator extends

com.google.protobuf.GeneratedMessage

implements HBaseFilterNumberComparatorOrBuilder {

// Use HBaseFilterNumberComparator.newBuilder() to construct.

private HBaseFilterNumberComparator(com.google.protobuf.GeneratedMessage.Builder<?> builder) {

super(builder);

this.unknownFields = builder.getUnknownFields();

}

private HBaseFilterNumberComparator(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }

private static final HBaseFilterNumberComparator defaultInstance;

public static HBaseFilterNumberComparator getDefaultInstance() {

return defaultInstance;

}

public HBaseFilterNumberComparator getDefaultInstanceForType() {

return defaultInstance;

}

private final com.google.protobuf.UnknownFieldSet unknownFields;

@Override

public final com.google.protobuf.UnknownFieldSet

getUnknownFields() {

return this.unknownFields;

}

private HBaseFilterNumberComparator(

com.google.protobuf.CodedInputStream input,

com.google.protobuf.ExtensionRegistryLite extensionRegistry)

throws com.google.protobuf.InvalidProtocolBufferException {

initFields();

int mutable_bitField0_ = 0;

com.google.protobuf.UnknownFieldSet.Builder unknownFields =

com.google.protobuf.UnknownFieldSet.newBuilder();

try {

boolean done = false;

while (!done) {

int tag = input.readTag();

switch (tag) {

case 0:

done = true;

break;

default: {

if (!parseUnknownField(input, unknownFields,

extensionRegistry, tag)) {

done = true;

}

break;

}

case 10: {

bitField0_ |= 0x00000001;

value_ = input.readBytes();

break;

}

case 18: {

bitField0_ |= 0x00000002;

fieldType_ = input.readBytes();

break;

}

}

}

} catch (com.google.protobuf.InvalidProtocolBufferException e) {

throw e.setUnfinishedMessage(this);

} catch (java.io.IOException e) {

throw new com.google.protobuf.InvalidProtocolBufferException(

e.getMessage()).setUnfinishedMessage(this);

} finally {

this.unknownFields = unknownFields.build();

makeExtensionsImmutable();

}

}

public static final com.google.protobuf.Descriptors.Descriptor

getDescriptor() {

return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_descriptor;

}

protected FieldAccessorTable

internalGetFieldAccessorTable() {

return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_fieldAccessorTable

.ensureFieldAccessorsInitialized(

HBaseFilterNumberComparator.class, Builder.class);

}

public static com.google.protobuf.Parser<HBaseFilterNumberComparator> PARSER =

new com.google.protobuf.AbstractParser<HBaseFilterNumberComparator>() {

public HBaseFilterNumberComparator parsePartialFrom(

com.google.protobuf.CodedInputStream input,

com.google.protobuf.ExtensionRegistryLite extensionRegistry)

throws com.google.protobuf.InvalidProtocolBufferException {

return new HBaseFilterNumberComparator(input, extensionRegistry);

}

};

@Override

public com.google.protobuf.Parser<HBaseFilterNumberComparator> getParserForType() {

return PARSER;

}

private int bitField0_;

// required bytes value = 1;

public static final int VALUE_FIELD_NUMBER = 1;

private com.google.protobuf.ByteString value_;

/**

* <code>required bytes value = 1;</code>

*/

public boolean hasValue() {

return ((bitField0_ & 0x00000001) == 0x00000001);

}

/**

* <code>required bytes value = 1;</code>

*/

public com.google.protobuf.ByteString getValue() {

return value_;

}

// required string fieldType = 2;

public static final int FIELDTYPE_FIELD_NUMBER = 2;

private Object fieldType_;

/**

* <code>required string fieldType = 2;</code>

*/

public boolean hasFieldType() {

return ((bitField0_ & 0x00000002) == 0x00000002);

}

/**

* <code>required string fieldType = 2;</code>

*/

public String getFieldType() {

Object ref = fieldType_;

if (ref instanceof String) {

return (String) ref;

} else {

com.google.protobuf.ByteString bs =

(com.google.protobuf.ByteString) ref;

String s = bs.toStringUtf8();

if (bs.isValidUtf8()) {

fieldType_ = s;

}

return s;

}

}

/**

* <code>required string fieldType = 2;</code>

*/

public com.google.protobuf.ByteString

getFieldTypeBytes() {

Object ref = fieldType_;

if (ref instanceof String) {

com.google.protobuf.ByteString b =

com.google.protobuf.ByteString.copyFromUtf8(

(String) ref);

fieldType_ = b;

return b;

} else {

return (com.google.protobuf.ByteString) ref;

}

}

private void initFields() {

value_ = com.google.protobuf.ByteString.EMPTY;

fieldType_ = "";

}

private byte memoizedIsInitialized = -1;

public final boolean isInitialized() {

byte isInitialized = memoizedIsInitialized;

if (isInitialized != -1) return isInitialized == 1;

if (!hasValue()) {

memoizedIsInitialized = 0;

return false;

}

if (!hasFieldType()) {

memoizedIsInitialized = 0;

return false;

}

memoizedIsInitialized = 1;

return true;

}

public void writeTo(com.google.protobuf.CodedOutputStream output)

throws java.io.IOException {

getSerializedSize();

if (((bitField0_ & 0x00000001) == 0x00000001)) {

output.writeBytes(1, value_);

}

if (((bitField0_ & 0x00000002) == 0x00000002)) {

output.writeBytes(2, getFieldTypeBytes());

}

getUnknownFields().writeTo(output);

}

private int memoizedSerializedSize = -1;

public int getSerializedSize() {

int size = memoizedSerializedSize;

if (size != -1) return size;

size = 0;

if (((bitField0_ & 0x00000001) == 0x00000001)) {

size += com.google.protobuf.CodedOutputStream

.computeBytesSize(1, value_);

}

if (((bitField0_ & 0x00000002) == 0x00000002)) {

size += com.google.protobuf.CodedOutputStream

.computeBytesSize(2, getFieldTypeBytes());

}

size += getUnknownFields().getSerializedSize();

memoizedSerializedSize = size;

return size;

}

private static final long serialVersionUID = 0L;

@Override

protected Object writeReplace()

throws java.io.ObjectStreamException {

return super.writeReplace();

}

public static HBaseFilterNumberComparator parseFrom(

com.google.protobuf.ByteString data)

throws com.google.protobuf.InvalidProtocolBufferException {

return PARSER.parseFrom(data);

}

public static HBaseFilterNumberComparator parseFrom(

com.google.protobuf.ByteString data,

com.google.protobuf.ExtensionRegistryLite extensionRegistry)

throws com.google.protobuf.InvalidProtocolBufferException {

return PARSER.parseFrom(data, extensionRegistry);

}

public static HBaseFilterNumberComparator parseFrom(byte[] data)

throws com.google.protobuf.InvalidProtocolBufferException {

return PARSER.parseFrom(data);

}

public static HBaseFilterNumberComparator parseFrom(

byte[] data,

com.google.protobuf.ExtensionRegistryLite extensionRegistry)

throws com.google.protobuf.InvalidProtocolBufferException {

return PARSER.parseFrom(data, extensionRegistry);

}

public static HBaseFilterNumberComparator parseFrom(java.io.InputStream input)

throws java.io.IOException {

return PARSER.parseFrom(input);

}

public static HBaseFilterNumberComparator parseFrom(

java.io.InputStream input,

com.google.protobuf.ExtensionRegistryLite extensionRegistry)

throws java.io.IOException {

return PARSER.parseFrom(input, extensionRegistry);

}

public static HBaseFilterNumberComparator parseDelimitedFrom(java.io.InputStream input)

throws java.io.IOException {

return PARSER.parseDelimitedFrom(input);

}

public static HBaseFilterNumberComparator parseDelimitedFrom(

java.io.InputStream input,

com.google.protobuf.ExtensionRegistryLite extensionRegistry)

throws java.io.IOException {

return PARSER.parseDelimitedFrom(input, extensionRegistry);

}

public static HBaseFilterNumberComparator parseFrom(

com.google.protobuf.CodedInputStream input)

throws java.io.IOException {

return PARSER.parseFrom(input);

}

public static HBaseFilterNumberComparator parseFrom(

com.google.protobuf.CodedInputStream input,

com.google.protobuf.ExtensionRegistryLite extensionRegistry)

throws java.io.IOException {

return PARSER.parseFrom(input, extensionRegistry);

}

public static Builder newBuilder() { return Builder.create(); }

public Builder newBuilderForType() { return newBuilder(); }

public static Builder newBuilder(HBaseFilterNumberComparator prototype) {

return newBuilder().mergeFrom(prototype);

}

public Builder toBuilder() { return newBuilder(this); }

@Override

protected Builder newBuilderForType(

BuilderParent parent) {

Builder builder = new Builder(parent);

return builder;

}

/**

* Protobuf type {@code HBaseFilterNumberComparator}

*/

public static final class Builder extends

com.google.protobuf.GeneratedMessage.Builder<Builder>

implements HBaseFilterNumberComparatorOrBuilder {

public static final com.google.protobuf.Descriptors.Descriptor

getDescriptor() {

return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_descriptor;

}

protected FieldAccessorTable

internalGetFieldAccessorTable() {

return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_fieldAccessorTable

.ensureFieldAccessorsInitialized(

HBaseFilterNumberComparator.class, Builder.class);

}

// Construct using org.curitis.comparator.HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator.newBuilder()

private Builder() {

maybeForceBuilderInitialization();

}

private Builder(

BuilderParent parent) {

super(parent);

maybeForceBuilderInitialization();

}

private void maybeForceBuilderInitialization() {

if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {

}

}

private static Builder create() {

return new Builder();

}

public Builder clear() {

super.clear();

value_ = com.google.protobuf.ByteString.EMPTY;

bitField0_ = (bitField0_ & ~0x00000001);

fieldType_ = "";

bitField0_ = (bitField0_ & ~0x00000002);

return this;

}

public Builder clone() {

return create().mergeFrom(buildPartial());

}

public com.google.protobuf.Descriptors.Descriptor

getDescriptorForType() {

return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_descriptor;

}

public HBaseFilterNumberComparator getDefaultInstanceForType() {

return HBaseFilterNumberComparator.getDefaultInstance();

}

public HBaseFilterNumberComparator build() {

HBaseFilterNumberComparator result = buildPartial();

if (!result.isInitialized()) {

throw newUninitializedMessageException(result);

}

return result;

}

public HBaseFilterNumberComparator buildPartial() {

HBaseFilterNumberComparator result = new HBaseFilterNumberComparator(this);

int from_bitField0_ = bitField0_;

int to_bitField0_ = 0;

if (((from_bitField0_ & 0x00000001) == 0x00000001)) {

to_bitField0_ |= 0x00000001;

}

result.value_ = value_;

if (((from_bitField0_ & 0x00000002) == 0x00000002)) {

to_bitField0_ |= 0x00000002;

}

result.fieldType_ = fieldType_;

result.bitField0_ = to_bitField0_;

onBuilt();

return result;

}

public Builder mergeFrom(com.google.protobuf.Message other) {

if (other instanceof HBaseFilterNumberComparator) {

return mergeFrom((HBaseFilterNumberComparator)other);

} else {

super.mergeFrom(other);

return this;

}

}

public Builder mergeFrom(HBaseFilterNumberComparator other) {

if (other == HBaseFilterNumberComparator.getDefaultInstance()) return this;

if (other.hasValue()) {

setValue(other.getValue());

}

if (other.hasFieldType()) {

bitField0_ |= 0x00000002;

fieldType_ = other.fieldType_;

onChanged();

}

this.mergeUnknownFields(other.getUnknownFields());

return this;

}

public final boolean isInitialized() {

if (!hasValue()) {

return false;

}

if (!hasFieldType()) {

return false;

}

return true;

}

public Builder mergeFrom(

com.google.protobuf.CodedInputStream input,

com.google.protobuf.ExtensionRegistryLite extensionRegistry)

throws java.io.IOException {

HBaseFilterNumberComparator parsedMessage = null;

try {

parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);

} catch (com.google.protobuf.InvalidProtocolBufferException e) {

parsedMessage = (HBaseFilterNumberComparator) e.getUnfinishedMessage();

throw e;

} finally {

if (parsedMessage != null) {

mergeFrom(parsedMessage);

}

}

return this;

}

private int bitField0_;

// required bytes value = 1;

private com.google.protobuf.ByteString value_ = com.google.protobuf.ByteString.EMPTY;

/**

* <code>required bytes value = 1;</code>

*/

public boolean hasValue() {

return ((bitField0_ & 0x00000001) == 0x00000001);

}

/**

* <code>required bytes value = 1;</code>

*/

public com.google.protobuf.ByteString getValue() {

return value_;

}

/**

* <code>required bytes value = 1;</code>

*/

public Builder setValue(com.google.protobuf.ByteString value) {

if (value == null) {

throw new NullPointerException();

}

bitField0_ |= 0x00000001;

value_ = value;

onChanged();

return this;

}

/**

* <code>required bytes value = 1;</code>

*/

public Builder clearValue() {

bitField0_ = (bitField0_ & ~0x00000001);

value_ = getDefaultInstance().getValue();

onChanged();

return this;

}

// required string fieldType = 2;

private Object fieldType_ = "";

/**

* <code>required string fieldType = 2;</code>

*/

public boolean hasFieldType() {

return ((bitField0_ & 0x00000002) == 0x00000002);

}

/**

* <code>required string fieldType = 2;</code>

*/

public String getFieldType() {

Object ref = fieldType_;

if (!(ref instanceof String)) {

String s = ((com.google.protobuf.ByteString) ref)

.toStringUtf8();

fieldType_ = s;

return s;

} else {

return (String) ref;

}

}

/**

* <code>required string fieldType = 2;</code>

*/

public com.google.protobuf.ByteString

getFieldTypeBytes() {

Object ref = fieldType_;

if (ref instanceof String) {

com.google.protobuf.ByteString b =

com.google.protobuf.ByteString.copyFromUtf8(

(String) ref);

fieldType_ = b;

return b;

} else {

return (com.google.protobuf.ByteString) ref;

}

}

/**

* <code>required string fieldType = 2;</code>

*/

public Builder setFieldType(

String value) {

if (value == null) {

throw new NullPointerException();

}

bitField0_ |= 0x00000002;

fieldType_ = value;

onChanged();

return this;

}

/**

* <code>required string fieldType = 2;</code>

*/

public Builder clearFieldType() {

bitField0_ = (bitField0_ & ~0x00000002);

fieldType_ = getDefaultInstance().getFieldType();

onChanged();

return this;

}

/**

* <code>required string fieldType = 2;</code>

*/

public Builder setFieldTypeBytes(

com.google.protobuf.ByteString value) {

if (value == null) {

throw new NullPointerException();

}

bitField0_ |= 0x00000002;

fieldType_ = value;

onChanged();

return this;

}

// @@protoc_insertion_point(builder_scope:HBaseFilterNumberComparator)

}

static {

defaultInstance = new HBaseFilterNumberComparator(true);

defaultInstance.initFields();

}

// @@protoc_insertion_point(class_scope:HBaseFilterNumberComparator)

}

private static com.google.protobuf.Descriptors.Descriptor

internal_static_HBaseFilterNumberComparator_descriptor;

private static

com.google.protobuf.GeneratedMessage.FieldAccessorTable

internal_static_HBaseFilterNumberComparator_fieldAccessorTable;

public static com.google.protobuf.Descriptors.FileDescriptor

getDescriptor() {

return descriptor;

}

private static com.google.protobuf.Descriptors.FileDescriptor

descriptor;

static {

String[] descriptorData = {

"\n&HBaseFilterNumberComparatorProto.proto" +

"\"?\n\033HBaseFilterNumberComparator\022\r\n\005value" +

"\030\001 \002(\014\022\021\n\tfieldType\030\002 \002(\tB\035\n\026org.curitis" +

".comparatorH\001\210\001\001"

};

com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =

new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

public com.google.protobuf.ExtensionRegistry assignDescriptors(

com.google.protobuf.Descriptors.FileDescriptor root) {

descriptor = root;

internal_static_HBaseFilterNumberComparator_descriptor =

getDescriptor().getMessageTypes().get(0);

internal_static_HBaseFilterNumberComparator_fieldAccessorTable = new

com.google.protobuf.GeneratedMessage.FieldAccessorTable(

internal_static_HBaseFilterNumberComparator_descriptor,

new String[] { "Value", "FieldType", });

return null;

}

};

com.google.protobuf.Descriptors.FileDescriptor

.internalBuildGeneratedFileFrom(descriptorData,

new com.google.protobuf.Descriptors.FileDescriptor[] {

}, assigner);

}

// @@protoc_insertion_point(outer_class_scope)

}

