聊聊puma的Parser

编程

本文主要研究一下puma的Parser

Parser

puma/puma/src/main/java/com/dianping/puma/parser/Parser.java

public interface Parser extends LifeCycle {

BinlogEvent parse(ByteBuffer buf, PumaContext context) throws IOException;

}

  • Parser继承了LifeCycle接口,它定义了parse方法,解析ByteBuffer到BinlogEvent

DefaultBinlogParser

puma/puma/src/main/java/com/dianping/puma/parser/DefaultBinlogParser.java

@ThreadSafe

public class DefaultBinlogParser implements Parser {

private final Logger logger = LoggerFactory.getLogger(DefaultBinlogParser.class);

private static Map<Byte, Class<? extends BinlogEvent>> eventMaps = new ConcurrentHashMap<Byte, Class<? extends BinlogEvent>>();

@Override

public BinlogEvent parse(ByteBuffer buf, PumaContext context) throws IOException {

logger.debug("

");

logger.debug("****************************** binlog parse begin ******************************");

BinlogHeader header = new BinlogHeader();

header.parse(buf, context);

logger.debug("binlog event header:

");

logger.debug("{}", header);

BinlogEvent event = null;

Class<? extends BinlogEvent> eventClass = eventMaps.get(header.getEventType());

if (eventClass != null) {

try {

event = eventClass.newInstance();

} catch (Exception e) {

logger.error("Init event class failed. eventType: " + header.getEventType(), e);

event = null;

}

}

if (event == null) {

event = new PumaIgnoreEvent();

}

logger.debug("binlog event type:

");

logger.debug("{}", event.getClass());

event.parse(buf, context, header);

logger.debug("binlog event:

");

logger.debug("{}", event);

logger.debug("****************************** binlog parse end ******************************");

logger.debug("

");

return event;

}

/*

* (non-Javadoc)

*

* @see com.dianping.puma.common.LifeCycle#start()

*/

@Override

public void start() {

eventMaps.put(BinlogConstants.UNKNOWN_EVENT, UnknownEvent.class);

eventMaps.put(BinlogConstants.QUERY_EVENT, QueryEvent.class);

eventMaps.put(BinlogConstants.STOP_EVENT, StopEvent.class);

eventMaps.put(BinlogConstants.ROTATE_EVENT, RotateEvent.class);

eventMaps.put(BinlogConstants.INTVAR_EVENT, IntVarEvent.class);

eventMaps.put(BinlogConstants.RAND_EVENT, RandEvent.class);

eventMaps.put(BinlogConstants.USER_VAR_EVENT, UserVarEvent.class);

eventMaps.put(BinlogConstants.FORMAT_DESCRIPTION_EVENT, FormatDescriptionEvent.class);

eventMaps.put(BinlogConstants.XID_EVENT, XIDEvent.class);

eventMaps.put(BinlogConstants.TABLE_MAP_EVENT, TableMapEvent.class);

eventMaps.put(BinlogConstants.WRITE_ROWS_EVENT_V1, WriteRowsEvent.class);

eventMaps.put(BinlogConstants.UPDATE_ROWS_EVENT_V1, UpdateRowsEvent.class);

eventMaps.put(BinlogConstants.DELETE_ROWS_EVENT_V1, DeleteRowsEvent.class);

eventMaps.put(BinlogConstants.INCIDENT_EVENT, IncidentEvent.class);

//mysql --5.6

eventMaps.put(BinlogConstants.WRITE_ROWS_EVENT, WriteRowsEvent.class);

eventMaps.put(BinlogConstants.UPDATE_ROWS_EVENT, UpdateRowsEvent.class);

eventMaps.put(BinlogConstants.DELETE_ROWS_EVENT, DeleteRowsEvent.class);

eventMaps.put(BinlogConstants.HEARTBEAT_LOG_EVENT, HeartbeatEvent.class);

eventMaps.put(BinlogConstants.IGNORABLE_LOG_EVENT, IgnorableEvent.class);

eventMaps.put(BinlogConstants.ROWS_QUERY_LOG_EVENT, RowsQueryEvent.class);

eventMaps.put(BinlogConstants.GTID_LOG_EVENT, GtidEvent.class);

eventMaps.put(BinlogConstants.ANONYMOUS_GTID_LOG_EVENT, AnonymousGtidEvent.class);

eventMaps.put(BinlogConstants.PREVIOUS_GTIDS_LOG_EVENT, PreviousGtidsEvent.class);

}

@Override

public void stop() {

}

}

  • DefaultBinlogParser实现了Parser接口,其parse方法通过header.getEventType()先实例化对应的BinlogEvent,然后通过event.parse(buf, context, header)进行解析

BinlogEvent

puma/puma/src/main/java/com/dianping/puma/parser/mysql/event/BinlogEvent.java

public interface BinlogEvent extends Serializable {

BinlogHeader getHeader();

void setHeader(BinlogHeader header);

void parse(ByteBuffer buf, PumaContext context, BinlogHeader header) throws IOException;

}

  • BinlogEvent接口定义了getHeader、setHeader、parse方法

AbstractBinlogEvent

puma/puma/src/main/java/com/dianping/puma/parser/mysql/event/AbstractBinlogEvent.java

public abstract class AbstractBinlogEvent implements BinlogEvent {

private static final long serialVersionUID = -8136236885229956889L;

private BinlogHeader header;

private int checksumAlg = BinlogConstants.CHECKSUM_ALG_OFF;

private long crc;

@Override

public void parse(ByteBuffer buf, PumaContext context, BinlogHeader header) throws IOException {

this.header = header;

doParse(buf, context);

if (!(this.header.getEventType() == BinlogConstants.ROTATE_EVENT)) {

checksumAlg = context.getChecksumAlg(); // fetch checksum alg

parseCheckSum(buf);

}

}

@Override

public BinlogHeader getHeader() {

return header;

}

@Override

public void setHeader(BinlogHeader header) {

this.header = header;

}

public abstract void doParse(ByteBuffer buf, PumaContext context) throws IOException;

private void parseCheckSum(ByteBuffer buf) {

if (checksumAlg != BinlogConstants.CHECKSUM_ALG_OFF && checksumAlg != BinlogConstants.CHECKSUM_ALG_UNDEF) {

buf.position((int) (this.header.getEventLength() - 4));

setCrc(PacketUtils.readLong(buf, 4));

}

}

@Override public String toString() {

return new ToStringBuilder(this)

.append("header", header)

.append("checksumAlg", checksumAlg)

.append("crc", crc)

.toString();

}

public void setChecksumAlg(int checksumAlg) {

this.checksumAlg = checksumAlg;

}

public int getChecksumAlg() {

return checksumAlg;

}

public long getCrc() {

return crc;

}

public void setCrc(long crc) {

this.crc = crc;

}

public boolean isRemaining(ByteBuffer buf, PumaContext context) {

return context.isCheckSum() ? buf.remaining() - 4 > 0 : buf.hasRemaining();

}

public int lenRemaining(ByteBuffer buf, PumaContext context) {

return context.isCheckSum() ? buf.remaining() - 4 : buf.remaining();

}

}

  • AbstractBinlogEvent声明实现了BinlogEvent接口,其parse方法会调用doParse方法,之后对于非ROTATE_EVENT会执行parseCheckSum

小结

Parser继承了LifeCycle接口,它定义了parse方法,解析ByteBuffer到BinlogEvent;DefaultBinlogParser实现了Parser接口,其parse方法通过header.getEventType()先实例化对应的BinlogEvent,然后通过event.parse(buf, context, header)进行解析

doc

  • Parser

以上是 聊聊puma的Parser 的全部内容, 来源链接: utcz.com/z/517070.html

回到顶部