聊聊puma的DefaultTaskExecutor

编程

本文主要研究一下puma的DefaultTaskExecutor

TaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/TaskExecutor.java

public interface TaskExecutor extends LifeCycle {

boolean isStop();

boolean isMerging();

void stopUntil(long timestamp);

void cancelStopUntil();

void setContext(PumaContext context);

void initContext();

PumaContext getContext();

String getTaskId();

void setTaskId(String taskId);

String getTaskName();

void setTaskName(String taskName);

String getDefaultBinlogFileName();

void setDefaultBinlogFileName(String binlogFileName);

Long getDefaultBinlogPosition();

void setDefaultBinlogPosition(Long binlogFileName);

void setInstanceStorageManager(InstanceStorageManager holder);

List<Sender> getFileSender();

DataHandler getDataHandler();

void resume() throws Exception;

void pause() throws Exception;

PumaTaskStateEntity getTaskState();

void setTaskState(PumaTaskStateEntity taskState);

void setInstanceTask(InstanceTask instanceTask);

InstanceTask getInstanceTask();

TableSet getTableSet();

}

  • TaskExecutor继承了LifeCycle,定义了initContext、getContext等方法

AbstractTaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/AbstractTaskExecutor.java

@ThreadUnSafe

public abstract class AbstractTaskExecutor implements TaskExecutor {

private PumaContext context;

private String taskId;

private long serverId;

protected String taskName;

protected Date beginTime;

protected TableSet tableSet;

private String defaultBinlogFileName;

private Long defaultBinlogPosition;

protected Parser parser;

protected DataHandler dataHandler;

protected Dispatcher dispatcher;

private volatile boolean stop = true;

protected InstanceStorageManager instanceStorageManager;

protected PumaTaskStateEntity state;

protected InstanceManager instanceManager;

@Override

public String getTaskId() {

return taskId;

}

@Override

public void setTaskId(String taskId) {

this.taskId = taskId;

}

@Override

public String getTaskName() {

return taskName;

}

@Override

public void setTaskName(String taskName) {

this.taskName = taskName;

}

/**

* @param instanceStorageManager

* the binlogPositionHolder to set

*/

public void setInstanceStorageManager(InstanceStorageManager instanceStorageManager) {

this.instanceStorageManager = instanceStorageManager;

}

public void setContext(PumaContext context) {

this.context = context;

}

public PumaContext getContext() {

return context;

}

public String getDefaultBinlogFileName() {

return defaultBinlogFileName;

}

public void setDefaultBinlogFileName(String binlogFileName) {

this.defaultBinlogFileName = binlogFileName;

}

/**

* @return the defaultBinlogPosition

*/

public Long getDefaultBinlogPosition() {

return defaultBinlogPosition;

}

/**

* @param defaultBinlogPosition

* the defaultBinlogPosition to set

*/

public void setDefaultBinlogPosition(Long defaultBinlogPosition) {

this.defaultBinlogPosition = defaultBinlogPosition;

}

/**

* @param parser

* the parser to set

*/

public void setParser(Parser parser) {

this.parser = parser;

}

/**

* @param dataHandler

* the dataHandler to set

*/

public void setDataHandler(DataHandler dataHandler) {

this.dataHandler = dataHandler;

}

/**

* @param dispatcher

* the dispatcher to set

*/

public void setDispatcher(Dispatcher dispatcher) {

this.dispatcher = dispatcher;

}

public long getServerId() {

return serverId;

}

public void setServerId(long serverId) {

this.serverId = serverId;

}

public boolean isStop() {

return stop;

}

protected abstract void doStop() throws Exception;

protected abstract void doStart() throws Exception;

@Override

public void start() {

try {

stop = false;

parser.start();

dataHandler.start();

dispatcher.start();

doStart();

} catch (Exception e) {

throw new RuntimeException(e);

}

}

@Override

public void stop() {

try {

stop = true;

parser.stop();

dataHandler.stop();

dispatcher.stop();

doStop();

} catch (Exception e) {

throw new RuntimeException(e);

}

}

public void resume() throws Exception {

stop = false;

}

public void pause() throws Exception {

stop = true;

}

@Override

public List<Sender> getFileSender() {

return dispatcher.getSenders();

}

@Override

public DataHandler getDataHandler() {

return this.dataHandler;

}

public PumaTaskStateEntity getTaskState() {

return state;

}

public void setTaskState(PumaTaskStateEntity state) {

this.state = state;

}

public Date getBeginTime() {

return beginTime;

}

public void setBeginTime(Date beginTime) {

this.beginTime = beginTime;

}

public TableSet getTableSet() {

return tableSet;

}

public void setTableSet(TableSet tableSet) {

this.tableSet = tableSet;

}

public InstanceManager getInstanceManager() {

return instanceManager;

}

public void setInstanceManager(InstanceManager instanceManager) {

this.instanceManager = instanceManager;

}

}

  • AbstractTaskExecutor声明实现TaskExecutor接口,它定义了context、defaultBinlogFileName、defaultBinlogPosition、parser、dataHandler、dispatcher等属性;其start方法执行parser、dataHandler、dispatcher的start方法及doStart方法;其stop方法执行parser、dataHandler、dispatcher的stop方法及doStop方法

DefaultTaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/DefaultTaskExecutor.java

@ThreadUnSafe

public class DefaultTaskExecutor extends AbstractTaskExecutor {

private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskExecutor.class);

private SrcDbEntity currentSrcDbEntity;

private DefaultTableMetaInfoFetcher tableMetaInfoFetcher;

private String encoding = "utf-8";

private Socket mysqlSocket;

private InputStream is;

private OutputStream os;

private InstanceTask instanceTask;

private boolean merging = false;

private long runUntilTimestamp;

@Override

public void doStart() throws Exception {

Thread.currentThread().setName("DefaultTaskExecutor-" + taskName);

long failCount = 0;

merging = false;

SystemStatusManager.addServer(getTaskName(), "", 0, tableSet);

do {

try {

loadServerId(instanceManager.getUrlByCluster(instanceTask.getInstance()));

// 读position/file文件

BinlogInfo binlogInfo = instanceStorageManager.getBinlogInfo(getContext().getPumaServerName());

if (binlogInfo == null) {

this.currentSrcDbEntity = initSrcDbByServerId(-1);

if (beginTime != null) {

binlogInfo = getBinlogByTimestamp(beginTime.getTime() / 1000);

}

} else {

this.currentSrcDbEntity = initSrcDbByServerId(binlogInfo.getServerId());

if (binlogInfo.getServerId() != currentSrcDbEntity.getServerId()) {

BinlogInfo oldBinlogInfo = binlogInfo;

binlogInfo = getBinlogByTimestamp(oldBinlogInfo.getTimestamp() - 60);

if (binlogInfo == null) {

throw new IOException("Switch Binlog Failed!");

} else {

Cat.logEvent("BinlogSwitch", taskName, Message.SUCCESS,

oldBinlogInfo.toString() + " -> " + binlogInfo.toString());

}

}

}

updateTableMetaInfoFetcher();

getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

if (!connect()) {

throw new IOException("Connection failed.");

}

initConnect();

initBinlogPosition(binlogInfo);

if (dumpBinlog()) {

processBinlog();

} else {

throw new IOException("Binlog dump failed.");

}

} catch (Exception e) {

if (++failCount % 3 == 0) {

this.currentSrcDbEntity = chooseNextSrcDb();

updateTableMetaInfoFetcher();

failCount = 0;

}

String msg = "Exception occurs. taskName: " + getTaskName() + " dbServerId: " + (currentSrcDbEntity == null ? 0 : currentSrcDbEntity.getServerId())

+ ". Reconnect...";

LOG.error(msg, e);

Cat.logError(msg, e);

Thread.sleep(((failCount % 10) + 1) * 2000);

}

} while (!isStop() && !Thread.currentThread().isInterrupted());

}

protected void doStop() throws Exception {

LOG.info("TaskName: " + getTaskName() + ", Stopped.");

closeTransport();

SystemStatusManager.deleteServer(getTaskName());

}

//......

}

  • DefaultTaskExecutor继承了AbstractTaskExecutor,其doStart方法通过instanceStorageManager.getBinlogInfo获取binlogInfo,若为null且beginTime不为null则从getBinlogByTimestamp获取binlogInfo,之后执行updateTableMetaInfoFetcher、connect、initConnect、initBinlogPosition、dumpBinlog、processBinlog方法;其doStop方法主要执行closeTransport、SystemStatusManager.deleteServer(getTaskName())方法

getBinlogByTimestamp

    protected BinlogInfo getBinlogByTimestamp(long time) throws IOException {

BinlogInfo binlogResult = null;

Transaction t = Cat.newTransaction("BinlogFindByTime", taskName);

Cat.logEvent("BinlogFindByTime.Time", String.valueOf(time));

try {

if (!connect()) {

throw new IOException("Connection failed.");

}

initConnect();

List<BinlogInfo> binaryLogs = getBinaryLogs();

Cat.logEvent("BinlogFindByTime.BinaryLogs", currentSrcDbEntity.toString(), Message.SUCCESS, Joiner.on(",").join(binaryLogs));

BinlogInfo closestBinlogInfo = null;

for (int k = binaryLogs.size() - 1; k >= 0; k--) {

if (binlogResult != null) {

break;

}

BinlogInfo newBinlogInfo = binaryLogs.get(k);

Cat.logEvent("BinlogFindByTime.Start", newBinlogInfo.toString());

getContext().setDBServerId(currentSrcDbEntity.getServerId());

getContext().setBinlogFileName(newBinlogInfo.getBinlogFile());

getContext().setBinlogStartPos(4);

getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

if (!connect()) {

throw new IOException("Connection failed.");

}

initConnect();

if (dumpBinlog()) {

while (!isStop()) {

BinlogPacket binlogPacket = (BinlogPacket) PacketFactory.parsePacket(is,

PacketType.BINLOG_PACKET,

getContext());

if (!binlogPacket.isOk()) {

LOG.error("TaskName: " + getTaskName() + ", Binlog packet response error.");

throw new IOException("TaskName: " + getTaskName() + ", Binlog packet response error.");

} else {

BinlogEvent binlogEvent = parser.parse(binlogPacket.getBinlogBuf(), getContext());

try {

getContext().setNextBinlogPos(binlogEvent.getHeader().getNextPosition());

if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {

if (closestBinlogInfo == null) {

break;

} else {

continue;

}

}

if (binlogEvent.getHeader().getTimestamp() >= time) {

if (closestBinlogInfo != null) {

binlogResult = closestBinlogInfo;

}

break;

}

if (binlogEvent.getHeader().getEventType() == BinlogConstants.XID_EVENT

&& binlogEvent.getHeader().getTimestamp() < time) {

closestBinlogInfo = new BinlogInfo(

currentSrcDbEntity.getServerId(),

getContext().getBinlogFileName(),

binlogEvent.getHeader().getNextPosition(),

0, binlogEvent.getHeader().getTimestamp());

}

} finally {

if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {

RotateEvent rotateEvent = (RotateEvent) binlogEvent;

getContext().setBinlogFileName(rotateEvent.getNextBinlogFileName());

getContext().setBinlogStartPos(rotateEvent.getFirstEventPosition());

} else {

getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());

}

}

}

}

} else {

throw new IOException("Binlog dump failed.");

}

}

Cat.logEvent("BinlogFindByTime.Success", taskName, Message.SUCCESS,

time + " -> " + (binlogResult == null ? "null" : binlogResult.toString()));

t.setStatus(Message.SUCCESS);

t.complete();

return binlogResult;

} catch (IOException e) {

t.setStatus(e);

t.complete();

throw e;

}

}

  • getBinlogByTimestamp方法先执行connect、initConnect,然后通过getBinaryLogs获取binaryLogs,之后遍历binaryLogs执行dumpBinlog,获取binlogEvent.getHeader().getTimestamp()大于等于指定time的BinlogInfo

connect

    private boolean connect() {

try {

closeTransport();

this.mysqlSocket = new Socket();

this.mysqlSocket.setTcpNoDelay(false);

this.mysqlSocket.setKeepAlive(true);

this.mysqlSocket.connect(new InetSocketAddress(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort()));

this.is = new BufferedInputStream(mysqlSocket.getInputStream());

this.os = new BufferedOutputStream(mysqlSocket.getOutputStream());

PacketFactory.parsePacket(is, PacketType.CONNECT_PACKET, getContext());

LOG.info("TaskName: " + getTaskName() + ", Connection db success.");

return true;

} catch (Exception e) {

LOG.error("TaskName: " + getTaskName() + ", Connect failed. Reason: " + e.getMessage());

return false;

}

}

  • connect方法先执行closeTransport,然后创建mysqlSocket进行connect

initConnect

    protected void initConnect() throws IOException {

if (!auth()) {

throw new IOException("Login failed.");

}

if (getContext().isCheckSum()) {

if (!updateSetting()) {

throw new IOException("Update setting command failed.");

}

}

if (!queryBinlogFormat()) {

throw new IOException("Query config binlogformat failed.");

}

if (!queryBinlogImage()) {

throw new IOException("Query config binlog row image failed.");

}

if (queryServerId() != currentSrcDbEntity.getServerId()) {

throw new IOException("Server Id Changed.");

}

}

private boolean auth() {

try {

LOG.info("server logining taskName: " + getTaskName() + " host: " + currentSrcDbEntity.getHost() + " port: " + currentSrcDbEntity.getPort() + " username: "

+ currentSrcDbEntity.getUsername() + " dbServerId: " + currentSrcDbEntity.getServerId());

AuthenticatePacket authPacket = (AuthenticatePacket) PacketFactory.createCommandPacket(

PacketType.AUTHENTICATE_PACKET, getContext());

authPacket.setPassword(currentSrcDbEntity.getPassword());

authPacket.setUser(currentSrcDbEntity.getUsername());

authPacket.buildPacket(getContext());

authPacket.write(os, getContext());

OKErrorPacket okErrorPacket = (OKErrorPacket) PacketFactory.parsePacket(is, PacketType.OKERROR_PACKET,

getContext());

boolean isAuth;

if (okErrorPacket.isOk()) {

LOG.info("TaskName: " + getTaskName() + ", Server login success.");

isAuth = true;

} else {

isAuth = false;

LOG.error("TaskName: " + getTaskName() + ", Login failed. Reason: " + okErrorPacket.getMessage());

}

return isAuth;

} catch (Exception e) {

LOG.error("TaskName: " + getTaskName() + ", Login failed. Reason: " + e.getMessage());

return false;

}

}

private boolean queryBinlogFormat() throws IOException {

try {

QueryExecutor executor = new QueryExecutor(is, os);

String cmd = "show global variables like "binlog_format"";

ResultSet rs = executor.query(cmd, getContext());

List<String> columnValues = rs.getFiledValues();

boolean isQuery = true;

if (columnValues == null || columnValues.size() != 2 || columnValues.get(1) == null) {

LOG.error("TaskName: " + getTaskName()

+ ", QueryConfig failed Reason:unexcepted binlog format query result.");

isQuery = false;

}

BinlogFormat binlogFormat = BinlogFormat.valuesOf(columnValues.get(1));

String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

if (binlogFormat == null || !binlogFormat.isRow()) {

isQuery = false;

LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog format: " + binlogFormat.value);

}

Cat.logEvent("Slave.dbBinlogFormat", eventName, isQuery ? Message.SUCCESS : "1", "");

if (isQuery) {

LOG.info("TaskName: " + getTaskName() + ", Query config binlogformat is legal.");

}

return isQuery;

} catch (Exception e) {

LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage());

return false;

}

}

private boolean queryBinlogImage() throws IOException {

try {

QueryExecutor executor = new QueryExecutor(is, os);

String cmd = "show variables like "binlog_row_image"";

ResultSet rs = executor.query(cmd, getContext());

List<String> columnValues = rs.getFiledValues();

boolean isQuery = true;

if (columnValues == null || columnValues.size() == 0) {// 5.1

isQuery = true;

} else if (columnValues != null && columnValues.size() == 2 && columnValues.get(1) != null) {// 5.6

BinlogRowImage binlogRowImage = BinlogRowImage.valuesOf(columnValues.get(1));

isQuery = true;

if (binlogRowImage == null || !binlogRowImage.isFull()) {

isQuery = false;

LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog row image: " + binlogRowImage.value);

}

} else {

LOG.error("TaskName: " + getTaskName()

+ ", QueryConfig failed Reason:unexcepted binlog row image query result.");

isQuery = false;

}

String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

Cat.logEvent("Slave.dbBinlogRowImage", eventName, isQuery ? Message.SUCCESS : "1", "");

if (isQuery) {

LOG.info("TaskName: " + getTaskName() + ", Query config binlog row image is legal.");

}

return isQuery;

} catch (Exception e) {

LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage());

return false;

}

}

  • initConnect方法依次执行auth、queryBinlogFormat、queryBinlogImage方法;auth方法进行账号密码校验;queryBinlogFormat主要执行show global variables like "binlog_format"命令;queryBinlogImage主要执行show variables like "binlog_row_image"

initBinlogPosition

    protected void initBinlogPosition(BinlogInfo binlogInfo) throws IOException {

if (binlogInfo == null) {

List<BinlogInfo> binaryLogs = getBinaryLogs();

BinlogInfo begin = beginTime == null ? binaryLogs.get(binaryLogs.size() - 1) : binaryLogs.get(0);

binlogInfo = new BinlogInfo(currentSrcDbEntity.getServerId(), begin.getBinlogFile(), 4l, 0, begin.getTimestamp());

}

getContext().setDBServerId(currentSrcDbEntity.getServerId());

getContext().setBinlogFileName(binlogInfo.getBinlogFile());

getContext().setBinlogStartPos(binlogInfo.getBinlogPosition());

setBinlogInfo(binlogInfo);

SystemStatusManager.addServer(getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort(), tableSet);

SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);

}

  • initBinlogPosition主要是将binlogInfo信息设置到PumaContext中

dumpBinlog

    private boolean dumpBinlog() {

try {

ComBinlogDumpPacket dumpBinlogPacket = (ComBinlogDumpPacket) PacketFactory.createCommandPacket(

PacketType.COM_BINLOG_DUMP_PACKET, getContext());

dumpBinlogPacket.setBinlogFileName(getContext().getBinlogFileName());

dumpBinlogPacket.setBinlogFlag(0);

dumpBinlogPacket.setBinlogPosition(getContext().getBinlogStartPos());

dumpBinlogPacket.setServerId(getServerId());

dumpBinlogPacket.buildPacket(getContext());

dumpBinlogPacket.write(os, getContext());

OKErrorPacket dumpCommandResultPacket = (OKErrorPacket) PacketFactory.parsePacket(is,

PacketType.OKERROR_PACKET, getContext());

if (dumpCommandResultPacket.isOk()) {

LOG.info("TaskName: " + getTaskName() + ", Dump binlog command success.");

return true;

} else {

LOG.error("TaskName: " + getTaskName() + ", Dump binlog failed. Reason: "

+ dumpCommandResultPacket.getMessage());

return false;

}

} catch (Exception e) {

LOG.error("TaskName: " + getTaskName() + " Dump binlog failed. Reason: " + e.getMessage());

return false;

}

}

  • dumpBinlog方法主要是发送COM_BINLOG_DUMP_PACKET

processBinlog

    private void processBinlog() throws IOException {

while (!isStop()) {

BinlogPacket binlogPacket = (BinlogPacket) PacketFactory.parsePacket(is, PacketType.BINLOG_PACKET,

getContext());

if (!binlogPacket.isOk()) {

LOG.error("TaskName: " + getTaskName() + ", Binlog packet response error.");

throw new IOException("TaskName: " + getTaskName() + ", Binlog packet response error.");

} else {

processBinlogPacket(binlogPacket);

}

}

}

protected void processBinlogPacket(BinlogPacket binlogPacket) throws IOException {

BinlogEvent binlogEvent = parser.parse(binlogPacket.getBinlogBuf(), getContext());

if (merging) {

if (binlogEvent.getHeader().getTimestamp() >= runUntilTimestamp) {

stop();

}

}

SystemStatusManager.incServerParsedCounter(getTaskName());

if (binlogEvent.getHeader().getEventType() == BinlogConstants.INTVAR_EVENT

|| binlogEvent.getHeader().getEventType() == BinlogConstants.RAND_EVENT

|| binlogEvent.getHeader().getEventType() == BinlogConstants.USER_VAR_EVENT) {

LOG.error("TaskName: " + getTaskName() + ", Binlog_format is MIXED or STATEMENT ,System is not support.");

String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

Cat.logEvent("Slave.dbBinlogFormat", eventName, "1", "");

Cat.logError("Puma.server.mixedorstatement.format", new IllegalArgumentException("TaskName: "

+ getTaskName() + ", Binlog_format is MIXED or STATEMENT ,System is not support."));

stopTask();

}

if (binlogEvent.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {

getContext().setNextBinlogPos(binlogEvent.getHeader().getNextPosition());

}

if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {

processRotateEvent(binlogEvent);

} else {

processDataEvent(binlogEvent);

}

}

protected void processRotateEvent(BinlogEvent binlogEvent) {

RotateEvent rotateEvent = (RotateEvent) binlogEvent;

getContext().setBinlogFileName(rotateEvent.getNextBinlogFileName());

getContext().setBinlogStartPos(rotateEvent.getFirstEventPosition());

}

protected void processDataEvent(BinlogEvent binlogEvent) {

DataHandlerResult dataHandlerResult = null;

// 一直处理一个binlogEvent的多行,处理完每行马上分发,以防止一个binlogEvent包含太多ChangedEvent而耗费太多内存

int eventIndex = 0;

do {

dataHandlerResult = dataHandler.process(binlogEvent, getContext());

if (dataHandlerResult != null && !dataHandlerResult.isEmpty()) {

ChangedEvent changedEvent = dataHandlerResult.getData();

changedEvent.getBinlogInfo().setEventIndex(eventIndex++);

updateOpsCounter(changedEvent);

dispatch(changedEvent);

}

} while (dataHandlerResult != null && !dataHandlerResult.isFinished());

if (binlogEvent.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {

getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());

setBinlogInfo(new BinlogInfo(getBinlogInfo().getServerId(), getBinlogInfo().getBinlogFile(), binlogEvent

.getHeader().getNextPosition(), 0, 0));

}

BinlogInfo binlogInfo = new BinlogInfo(getContext().getDBServerId(), getContext()

.getBinlogFileName(), binlogEvent.getHeader().getNextPosition(), 0, binlogEvent.getHeader().getTimestamp());

SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);

if (binlogEvent.getHeader().getNextPosition() != 0

&& StringUtils.isNotBlank(getContext().getBinlogFileName())

&& dataHandlerResult != null

&& !dataHandlerResult.isEmpty()

&& (dataHandlerResult.getData() instanceof DdlEvent || (dataHandlerResult.getData() instanceof RowChangedEvent && ((RowChangedEvent) dataHandlerResult

.getData()).isTransactionCommit()))) {

instanceStorageManager.setBinlogInfo(getTaskName(), binlogInfo);

}

}

- processBinlog方法循环接收binlogPacket,然后执行processBinlogPacket;该方法通过parser.parse获取binlogEvent,对于FORMAT_DESCRIPTION_EVENT,则更新binlogEvent.getHeader().getNextPosition()到context中;对于ROTATE_EVENT则执行processRotateEvent,否则执行processDataEvent;processRotateEvent主要是更新binlogFileName及binlogStartPos;processDataEvent则主要是通过dataHandler.process(binlogEvent, getContext())处理,然后执行dispatch(changedEvent)

closeTransport

puma/puma/src/main/java/com/dianping/puma/taskexecutor/DefaultTaskExecutor.java

    private void closeTransport() {

// Close in.

try {

if (this.is != null) {

this.is.close();

}

} catch (IOException ioEx) {

LOG.warn("Server " + this.getTaskName() + ", Failed to close the input stream.");

} finally {

this.is = null;

}

// Close os.

try {

if (this.os != null) {

this.os.close();

}

} catch (IOException ioEx) {

LOG.warn("Server " + this.getTaskName() + ", Failed to close the output stream");

} finally {

this.os = null;

}

// Close socket.

try {

if (this.mysqlSocket != null) {

this.mysqlSocket.close();

}

} catch (IOException ioEx) {

LOG.warn("Server " + this.getTaskName() + ", Failed to close the socket", ioEx);

} finally {

this.mysqlSocket = null;

}

}

  • closeTransport主要是关闭InputStream、OutputStream及mysqlSocket

小结

DefaultTaskExecutor继承了AbstractTaskExecutor,其doStart方法通过instanceStorageManager.getBinlogInfo获取binlogInfo,若为null且beginTime不为null则从getBinlogByTimestamp获取binlogInfo,之后执行updateTableMetaInfoFetcher、connect、initConnect、initBinlogPosition、dumpBinlog、processBinlog方法;其doStop方法主要执行closeTransport、SystemStatusManager.deleteServer(getTaskName())方法

doc

  • DefaultTaskExecutor

以上是 聊聊puma的DefaultTaskExecutor 的全部内容, 来源链接: utcz.com/z/517099.html

回到顶部