聊聊puma的Sender

编程

本文主要研究一下puma的Sender

Sender

puma/puma/src/main/java/com/dianping/puma/sender/Sender.java

/**
 * A named component that delivers {@link ChangedEvent}s.
 *
 * <p>Extends {@link LifeCycle}, so implementations are also started and
 * stopped through that interface.
 */
public interface Sender extends LifeCycle {

/**
 * @return the name of this sender
 */
String getName();

/**
 * Sends one event.
 *
 * @param event   the event to send
 * @param context the puma context passed along with the event
 * @throws SenderException if the event could not be sent
 */
void send(ChangedEvent event, PumaContext context) throws SenderException;

}

  • Sender定义了getName、send方法

AbstractSender

puma/puma/src/main/java/com/dianping/puma/sender/AbstractSender.java

public abstract class AbstractSender implements Sender {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractSender.class);

private String name;

private int maxTryTimes = 3;

private boolean canMissEvent = false;

private volatile boolean stopped = true;

private final String MSG_SKIP = "[Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] ";

private final String MSG_LOOP_FAILED = "[Can"t Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] ";

/**

* @return the stop

*/

public boolean isStop() {

return stopped;

}

/**

* @return the maxTryTimes

*/

public int getMaxTryTimes() {

return maxTryTimes;

}

/**

* @param maxTryTimes the maxTryTimes to set

*/

public void setMaxTryTimes(int maxTryTimes) {

this.maxTryTimes = maxTryTimes;

}

/**

* @return the canMissEvent

*/

public boolean isCanMissEvent() {

return canMissEvent;

}

/**

* @param canMissEvent the canMissEvent to set

*/

public void setCanMissEvent(boolean canMissEvent) {

this.canMissEvent = canMissEvent;

}

/*

* (non-Javadoc)

*

* @see com.dianping.puma.common.LifeCycle#start()

*/

@Override

public void start() {

stopped = false;

}

/*

* (non-Javadoc)

*

* @see com.dianping.puma.common.LifeCycle#stop()

*/

@Override

public void stop() {

stopped = true;

}

/*

* (non-Javadoc)

*

* @see com.dianping.puma.sender.Sender#getName()

*/

@Override

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

@Override

public void send(ChangedEvent event, PumaContext context) throws SenderException {

long retryCount = 0;

while (true) {

if (isStop()) {

break;

}

try {

doSend(event, context);

break;

} catch (Exception e) {

LOG.error("Send error!", e);

if (retryCount++ > maxTryTimes) {

if (canMissEvent) {

LOG.error(String.format(MSG_SKIP, maxTryTimes, context.getPumaServerName(),

context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));

return;

} else {

if (retryCount % 100 == 0) {

LOG.error(String.format(MSG_LOOP_FAILED, maxTryTimes, context.getPumaServerName(),

context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));

}

}

}

try {

Thread.sleep(((retryCount % 15) + 1) * 300);

} catch (InterruptedException e1) {

Thread.currentThread().interrupt();

throw new SenderException("Interrupted", e1);

}

}

}

}

protected abstract void doSend(ChangedEvent event, PumaContext context) throws SenderException;

}

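  • AbstractSender声明实现了Sender接口,其send方法通过while循环执行doSend(event, context)方法;若sender已处于stop状态则直接退出循环;出现Exception时先记录错误日志,当失败次数超出maxTryTimes的限制且canMissEvent为true时,记录[Miss]日志并放弃该事件;否则sleep ((retryCount % 15) + 1) * 300毫秒之后再次重试(canMissEvent为false时会一直重试,并且每失败100次记录一次日志)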

FileDumpSender

puma/puma/src/main/java/com/dianping/puma/sender/FileDumpSender.java

/**
 * {@link AbstractSender} that appends events to a per-database
 * {@link WriteChannel}, creating and starting channels on first use.
 */
public class FileDumpSender extends AbstractSender {

    /** Write channels keyed by database name. */
    private final Map<String, WriteChannel> writeChannels = new ConcurrentHashMap<String, WriteChannel>();

    /** Last transaction-begin row event seen without a database; cleared by the next non-begin event. */
    private ChangedEvent transactionBegin;

    /** Filter consulted before every event is stored; must be set before sending. */
    private EventFilterChain storageEventFilterChain;

    @Override
    public void start() {
        super.start();
    }

    /**
     * Stops every write channel created so far, then marks the sender as stopped.
     */
    @Override
    public void stop() {
        for (WriteChannel channel : writeChannels.values()) {
            channel.stop();
        }

        super.stop();
    }

    /**
     * Runs the event through the storage filter chain and, if accepted,
     * appends it to the write channel of its database.
     *
     * <p>Events without a database are not stored: a transaction-begin row
     * event is remembered in {@code transactionBegin}, anything else is
     * reported as an error.
     *
     * @param event   the event to store
     * @param context unused by this implementation
     * @throws SenderException if an I/O error occurs, so that the retry loop
     *                         in {@code send} can try again instead of the
     *                         event being silently lost
     */
    @Override
    protected void doSend(ChangedEvent event, PumaContext context) throws SenderException {
        // Storage filter.
        storageEventFilterChain.reset();
        if (!storageEventFilterChain.doNext(event)) {
            return;
        }

        try {
            String database = event.getDatabase();

            if (database != null && database.length() > 0) {
                WriteChannel writeChannel = this.writeChannels.get(database);
                if (writeChannel == null) {
                    writeChannel = buildEventStorage(database);
                    this.writeChannels.put(database, writeChannel);
                }

                boolean isTransactionBegin = false;
                if (event instanceof RowChangedEvent) {
                    isTransactionBegin = ((RowChangedEvent) event).isTransactionBegin();
                }

                if (transactionBegin != null && !isTransactionBegin) {
                    // The pending transaction-begin marker is dropped here; storing it
                    // is currently disabled:
                    //readChannel.store(transactionBegin);
                    transactionBegin = null;
                }

                writeChannel.append(event);
            } else {
                if (event instanceof RowChangedEvent) {
                    if (((RowChangedEvent) event).isTransactionBegin()) {
                        transactionBegin = event;
                    } else {
                        Cat.logEvent("Puma", "RowChangeEvent-Has-No-Database");
                        LOG.error(String.format("RowChangeEvent[%s] has no database", event.toString()));
                    }
                } else {
                    Cat.logEvent("Puma", "ChangeEvent-Has-No-Database");
                    LOG.error(String.format("ChangeEvent[%s] has no database", event.toString()));
                }
            }
        } catch (IOException e) {
            // Propagate instead of printing the stack trace: returning normally would
            // make the caller treat the event as successfully sent.
            throw new SenderException("Failed to store event to write channel", e);
        }
    }

    /**
     * Creates and starts the write channel for {@code database}.
     */
    private WriteChannel buildEventStorage(String database) {
        WriteChannel writeChannel = ChannelFactory.newWriteChannel(database);
        writeChannel.start();
        return writeChannel;
    }

    /**
     * @param storageEventFilterChain the filter chain applied to every event before it is stored
     */
    public void setStorageEventFilterChain(EventFilterChain storageEventFilterChain) {
        this.storageEventFilterChain = storageEventFilterChain;
    }

}

  • FileDumpSender继承了AbstractSender,它定义了writeChannels属性,其stop方法会遍历writeChannels,挨个执行channel.stop();其doSend方法先执行storageEventFilterChain.doNext(event),返回false的话则直接返回,之后获取或者创建指定database的writeChannel,执行writeChannel.append(event);对于database为null的则判断是否是RowChangedEvent,是的话,在RowChangedEvent的isTransactionBegin时设置transactionBegin

小结

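Sender定义了getName、send方法;AbstractSender声明实现了Sender接口,其send方法通过while循环执行doSend(event, context)方法,出现Exception时先记录错误日志,当失败次数超出maxTryTimes的限制且canMissEvent为true时放弃该事件,否则sleep ((retryCount % 15) + 1) * 300毫秒之后再次重试;FileDumpSender继承了AbstractSender,其doSend方法把通过storageEventFilterChain过滤的event追加到对应database的writeChannel中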

doc

  • Sender

以上是 聊聊puma的Sender 的全部内容, 来源链接: utcz.com/z/517198.html

回到顶部