聊聊BinaryLogClient的EventListener

编程

EventListener

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

    public interface EventListener {

void onEvent(Event event);

}

  • EventListener接口定义了onEvent方法

BinaryLogClientStatistics

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java

public class BinaryLogClientStatistics implements BinaryLogClientStatisticsMXBean,

BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener {

private AtomicReference<EventHeader> lastEventHeader = new AtomicReference<EventHeader>();

private AtomicLong timestampOfLastEvent = new AtomicLong();

private AtomicLong totalNumberOfEventsSeen = new AtomicLong();

private AtomicLong totalBytesReceived = new AtomicLong();

private AtomicLong numberOfSkippedEvents = new AtomicLong();

private AtomicLong numberOfDisconnects = new AtomicLong();

public BinaryLogClientStatistics() {

}

public BinaryLogClientStatistics(BinaryLogClient binaryLogClient) {

binaryLogClient.registerEventListener(this);

binaryLogClient.registerLifecycleListener(this);

}

@Override

public String getLastEvent() {

EventHeader eventHeader = lastEventHeader.get();

return eventHeader == null ? null : eventHeader.getEventType() + "/" + eventHeader.getTimestamp() +

" from server " + eventHeader.getServerId();

}

@Override

public long getSecondsSinceLastEvent() {

long timestamp = timestampOfLastEvent.get();

return timestamp == 0 ? 0 : (getCurrentTimeMillis() - timestamp) / 1000;

}

@Override

public long getSecondsBehindMaster() {

// because lastEventHeader and timestampOfLastEvent are not guarded by the common lock

// we may get some "distorted" results, though shouldn"t be a problem given the nature of the final value

long timestamp = timestampOfLastEvent.get();

EventHeader eventHeader = lastEventHeader.get();

if (timestamp == 0 || eventHeader == null) {

return -1;

}

return (timestamp - eventHeader.getTimestamp()) / 1000;

}

@Override

public long getTotalNumberOfEventsSeen() {

return totalNumberOfEventsSeen.get();

}

@Override

public long getTotalBytesReceived() {

return totalBytesReceived.get();

}

@Override

public long getNumberOfSkippedEvents() {

return numberOfSkippedEvents.get();

}

@Override

public long getNumberOfDisconnects() {

return numberOfDisconnects.get();

}

@Override

public void reset() {

lastEventHeader.set(null);

timestampOfLastEvent.set(0);

totalNumberOfEventsSeen.set(0);

totalBytesReceived.set(0);

numberOfSkippedEvents.set(0);

numberOfDisconnects.set(0);

}

@Override

public void onEvent(Event event) {

EventHeader header = event.getHeader();

lastEventHeader.set(header);

timestampOfLastEvent.set(getCurrentTimeMillis());

totalNumberOfEventsSeen.getAndIncrement();

totalBytesReceived.getAndAdd(header.getHeaderLength() + header.getDataLength());

}

@Override

public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {

numberOfSkippedEvents.getAndIncrement();

lastEventHeader.set(null);

timestampOfLastEvent.set(getCurrentTimeMillis());

totalNumberOfEventsSeen.getAndIncrement();

}

@Override

public void onDisconnect(BinaryLogClient client) {

numberOfDisconnects.getAndIncrement();

}

@Override

public void onConnect(BinaryLogClient client) {

}

@Override

public void onCommunicationFailure(BinaryLogClient client, Exception ex) {

}

protected long getCurrentTimeMillis() {

return System.currentTimeMillis();

}

}

  • BinaryLogClientStatistics实现了BinaryLogClient.EventListener接口,其onEvent方法会更新lastEventHeader、timestampOfLastEvent、totalNumberOfEventsSeen、totalBytesReceived

listenForEventPackets

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

public class BinaryLogClient implements BinaryLogClientMXBean {

//......

private void listenForEventPackets() throws IOException {

ByteArrayInputStream inputStream = channel.getInputStream();

boolean completeShutdown = false;

try {

while (inputStream.peek() != -1) {

int packetLength = inputStream.readInteger(3);

inputStream.skip(1); // 1 byte for sequence

int marker = inputStream.read();

if (marker == 0xFF) {

ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));

throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),

errorPacket.getSqlState());

}

if (marker == 0xFE && !blocking) {

completeShutdown = true;

break;

}

Event event;

try {

event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?

new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :

inputStream);

if (event == null) {

throw new EOFException();

}

} catch (Exception e) {

Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;

if (cause instanceof EOFException || cause instanceof SocketException) {

throw e;

}

if (isConnected()) {

for (LifecycleListener lifecycleListener : lifecycleListeners) {

lifecycleListener.onEventDeserializationFailure(this, e);

}

}

continue;

}

if (isConnected()) {

eventLastSeen = System.currentTimeMillis();

updateGtidSet(event);

notifyEventListeners(event);

updateClientBinlogFilenameAndPosition(event);

}

}

} catch (Exception e) {

if (isConnected()) {

for (LifecycleListener lifecycleListener : lifecycleListeners) {

lifecycleListener.onCommunicationFailure(this, e);

}

}

} finally {

if (isConnected()) {

if (completeShutdown) {

disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)

} else {

disconnectChannel();

}

}

}

}

private void notifyEventListeners(Event event) {

if (event.getData() instanceof EventDataWrapper) {

event = new Event(event.getHeader(), ((EventDataWrapper) event.getData()).getExternal());

}

for (EventListener eventListener : eventListeners) {

try {

eventListener.onEvent(event);

} catch (Exception e) {

if (logger.isLoggable(Level.WARNING)) {

logger.log(Level.WARNING, eventListener + " choked on " + event, e);

}

}

}

}

//......

}

  • listenForEventPackets方法会读取channel.getInputStream(),然后通过eventDeserializer.nextEvent解析为event,之后调用updateGtidSet、notifyEventListeners、updateClientBinlogFilenameAndPosition方法;notifyEventListeners方法会遍历eventListeners挨个执行其onEvent方法

小结

EventListener接口定义了onEvent方法;BinaryLogClientStatistics实现了BinaryLogClient.EventListener接口,其onEvent方法会更新lastEventHeader、timestampOfLastEvent、totalNumberOfEventsSeen、totalBytesReceived

doc

  • BinaryLogClient

以上是 聊聊BinaryLogClient的EventListener 的全部内容, 来源链接: utcz.com/z/515958.html

回到顶部