聊聊rocketmq-mysql的ColumnParser

编程

本文主要研究一下rocketmq-mysql的ColumnParser

ColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java

public abstract class ColumnParser {

public static ColumnParser getColumnParser(String dataType, String colType, String charset) {

switch (dataType) {

case "tinyint":

case "smallint":

case "mediumint":

case "int":

return new IntColumnParser(dataType, colType);

case "bigint":

return new BigIntColumnParser(colType);

case "tinytext":

case "text":

case "mediumtext":

case "longtext":

case "varchar":

case "char":

return new StringColumnParser(charset);

case "date":

case "datetime":

case "timestamp":

return new DateTimeColumnParser();

case "time":

return new TimeColumnParser();

case "year":

return new YearColumnParser();

case "enum":

return new EnumColumnParser(colType);

case "set":

return new SetColumnParser(colType);

default:

return new DefaultColumnParser();

}

}

public static String[] extractEnumValues(String colType) {

String[] enumValues = {};

Matcher matcher = Pattern.compile("(enum|set)((.*))").matcher(colType);

if (matcher.matches()) {

enumValues = matcher.group(2).replace(""", "").split(",");

}

return enumValues;

}

public abstract Object getValue(Object value);

}

  • ColumnParser定义了getValue抽象方法;它提供了getColumnParser方法用于根据dataType获取对应的ColumnParser实现类

IntColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java

/**
 * Parser for the {@code tinyint}, {@code smallint}, {@code mediumint} and {@code int} types.
 *
 * <p>For an unsigned column, a negative {@code Integer} is shifted up by {@code 2^bits}
 * to give the non-negative value as a {@code Long}.
 */
public class IntColumnParser extends ColumnParser {

    /** Width of the column type in bits; stays 0 for a data type not listed in the constructor. */
    private int bits;

    /** True unless the column type text ends in {@code " unsigned"}. */
    private boolean signed;

    /**
     * @param dataType the data type name that determines the bit width
     * @param colType  the full column type text; must not be null
     */
    public IntColumnParser(String dataType, String colType) {
        switch (dataType) {
            case "tinyint":
                bits = 8;
                break;
            case "smallint":
                bits = 16;
                break;
            case "mediumint":
                bits = 24;
                break;
            case "int":
                bits = 32;
                break;
            default:
                // Unlisted types keep bits at 0, exactly as before.
                break;
        }
        // NOTE(review): a type text with a trailing modifier after "unsigned"
        // (e.g. "... unsigned zerofill") does not match and is treated as signed — confirm intent.
        this.signed = !colType.matches(".* unsigned$");
    }

    /**
     * Converts a raw integer column value.
     *
     * @param value the raw value; may be null
     * @return null for null; the value unchanged when it is a {@code Long}, a non-{@code Integer},
     *         or an {@code Integer} that is non-negative or belongs to a signed column;
     *         otherwise {@code 2^bits + value} as a {@code Long}
     */
    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Long) {
            return value;
        }
        if (value instanceof Integer) {
            Integer i = (Integer) value;
            // Zero must count as already non-negative; with a strict "> 0" an unsigned 0
            // fell through to the wrap-around branch and came back as 2^bits.
            if (signed || i >= 0) {
                return i;
            }
            return (1L << bits) + i;
        }
        return value;
    }
}

  • IntColumnParser解析tinyint、smallint、mediumint、int类型

BigIntColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java

/**
 * Parser for the {@code bigint} type.
 *
 * <p>For an unsigned column, a negative {@code Long} is mapped to {@code 2^64 + value}
 * and returned as a {@code BigInteger}.
 */
public class BigIntColumnParser extends ColumnParser {

    /** 2^64, the offset added to a negative raw value of an unsigned column. */
    private static final BigInteger TWO_POW_64 = BigInteger.ONE.shiftLeft(64);

    /** True unless the column type text ends in {@code " unsigned"}. */
    private boolean signed;

    /**
     * @param colType the full column type text; must not be null
     */
    public BigIntColumnParser(String colType) {
        boolean unsigned = colType.matches(".* unsigned$");
        this.signed = !unsigned;
    }

    /**
     * Converts a raw bigint column value.
     *
     * @param value the raw value: null, a {@code BigInteger} or a {@code Long}
     * @return null for null; a {@code BigInteger} input unchanged; the {@code Long} itself
     *         when the column is signed or the value is non-negative; otherwise
     *         {@code 2^64 + value} as a {@code BigInteger}
     */
    @Override
    public Object getValue(Object value) {
        if (value == null || value instanceof BigInteger) {
            return value;
        }

        Long raw = (Long) value;
        boolean keepAsIs = signed || raw >= 0;
        if (keepAsIs) {
            return raw;
        }
        return TWO_POW_64.add(BigInteger.valueOf(raw));
    }
}

  • BigIntColumnParser解析bigint类型

StringColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java

/**
 * Parser for the text types ({@code tinytext}, {@code text}, {@code mediumtext},
 * {@code longtext}, {@code varchar}, {@code char}).
 *
 * <p>Decodes a raw {@code byte[]} into a {@code String} using the column's character set.
 */
public class StringColumnParser extends ColumnParser {

    /** Lower-cased character set name of the column. */
    private String charset;

    /**
     * @param charset the column character set name; must not be null
     */
    public StringColumnParser(String charset) {
        // Lower-case with a fixed locale: the default-locale variant can change letters
        // (e.g. "I" under a Turkish locale), which would make the switch below miss its cases.
        this.charset = charset.toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * Converts a raw text column value.
     *
     * @param value the raw value: null, a {@code String} or a {@code byte[]}
     * @return null for null; a {@code String} input unchanged; otherwise the bytes decoded
     *         with the charset mapped from the column's character set name
     */
    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof String) {
            return value;
        }

        byte[] bytes = (byte[]) value;
        switch (charset) {
            case "utf8":
            case "utf8mb4":
                return new String(bytes, Charsets.UTF_8);
            case "latin1":
            case "ascii":
                return new String(bytes, Charsets.ISO_8859_1);
            case "ucs2":
                return new String(bytes, Charsets.UTF_16);
            default:
                // Any other name is resolved through the Charsets helper.
                return new String(bytes, Charsets.toCharset(charset));
        }
    }
}

  • StringColumnParser解析tinytext、text、mediumtext、longtext、varchar、char类型

DateTimeColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java

/**
 * Parser for the {@code date}, {@code datetime} and {@code timestamp} types.
 *
 * <p>Formats the raw value as {@code yyyy-MM-dd HH:mm:ss}. A {@code Timestamp} is rendered
 * in the JVM default time zone, a {@code Long} (epoch milliseconds) in UTC.
 */
public class DateTimeColumnParser extends ColumnParser {

    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Formatter in the default time zone. {@code SimpleDateFormat} is not thread-safe, so each
     * thread gets its own instance instead of sharing one static formatter. The default time
     * zone is therefore read when a thread first uses the formatter.
     */
    private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
        new ThreadLocal<SimpleDateFormat>() {
            @Override
            protected SimpleDateFormat initialValue() {
                return new SimpleDateFormat(PATTERN);
            }
        };

    /** Per-thread formatter pinned to UTC. */
    private static final ThreadLocal<SimpleDateFormat> UTC_FORMAT =
        new ThreadLocal<SimpleDateFormat>() {
            @Override
            protected SimpleDateFormat initialValue() {
                SimpleDateFormat format = new SimpleDateFormat(PATTERN);
                format.setTimeZone(TimeZone.getTimeZone("UTC"));
                return format;
            }
        };

    /**
     * Converts a raw date/time column value.
     *
     * @param value the raw value; may be null
     * @return null for null; the formatted text for a {@code Timestamp} (default time zone)
     *         or a {@code Long} of epoch milliseconds (UTC); any other value unchanged
     */
    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Timestamp) {
            return LOCAL_FORMAT.get().format((Timestamp) value);
        }
        if (value instanceof Long) {
            return UTC_FORMAT.get().format(new Date((Long) value));
        }
        return value;
    }
}

  • DateTimeColumnParser解析date、datetime、timestamp类型

TimeColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java

/**
 * Parser for the {@code time} type: re-wraps a {@code Timestamp} as a {@code Time}
 * with the same millisecond value.
 */
public class TimeColumnParser extends ColumnParser {

    /**
     * Converts a raw time column value.
     *
     * @param value the raw value; may be null
     * @return a {@code Time} built from the timestamp's milliseconds when {@code value} is a
     *         {@code Timestamp}; otherwise {@code value} unchanged (including null)
     */
    @Override
    public Object getValue(Object value) {
        // instanceof is false for null, so null falls through here as well.
        if (!(value instanceof Timestamp)) {
            return value;
        }
        long millis = ((Timestamp) value).getTime();
        return new Time(millis);
    }
}

  • TimeColumnParser解析time类型

YearColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java

/**
 * Parser for the {@code year} type: reduces a {@code Date} to its calendar year.
 */
public class YearColumnParser extends ColumnParser {

    /**
     * Converts a raw year column value.
     *
     * @param value the raw value; may be null
     * @return the {@code Calendar.YEAR} field of the date (read through a default
     *         {@code Calendar} instance) when {@code value} is a {@code Date};
     *         otherwise {@code value} unchanged (including null)
     */
    @Override
    public Object getValue(Object value) {
        // instanceof is false for null, so null is returned as-is.
        if (!(value instanceof Date)) {
            return value;
        }
        Calendar cal = Calendar.getInstance();
        cal.setTime((Date) value);
        int year = cal.get(Calendar.YEAR);
        return year;
    }
}

  • YearColumnParser解析year类型

EnumColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java

/**
 * Parser for the {@code enum} type: maps a 1-based member index to the member's name.
 */
public class EnumColumnParser extends ColumnParser {

    /** Member names taken from the column type text, in declaration order. */
    private String[] enumValues;

    /**
     * @param colType the full column type text from which the member names are extracted
     */
    public EnumColumnParser(String colType) {
        this.enumValues = extractEnumValues(colType);
    }

    /**
     * Converts a raw enum column value.
     *
     * @param value the raw value: null, a {@code String} or an {@code Integer} index
     * @return null for null or index 0; a {@code String} input unchanged; otherwise the
     *         member name at position {@code index - 1}
     */
    @Override
    public Object getValue(Object value) {
        if (value == null || value instanceof String) {
            return value;
        }

        int index = (Integer) value;
        // Index 0 has no member name; indexes are 1-based into the member list.
        return index == 0 ? null : enumValues[index - 1];
    }
}

  • EnumColumnParser解析enum类型

SetColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java

/**
 * Parser for the {@code set} type: turns a bit mask into a comma-separated list of member names.
 */
public class SetColumnParser extends ColumnParser {

    /** Member names taken from the column type text, in declaration order. */
    private String[] enumValues;

    /**
     * @param colType the full column type text from which the member names are extracted
     */
    public SetColumnParser(String colType) {
        this.enumValues = extractEnumValues(colType);
    }

    /**
     * Converts a raw set column value.
     *
     * @param value the raw value: null, a {@code String} or a {@code Long} bit mask in which
     *              bit {@code i} selects the member at position {@code i}
     * @return null for null; a {@code String} input unchanged; otherwise the selected member
     *         names joined with {@code ","} (empty string when no bit is set)
     */
    @Override
    public Object getValue(Object value) {
        if (value == null || value instanceof String) {
            return value;
        }

        long mask = (Long) value;
        StringBuilder joined = new StringBuilder();
        // Empty before the first selected member, "," afterwards.
        String separator = "";
        int position = 0;
        for (String member : enumValues) {
            if ((mask & (1L << position)) != 0) {
                joined.append(separator).append(member);
                separator = ",";
            }
            position++;
        }
        return joined.toString();
    }
}

  • SetColumnParser解析set类型

DefaultColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java

/**
 * Fallback parser for every type without a dedicated implementation:
 * Base64-encodes byte arrays and passes everything else through.
 */
public class DefaultColumnParser extends ColumnParser {

    /**
     * Converts a raw column value.
     *
     * @param value the raw value; may be null
     * @return the Base64 text of the bytes when {@code value} is a {@code byte[]};
     *         otherwise {@code value} unchanged (including null)
     */
    @Override
    public Object getValue(Object value) {
        // instanceof is false for null, so null is returned as-is.
        if (!(value instanceof byte[])) {
            return value;
        }
        byte[] raw = (byte[]) value;
        return Base64.encodeBase64String(raw);
    }
}

  • DefaultColumnParser通过base64将byte数组转为string

小结

ColumnParser定义了getValue抽象方法;它提供了getColumnParser方法用于根据dataType获取对应的ColumnParser实现类

doc

  • ColumnParser

以上是 聊聊rocketmqmysql的ColumnParser 的全部内容, 来源链接: utcz.com/z/516833.html

回到顶部