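Filtering Numeric Values with a Custom HBase Comparator
Introduction
Writing a custom HBase Comparator requires protoc, which can be downloaded from the links below:
Download the latest protoc release
Download protoc 2.5
The complete project can be downloaded from the link below:
HBase custom Comparator example
proto file format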
enum Sex {
  MALE = 1;
  FEMALE = 2;
}
message Bean {
  required int32 id = 1;
  required string name = 2;
  optional Sex sex = 3 [default = MALE];
}
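The enum keyword defines an enumeration type, and message defines a class. Each field is declared with one of the following rules:
- required: the field must appear exactly once
- optional: the field appears zero or one time
- repeated: the field may appear any number of times
Anatomy of a field definition:
optional Sex sex = 3 [default = MALE];
field rule, field type, field name, unique tag number, optional option (here the default value)
Default values when none is specified:
- int: 0
- string: empty string
- bool: false
- enum: the first value of the enum
Defining a proto file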
syntax = "proto2";
option java_package = "org.curitis.comparator";
option java_outer_classname = "HBaseFilterNumberComparatorProto";
option java_generic_services = true;
option optimize_for = SPEED;
message HBaseFilterNumberComparator {
  required bytes value = 1;
  required string fieldType = 2;
}
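The first line declares the syntax version of the .proto file. It can be proto2 or proto3, depending on your needs. I am using HBase 2.0.0, which depends on protoc 2.5, so proto2 is used here.
To find out which protobuf version HBase depends on, run the following command in the corresponding project: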
mvn dependency:tree >> tree.txt
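Then search the output file tree.txt for protobuf (the dependency is listed as protobuf-java, so searching for protoc finds nothing).
java_package sets the package of the generated code, and java_outer_classname sets the name of the generated outer class. If java_outer_classname is omitted, the class name is derived from the name of the .proto schema file.
message defines the entity that will be serialized.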
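To make "serialized" a little more concrete, here is a minimal sketch that hand-encodes the two fields of HBaseFilterNumberComparator the way the protobuf wire format lays out length-delimited fields. The class WireFormatSketch and its methods are hypothetical names used only for illustration, and the sketch handles payloads shorter than 128 bytes only; real code should always go through the class generated by protoc. The tag values it produces, 10 and 18, are the same ones that appear as case labels in the generated parser listed at the end of this post.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not part of the original project.
class WireFormatSketch {

    // Wire type 2 is "length-delimited", used for both bytes and string fields.
    static final int WIRE_TYPE_LENGTH_DELIMITED = 2;

    // A field tag is (field number << 3) | wire type.
    static int tag(int fieldNumber) {
        return (fieldNumber << 3) | WIRE_TYPE_LENGTH_DELIMITED;
    }

    // Writes tag, length, and payload. Only single-byte lengths are supported here.
    static void writeField(ByteArrayOutputStream out, int fieldNumber, byte[] payload) {
        if (payload.length > 127) {
            throw new IllegalArgumentException("sketch only handles payloads under 128 bytes");
        }
        out.write(tag(fieldNumber));
        out.write(payload.length);
        out.write(payload, 0, payload.length);
    }

    // Encodes field 1 (value, bytes) followed by field 2 (fieldType, string).
    static byte[] encode(byte[] value, String fieldType) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeField(out, 1, value);
        writeField(out, 2, fieldType.getBytes(StandardCharsets.UTF_8));
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(tag(1)); // 10
        System.out.println(tag(2)); // 18
        System.out.println(encode(new byte[] {1, 2}, "int").length); // 9
    }
}
```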
protoc command
protoc.exe ./HBaseFilterNumberComparatorProto.proto --java_out=./
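Compiling is simple: specify the .proto schema file and the output directory.
Custom comparator
The HBaseFilterNumberComparatorProto class generated by protoc is fairly long, so it is listed at the end. Here we go straight to the custom comparator.
A custom HBase comparator usually extends ByteArrayComparable. For implementation details, refer to LongComparator and BigDecimalComparator, which ship with HBase in the org.apache.hadoop.hbase.filter package.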
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Custom comparator that compares cell values numerically.
 */
public class FilterNumberComparator extends ByteArrayComparable {

    private String fieldType;
    private byte[] data;

    /**
     * @param value     serialized value to compare against
     * @param fieldType type of the value: int/integer, float, or double
     */
    public FilterNumberComparator(byte[] value, String fieldType) {
        super(value);
        this.fieldType = fieldType.toLowerCase();
        this.data = value;
    }

    // Serializes the comparator so it can be sent to the region servers.
    @Override
    public byte[] toByteArray() {
        HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator.Builder builder =
                HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator.newBuilder();
        builder.setValue(ByteString.copyFrom(this.data));
        builder.setFieldType(this.fieldType);
        return builder.build().toByteArray();
    }

    // This static method is used to deserialize the comparator.
    public static FilterNumberComparator parseFrom(final byte[] bytes) throws DeserializationException {
        HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator proto;
        try {
            proto = HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator.parseFrom(bytes);
        } catch (InvalidProtocolBufferException e) {
            throw new DeserializationException(e);
        }
        return new FilterNumberComparator(proto.getValue().toByteArray(), proto.getFieldType());
    }

    // Decodes the cell value according to fieldType and compares this comparator's value with it.
    @Override
    public int compareTo(byte[] bytes, int offset, int length) {
        if (fieldType.equalsIgnoreCase("int") || fieldType.equalsIgnoreCase("integer")) {
            int that = Bytes.toInt(bytes, offset, length);
            return Integer.compare(Bytes.toInt(data), that);
        } else if (fieldType.equalsIgnoreCase("float")) {
            float that = Bytes.toFloat(bytes, offset);
            return Float.compare(Bytes.toFloat(data), that);
        } else if (fieldType.equalsIgnoreCase("double")) {
            double that = Bytes.toDouble(bytes, offset);
            return Double.compare(Bytes.toDouble(data), that);
        }
        // Unsupported field type: report this comparator's value as greater.
        return 1;
    }
}
The business logic lives in the compareTo method. The implementation above compares integer, float, and double values.
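Why decode before comparing? The following is a minimal, HBase-free sketch of the idea, assuming values are stored as big-endian 8-byte doubles. The class NumericCompareSketch and its methods are hypothetical names, and encode stands in for Bytes.toBytes(double). It contrasts a plain unsigned byte-wise comparison, which is what a binary comparator does, with the decode-then-compare approach used in compareTo above.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration class, not part of the original project.
class NumericCompareSketch {

    // Big-endian 8-byte encoding of a double (stand-in for Bytes.toBytes(double)).
    static byte[] encode(double v) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(v).array();
    }

    // Byte-wise comparison that treats every byte as an unsigned value.
    static int compareAsBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Decodes both sides as doubles first, then compares numerically.
    static int compareAsDouble(byte[] threshold, byte[] cell, int offset) {
        double left = ByteBuffer.wrap(threshold).getDouble();
        double right = ByteBuffer.wrap(cell, offset, Double.BYTES).getDouble();
        return Double.compare(left, right);
    }

    public static void main(String[] args) {
        byte[] threshold = encode(1100.0);
        byte[] negative = encode(-1.0);
        // Byte-wise, 1100.0 sorts before -1.0 because the sign bit makes the first byte larger.
        System.out.println(compareAsBytes(threshold, negative) < 0);     // true
        // Numerically, 1100.0 is greater than -1.0.
        System.out.println(compareAsDouble(threshold, negative, 0) > 0); // true
    }
}
```

The two comparisons disagree as soon as negative numbers are involved, which is the kind of case a type-aware comparator is meant to handle.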
So how is this Comparator used?
First, add the jar that contains FilterNumberComparator as a dependency.
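// This method belongs to a test class that holds an open Table in the field "table"
// and provides a printResultScanner helper; neither is shown in this post.
@Test
public void singleColumnValueFilter() throws IOException {
    Scan scan = new Scan();
    // The literal must be a double (1100.0) so that the 8-byte double encoding is used.
    FilterNumberComparator comparator = new FilterNumberComparator(Bytes.toBytes(1100.0), "double");
    byte[] family = Bytes.toBytes("consume");
    byte[] column = Bytes.toBytes("total");
    SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
            family,
            column,
            CompareOperator.GREATER,
            comparator);
    // Skip rows that do not contain the column at all.
    singleColumnValueFilter.setFilterIfMissing(true);
    scan.setFilter(singleColumnValueFilter);
    ResultScanner resultScanner = table.getScanner(scan);
    printResultScanner(resultScanner);
}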
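The code above keeps the rows whose total column in the consume column family is greater than 1100. Be sure to write 1100.0 rather than 1100: the value is a double, and if you pass 1100, Bytes.toBytes treats it as an int.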
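The difference between the two literals comes down to Java overload resolution. Below is a minimal sketch in which the two toBytes methods are hypothetical stand-ins for the int and double overloads of Bytes.toBytes, written with java.nio.ByteBuffer so that the example runs without HBase.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration class, not part of the original project.
class OverloadSketch {

    // Stand-in for the int overload: produces 4 bytes.
    static byte[] toBytes(int v) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(v).array();
    }

    // Stand-in for the double overload: produces 8 bytes.
    static byte[] toBytes(double v) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(v).array();
    }

    public static void main(String[] args) {
        // The literal 1100 is an int, so the int overload is chosen.
        System.out.println(toBytes(1100).length);   // 4
        // The literal 1100.0 is a double, so the double overload is chosen.
        System.out.println(toBytes(1100.0).length); // 8
    }
}
```

A 4-byte array cannot be decoded as an 8-byte double, so a comparator built from the int literal would not hold the value the "double" branch expects.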
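Runtime issues
The custom FilterNumberComparator must be packaged as a separate jar and placed in HBase's lib directory, for example /usr/local/hbase-2.0.0/lib. On a cluster, make sure the jar is placed on every node.
Then restart HBase:
sudo ./stop-hbase.sh
sudo ./start-hbase.sh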
HBaseFilterNumberComparatorProto
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: HBaseFilterNumberComparatorProto.proto
package org.curitis.comparator;
public final class HBaseFilterNumberComparatorProto {
private HBaseFilterNumberComparatorProto() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public interface HBaseFilterNumberComparatorOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// required bytes value = 1;
/**
* <code>required bytes value = 1;</code>
*/
boolean hasValue();
/**
* <code>required bytes value = 1;</code>
*/
com.google.protobuf.ByteString getValue();
// required string fieldType = 2;
/**
* <code>required string fieldType = 2;</code>
*/
boolean hasFieldType();
/**
* <code>required string fieldType = 2;</code>
*/
String getFieldType();
/**
* <code>required string fieldType = 2;</code>
*/
com.google.protobuf.ByteString
getFieldTypeBytes();
}
/**
* Protobuf type {@code HBaseFilterNumberComparator}
*/
public static final class HBaseFilterNumberComparator extends
com.google.protobuf.GeneratedMessage
implements HBaseFilterNumberComparatorOrBuilder {
// Use HBaseFilterNumberComparator.newBuilder() to construct.
private HBaseFilterNumberComparator(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private HBaseFilterNumberComparator(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final HBaseFilterNumberComparator defaultInstance;
public static HBaseFilterNumberComparator getDefaultInstance() {
return defaultInstance;
}
public HBaseFilterNumberComparator getDefaultInstanceForType() {
return defaultInstance;
}
private final com.google.protobuf.UnknownFieldSet unknownFields;
@Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private HBaseFilterNumberComparator(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 10: {
bitField0_ |= 0x00000001;
value_ = input.readBytes();
break;
}
case 18: {
bitField0_ |= 0x00000002;
fieldType_ = input.readBytes();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_descriptor;
}
protected FieldAccessorTable
internalGetFieldAccessorTable() {
return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_fieldAccessorTable
.ensureFieldAccessorsInitialized(
HBaseFilterNumberComparator.class, Builder.class);
}
public static com.google.protobuf.Parser<HBaseFilterNumberComparator> PARSER =
new com.google.protobuf.AbstractParser<HBaseFilterNumberComparator>() {
public HBaseFilterNumberComparator parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new HBaseFilterNumberComparator(input, extensionRegistry);
}
};
@Override
public com.google.protobuf.Parser<HBaseFilterNumberComparator> getParserForType() {
return PARSER;
}
private int bitField0_;
// required bytes value = 1;
public static final int VALUE_FIELD_NUMBER = 1;
private com.google.protobuf.ByteString value_;
/**
* <code>required bytes value = 1;</code>
*/
public boolean hasValue() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required bytes value = 1;</code>
*/
public com.google.protobuf.ByteString getValue() {
return value_;
}
// required string fieldType = 2;
public static final int FIELDTYPE_FIELD_NUMBER = 2;
private Object fieldType_;
/**
* <code>required string fieldType = 2;</code>
*/
public boolean hasFieldType() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>required string fieldType = 2;</code>
*/
public String getFieldType() {
Object ref = fieldType_;
if (ref instanceof String) {
return (String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
fieldType_ = s;
}
return s;
}
}
/**
* <code>required string fieldType = 2;</code>
*/
public com.google.protobuf.ByteString
getFieldTypeBytes() {
Object ref = fieldType_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(String) ref);
fieldType_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
private void initFields() {
value_ = com.google.protobuf.ByteString.EMPTY;
fieldType_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasValue()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasFieldType()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBytes(1, value_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, getFieldTypeBytes());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(1, value_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, getFieldTypeBytes());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@Override
protected Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static HBaseFilterNumberComparator parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static HBaseFilterNumberComparator parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static HBaseFilterNumberComparator parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static HBaseFilterNumberComparator parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static HBaseFilterNumberComparator parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static HBaseFilterNumberComparator parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static HBaseFilterNumberComparator parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static HBaseFilterNumberComparator parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static HBaseFilterNumberComparator parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static HBaseFilterNumberComparator parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(HBaseFilterNumberComparator prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@Override
protected Builder newBuilderForType(
BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code HBaseFilterNumberComparator}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements HBaseFilterNumberComparatorOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_descriptor;
}
protected FieldAccessorTable
internalGetFieldAccessorTable() {
return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_fieldAccessorTable
.ensureFieldAccessorsInitialized(
HBaseFilterNumberComparator.class, Builder.class);
}
// Construct using org.curitis.comparator.HBaseFilterNumberComparatorProto.HBaseFilterNumberComparator.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
value_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000001);
fieldType_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return HBaseFilterNumberComparatorProto.internal_static_HBaseFilterNumberComparator_descriptor;
}
public HBaseFilterNumberComparator getDefaultInstanceForType() {
return HBaseFilterNumberComparator.getDefaultInstance();
}
public HBaseFilterNumberComparator build() {
HBaseFilterNumberComparator result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public HBaseFilterNumberComparator buildPartial() {
HBaseFilterNumberComparator result = new HBaseFilterNumberComparator(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.value_ = value_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.fieldType_ = fieldType_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof HBaseFilterNumberComparator) {
return mergeFrom((HBaseFilterNumberComparator)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(HBaseFilterNumberComparator other) {
if (other == HBaseFilterNumberComparator.getDefaultInstance()) return this;
if (other.hasValue()) {
setValue(other.getValue());
}
if (other.hasFieldType()) {
bitField0_ |= 0x00000002;
fieldType_ = other.fieldType_;
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasValue()) {
return false;
}
if (!hasFieldType()) {
return false;
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
HBaseFilterNumberComparator parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (HBaseFilterNumberComparator) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// required bytes value = 1;
private com.google.protobuf.ByteString value_ = com.google.protobuf.ByteString.EMPTY;
/**
* <code>required bytes value = 1;</code>
*/
public boolean hasValue() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required bytes value = 1;</code>
*/
public com.google.protobuf.ByteString getValue() {
return value_;
}
/**
* <code>required bytes value = 1;</code>
*/
public Builder setValue(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
value_ = value;
onChanged();
return this;
}
/**
* <code>required bytes value = 1;</code>
*/
public Builder clearValue() {
bitField0_ = (bitField0_ & ~0x00000001);
value_ = getDefaultInstance().getValue();
onChanged();
return this;
}
// required string fieldType = 2;
private Object fieldType_ = "";
/**
* <code>required string fieldType = 2;</code>
*/
public boolean hasFieldType() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>required string fieldType = 2;</code>
*/
public String getFieldType() {
Object ref = fieldType_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref)
.toStringUtf8();
fieldType_ = s;
return s;
} else {
return (String) ref;
}
}
/**
* <code>required string fieldType = 2;</code>
*/
public com.google.protobuf.ByteString
getFieldTypeBytes() {
Object ref = fieldType_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(String) ref);
fieldType_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
/**
* <code>required string fieldType = 2;</code>
*/
public Builder setFieldType(
String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
fieldType_ = value;
onChanged();
return this;
}
/**
* <code>required string fieldType = 2;</code>
*/
public Builder clearFieldType() {
bitField0_ = (bitField0_ & ~0x00000002);
fieldType_ = getDefaultInstance().getFieldType();
onChanged();
return this;
}
/**
* <code>required string fieldType = 2;</code>
*/
public Builder setFieldTypeBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
fieldType_ = value;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:HBaseFilterNumberComparator)
}
static {
defaultInstance = new HBaseFilterNumberComparator(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:HBaseFilterNumberComparator)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_HBaseFilterNumberComparator_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_HBaseFilterNumberComparator_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
String[] descriptorData = {
"\n&HBaseFilterNumberComparatorProto.proto" +
"\"?\n\033HBaseFilterNumberComparator\022\r\n\005value" +
"\030\001 \002(\014\022\021\n\tfieldType\030\002 \002(\tB\035\n\026org.curitis" +
".comparatorH\001\210\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_HBaseFilterNumberComparator_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_HBaseFilterNumberComparator_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_HBaseFilterNumberComparator_descriptor,
new String[] { "Value", "FieldType", });
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
// @@protoc_insertion_point(outer_class_scope)
}