聊聊SpinalTap的BinlogEventListener

编程

本文主要研究一下SpinalTap的BinlogEventListener

BinlogEventListener

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/binlog_connector/BinaryLogConnectorSource.java

@Slf4j

public final class BinaryLogConnectorSource extends MysqlSource {

//......

private final class BinlogEventListener implements BinaryLogClient.EventListener {

public void onEvent(Event event) {

Preconditions.checkState(isStarted(), "Source is not started and should not process events");

final EventHeaderV4 header = event.getHeader();

final BinlogFilePos filePos =

new BinlogFilePos(

binlogClient.getBinlogFilename(),

header.getPosition(),

header.getNextPosition(),

binlogClient.getGtidSet(),

serverUUID);

BinaryLogConnectorEventMapper.INSTANCE

.map(event, filePos)

.ifPresent(BinaryLogConnectorSource.super::processEvent);

}

}

//......

}

  • SpinalTap的BinlogEventListener实现了BinaryLogClient.EventListener接口,其onEvent方法创建BinlogFilePos,然后使用BinaryLogConnectorEventMapper.INSTANCE进行转换,最后执行BinaryLogConnectorSource的processEvent方法

BinlogFilePos

SpinalTap/spinaltap-model/src/main/java/com/airbnb/spinaltap/mysql/BinlogFilePos.java

@Slf4j

@Getter

@EqualsAndHashCode

@NoArgsConstructor

@JsonDeserialize(builder = BinlogFilePos.Builder.class)

public class BinlogFilePos implements Comparable<BinlogFilePos>, Serializable {

private static final long serialVersionUID = 1549638989059430876L;

private static final Splitter SPLITTER = Splitter.on(":");

private static final String NULL_VALUE = "null";

public static final String DEFAULT_BINLOG_FILE_NAME = "mysql-bin-changelog";

@JsonProperty private String fileName;

@JsonProperty private long position;

@JsonProperty private long nextPosition;

@JsonProperty private GtidSet gtidSet;

@JsonProperty private String serverUUID;

public BinlogFilePos(long fileNumber) {

this(fileNumber, 4L, 4L);

}

public BinlogFilePos(String fileName) {

this(fileName, 4L, 4L);

}

public BinlogFilePos(long fileNumber, long position, long nextPosition) {

this(String.format("%s.%06d", DEFAULT_BINLOG_FILE_NAME, fileNumber), position, nextPosition);

}

public BinlogFilePos(

String fileName, long position, long nextPosition, String gtidSet, String serverUUID) {

this.fileName = fileName;

this.position = position;

this.nextPosition = nextPosition;

this.serverUUID = serverUUID;

if (gtidSet != null) {

this.gtidSet = new GtidSet(gtidSet);

}

}

public BinlogFilePos(String fileName, long position, long nextPosition) {

this(fileName, position, nextPosition, null, null);

}

public static BinlogFilePos fromString(@NonNull final String position) {

Iterator<String> parts = SPLITTER.split(position).iterator();

String fileName = parts.next();

String pos = parts.next();

String nextPos = parts.next();

if (NULL_VALUE.equals(fileName)) {

fileName = null;

}

return new BinlogFilePos(fileName, Long.parseLong(pos), Long.parseLong(nextPos));

}

@JsonIgnore

public long getFileNumber() {

if (fileName == null) {

return Long.MAX_VALUE;

}

if (fileName.equals("")) {

return Long.MIN_VALUE;

}

String num = fileName.substring(fileName.lastIndexOf(".") + 1);

return Long.parseLong(num);

}

@Override

public String toString() {

return String.format("%s:%d:%d", fileName, position, nextPosition);

}

@Override

public int compareTo(@NonNull final BinlogFilePos other) {

if (shouldCompareUsingFilePosition(this, other)) {

return getFileNumber() != other.getFileNumber()

? Long.compare(getFileNumber(), other.getFileNumber())

: Long.compare(getPosition(), other.getPosition());

}

if (this.gtidSet.equals(other.gtidSet)) {

return 0;

}

if (this.gtidSet.isContainedWithin(other.gtidSet)) {

return -1;

}

return 1;

}

/** Check if two BinlogFilePos are from the same source MySQL server */

private static boolean isFromSameSource(BinlogFilePos pos1, BinlogFilePos pos2) {

return pos1.getServerUUID() != null

&& pos1.getServerUUID().equalsIgnoreCase(pos2.getServerUUID());

}

/** Whether we can compare two BinlogFilePos using Binlog file position (without GTIDSet) */

public static boolean shouldCompareUsingFilePosition(BinlogFilePos pos1, BinlogFilePos pos2) {

return isFromSameSource(pos1, pos2) || pos1.getGtidSet() == null || pos2.getGtidSet() == null;

}

//......

}

  • BinlogFilePos定义了fileName、position、nextPosition、serverUUID属性

BinaryLogConnectorEventMapper

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/binlog_connector/BinaryLogConnectorEventMapper.java

@NoArgsConstructor(access = AccessLevel.PRIVATE)

public final class BinaryLogConnectorEventMapper {

public static final BinaryLogConnectorEventMapper INSTANCE = new BinaryLogConnectorEventMapper();

public Optional<BinlogEvent> map(

@NonNull final Event event, @NonNull final BinlogFilePos position) {

final EventHeaderV4 header = event.getHeader();

final EventType eventType = header.getEventType();

final long serverId = header.getServerId();

final long timestamp = header.getTimestamp();

if (EventType.isWrite(eventType)) {

final WriteRowsEventData data = event.getData();

return Optional.of(

new WriteEvent(data.getTableId(), serverId, timestamp, position, data.getRows()));

} else if (EventType.isUpdate(eventType)) {

final UpdateRowsEventData data = event.getData();

return Optional.of(

new UpdateEvent(data.getTableId(), serverId, timestamp, position, data.getRows()));

} else if (EventType.isDelete(eventType)) {

final DeleteRowsEventData data = event.getData();

return Optional.of(

new DeleteEvent(data.getTableId(), serverId, timestamp, position, data.getRows()));

} else {

switch (eventType) {

case TABLE_MAP:

TableMapEventData tableMapData = event.getData();

return Optional.of(

new TableMapEvent(

tableMapData.getTableId(),

serverId,

timestamp,

position,

tableMapData.getDatabase(),

tableMapData.getTable(),

tableMapData.getColumnTypes()));

case XID:

final XidEventData xidData = event.getData();

return Optional.of(new XidEvent(serverId, timestamp, position, xidData.getXid()));

case GTID:

final GtidEventData gtidEventData = event.getData();

return Optional.of(new GTIDEvent(serverId, timestamp, position, gtidEventData.getGtid()));

case QUERY:

final QueryEventData queryData = event.getData();

return Optional.of(

new QueryEvent(

serverId,

timestamp,

position,

queryData.getDatabase(),

queryData

.getSql()

// https://dev.mysql.com/doc/refman/5.7/en/comments.html

// Replace MySQL-specific comments (/*! ... */ and /*!50110 ... */) which

// are actually executed

.replaceAll("/*!(?:d{5})?(.*?)*/", "$1")

// Remove block comments

// https://stackoverflow.com/questions/13014947/regex-to-match-a-c-style-multiline-comment

// line comments and newlines are kept

// Note: This does not handle comments in quotes

.replaceAll("/*[^*]**+(?:[^/*][^*]**+)*/", " ")

// Remove extra spaces

.replaceAll("h+", " ")

.replaceAll("^s+", "")));

case FORMAT_DESCRIPTION:

return Optional.of(new StartEvent(serverId, timestamp, position));

default:

return Optional.empty();

}

}

}

}

  • BinaryLogConnectorEventMapper提供了map方法,它接收Event、BinlogFilePos参数,然后根据eventType来返回不同的BinlogEvent

AbstractSource

SpinalTap/spinaltap-common/src/main/java/com/airbnb/spinaltap/common/source/AbstractSource.java

@Slf4j

@RequiredArgsConstructor

public abstract class AbstractSource<E extends SourceEvent> extends ListenableSource<E> {

//......

public final void processEvent(final E event) {

try {

if (!eventFilter.apply(event)) {

log.debug("Event filtered from source {}. Skipping. event={}", name, event);

return;

}

notifyEvent(event);

final Stopwatch stopwatch = Stopwatch.createStarted();

metrics.eventReceived(event);

log.debug("Received event from source {}. event={}", name, event);

notifyMutations(mutationMapper.map(event));

stopwatch.stop();

final long time = stopwatch.elapsed(TimeUnit.MILLISECONDS);

metrics.processEventTime(event, time);

} catch (Exception ex) {

if (!isStarted()) {

// Do not process the exception if streaming has stopped.

return;

}

final String errorMessage = String.format("Failed to process event from source %s", name);

log.error(errorMessage, ex);

metrics.eventFailure(ex);

notifyError(ex);

throw new SourceException(errorMessage, ex);

}

}

//......

}

  • AbstractSource的processEvent方法首先执行eventFilter.apply(event),没有被过滤掉的话则执行notifyEvent及notifyMutations

ListenableSource

SpinalTap/spinaltap-common/src/main/java/com/airbnb/spinaltap/common/source/ListenableSource.java

abstract class ListenableSource<E extends SourceEvent> implements Source {

private final List<Listener> listeners = new ArrayList<>();

@Override

public void addListener(@NonNull final Listener listener) {

listeners.add(listener);

}

@Override

public void removeListener(@NonNull final Listener listener) {

listeners.remove(listener);

}

protected void notifyMutations(final List<? extends Mutation<?>> mutations) {

if (!mutations.isEmpty()) {

listeners.forEach(listener -> listener.onMutation(mutations));

}

}

protected void notifyEvent(E event) {

listeners.forEach(listener -> listener.onEvent(event));

}

protected void notifyError(Throwable error) {

listeners.forEach(listener -> listener.onError(error));

}

protected void notifyStart() {

listeners.forEach(Source.Listener::onStart);

}

}

  • ListenableSource的notifyEvent及notifyMutations都是遍历listeners执行listener.onEvent或者listener.onMutation方法

SourceListener

SpinalTap/spinaltap-common/src/main/java/com/airbnb/spinaltap/common/pipe/Pipe.java

  final class SourceListener extends Source.Listener {

public void onMutation(List<? extends Mutation<?>> mutations) {

destination.send(mutations);

}

public void onError(Throwable error) {

errorHandlingExecutor.execute(Pipe.this::close);

}

}

  • SourceListener继承了Source.Listener,其onMutation方法执行destination.send(mutations)

小结

SpinalTap的BinlogEventListener实现了BinaryLogClient.EventListener接口,其onEvent方法创建BinlogFilePos,然后使用BinaryLogConnectorEventMapper.INSTANCE进行转换,最后执行BinaryLogConnectorSource的processEvent方法

doc

  • BinaryLogConnectorSource

以上是 聊聊SpinalTap的BinlogEventListener 的全部内容, 来源链接: utcz.com/z/516922.html

回到顶部