public abstract class LogFetcher extends LogBuffer implements Closeable {

    /** Default initial capacity. */
    public static final int DEFAULT_INITIAL_CAPACITY = 8192;
    /** Default growth factor. */
    public static final float DEFAULT_GROWTH_FACTOR = 2.0f;
    /** Binlog file header size */
    public static final int BIN_LOG_HEADER_SIZE = 4;

    protected final float factor;

    public LogFetcher(){
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_GROWTH_FACTOR);
    }

    public LogFetcher(final int initialCapacity){
        this(initialCapacity, DEFAULT_GROWTH_FACTOR);
    }

    public LogFetcher(final int initialCapacity, final float growthFactor){
        this.buffer = new byte[initialCapacity];
        this.factor = growthFactor;
    }

    /**
     * Increases the capacity of this <tt>LogFetcher</tt> instance, if
     * necessary, to ensure that it can hold at least the number of elements
     * specified by the minimum capacity argument.
     *
     * @param minCapacity the desired minimum capacity
     */
    protected final void ensureCapacity(final int minCapacity) {
        final int oldCapacity = buffer.length;
        if (minCapacity > oldCapacity) {
            // Grow by the configured factor, but never to less than minCapacity.
            int newCapacity = (int) (oldCapacity * factor);
            if (newCapacity < minCapacity) newCapacity = minCapacity;
            buffer = Arrays.copyOf(buffer, newCapacity);
        }
    }

    /** Fetches the next frame of the binary log and fills it into the buffer. */
    public abstract boolean fetch() throws IOException;

    /**
     * {@inheritDoc}
     *
     * @see java.io.Closeable#close()
     */
    public abstract void close() throws IOException;
}
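- LogFetcher extends LogBuffer and declares two abstract methods, fetch and close; it also provides ensureCapacity, which grows the internal buffer by the growth factor (2.0 by default) when more room is needed.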
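The growth rule in ensureCapacity can be tried in isolation. The following is a minimal sketch, assuming a hypothetical stand-alone class named `GrowableBufferSketch` (it is not part of canal) that copies only the capacity logic shown in the excerpt:

```java
import java.util.Arrays;

/** Hypothetical stand-alone demo of the growth rule; not part of canal. */
final class GrowableBufferSketch {

    private byte[] buffer;
    private final float factor;

    GrowableBufferSketch(int initialCapacity, float growthFactor) {
        this.buffer = new byte[initialCapacity];
        this.factor = growthFactor;
    }

    /** Same rule as the excerpt: grow by the factor, but at least to minCapacity. */
    void ensureCapacity(int minCapacity) {
        int oldCapacity = buffer.length;
        if (minCapacity > oldCapacity) {
            int newCapacity = (int) (oldCapacity * factor);
            if (newCapacity < minCapacity) newCapacity = minCapacity;
            buffer = Arrays.copyOf(buffer, newCapacity); // existing bytes are preserved
        }
    }

    int capacity() {
        return buffer.length;
    }

    public static void main(String[] args) {
        GrowableBufferSketch b = new GrowableBufferSketch(8192, 2.0f);
        b.ensureCapacity(4096);    // already large enough: stays at 8192
        System.out.println(b.capacity());
        b.ensureCapacity(8193);    // grows by the factor: 16384
        System.out.println(b.capacity());
        b.ensureCapacity(100000);  // 16384 * 2 is too small, so minCapacity wins: 100000
        System.out.println(b.capacity());
    }
}
```

Growing by a factor rather than to the exact requested size keeps the number of array copies low when the buffer has to grow repeatedly.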
public final class FileLogFetcher extends LogFetcher {

    public static final byte[] BINLOG_MAGIC = { -2, 0x62, 0x69, 0x6e };

    private FileInputStream fin;

    public FileLogFetcher(){
        super(DEFAULT_INITIAL_CAPACITY, DEFAULT_GROWTH_FACTOR);
    }

    public FileLogFetcher(final int initialCapacity){
        super(initialCapacity, DEFAULT_GROWTH_FACTOR);
    }

    public FileLogFetcher(final int initialCapacity, final float growthFactor){
        super(initialCapacity, growthFactor);
    }

    /** Open binlog file in local disk to fetch. */
    public void open(File file) throws FileNotFoundException, IOException {
        open(file, 0L);
    }

    /** Open binlog file in local disk to fetch. */
    public void open(String filePath) throws FileNotFoundException, IOException {
        open(new File(filePath), 0L);
    }

    /** Open binlog file in local disk to fetch. */
    public void open(String filePath, final long filePosition) throws FileNotFoundException, IOException {
        open(new File(filePath), filePosition);
    }

    /** Open binlog file in local disk to fetch. */
    public void open(File file, final long filePosition) throws FileNotFoundException, IOException {
        fin = new FileInputStream(file);

        // The file must start with the 4-byte binlog magic header.
        if (BIN_LOG_HEADER_SIZE != fin.read(buffer, 0, BIN_LOG_HEADER_SIZE)) {
            throw new IOException("No binlog file header");
        }
        if (buffer[0] != BINLOG_MAGIC[0] || buffer[1] != BINLOG_MAGIC[1] || buffer[2] != BINLOG_MAGIC[2]
            || buffer[3] != BINLOG_MAGIC[3]) {
            throw new IOException("Error binlog file header: "
                                  + Arrays.toString(Arrays.copyOf(buffer, BIN_LOG_HEADER_SIZE)));
        }

        limit = 0;
        origin = 0;
        position = 0;

        if (filePosition > BIN_LOG_HEADER_SIZE) {
            // Any further terms of this sum are cut off in the excerpt.
            final int maxFormatDescriptionEventLen = FormatDescriptionLogEvent.LOG_EVENT_MINIMAL_HEADER_LEN
                                                     + FormatDescriptionLogEvent.ST_COMMON_HEADER_LEN_OFFSET;
            limit = fin.read(buffer, 0, maxFormatDescriptionEventLen);
            limit = (int) getUint32(LogEvent.EVENT_LEN_OFFSET);
            // The excerpt does not show the step that moves the stream to filePosition.
        }
    }

    /**
     * {@inheritDoc}
     *
     * @see com.taobao.tddl.dbsync.binlog.LogFetcher#fetch()
     */
    public boolean fetch() throws IOException {
        if (limit == 0) {
            // Buffer is empty: fill it from the start.
            final int len = fin.read(buffer, 0, buffer.length);
            if (len >= 0) {
                limit += len;
                position = 0;
                origin = 0;
                /* More binlog to fetch */
                return true;
            }
        } else if (origin == 0) {
            // Data already starts at index 0: append, growing first if more than half full.
            if (limit > buffer.length / 2) {
                ensureCapacity(buffer.length + limit);
            }
            final int len = fin.read(buffer, limit, buffer.length - limit);
            if (len >= 0) {
                limit += len;
                /* More binlog to fetch */
                return true;
            }
        } else if (limit > 0) {
            if (limit >= FormatDescriptionLogEvent.LOG_EVENT_HEADER_LEN) {
                // Event length: little-endian uint32 after timestamp (4), type (1), server id (4).
                int lenPosition = position + 4 + 1 + 4;
                long eventLen = ((long) (0xff & buffer[lenPosition++])) | ((long) (0xff & buffer[lenPosition++]) << 8)
                                | ((long) (0xff & buffer[lenPosition++]) << 16)
                                | ((long) (0xff & buffer[lenPosition++]) << 24);
                if (limit >= eventLen) {
                    return true;
                } else {
                    ensureCapacity((int) eventLen);
                }
            }
            // Move the unread bytes to the front of the buffer, then read more.
            System.arraycopy(buffer, origin, buffer, 0, limit);
            position -= origin;
            origin = 0;
            final int len = fin.read(buffer, limit, buffer.length - limit);
            if (len >= 0) {
                limit += len;
                /* More binlog to fetch */
                return true;
            }
        } else {
            /* Should not happen. */
            throw new IllegalArgumentException("Unexcepted limit: " + limit);
        }

        /* Reach binlog file end */
        return false;
    }

    /**
     * {@inheritDoc}
     *
     * @see com.taobao.tddl.dbsync.binlog.LogFetcher#close()
     */
    public void close() throws IOException {
        if (fin != null) {
            fin.close();
        }
        fin = null;
    }
}
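- FileLogFetcher extends LogFetcher and provides open methods for opening a binlog file on the local disk, checking the 4-byte magic header as it does so. Its fetch method reads from the FileInputStream into the buffer, at most as many bytes as the buffer has room for; it returns false once the end of the binlog file has been reached and true otherwise. Its close method closes the FileInputStream.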
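Two byte-level steps in FileLogFetcher can be illustrated on their own: the magic-header comparison in open and the event-length decoding in fetch. The following is a minimal sketch, assuming a hypothetical helper class `BinlogHeaderSketch` with the method names `hasBinlogMagic` and `readEventLength` (none of these exist in canal); the 19-byte event header used in `main` is the common binlog v4 header size and is filled with made-up demo values.

```java
/** Hypothetical stand-alone helper; not part of canal. */
final class BinlogHeaderSketch {

    /** Magic bytes as listed in the excerpt: 0xfe, 'b', 'i', 'n'. */
    static final byte[] BINLOG_MAGIC = { -2, 0x62, 0x69, 0x6e };

    /** Skips timestamp (4), type code (1) and server id (4), like "position + 4 + 1 + 4". */
    static final int EVENT_LEN_OFFSET = 4 + 1 + 4;

    /** Returns true if buf starts with the 4-byte binlog magic. */
    static boolean hasBinlogMagic(byte[] buf) {
        if (buf.length < BINLOG_MAGIC.length) return false;
        for (int i = 0; i < BINLOG_MAGIC.length; i++) {
            if (buf[i] != BINLOG_MAGIC[i]) return false;
        }
        return true;
    }

    /** Decodes the unsigned 32-bit little-endian length of the event starting at eventStart. */
    static long readEventLength(byte[] buf, int eventStart) {
        int p = eventStart + EVENT_LEN_OFFSET;
        // Mask each byte with 0xff so negative bytes do not sign-extend.
        return ((long) (0xff & buf[p]))
               | ((long) (0xff & buf[p + 1]) << 8)
               | ((long) (0xff & buf[p + 2]) << 16)
               | ((long) (0xff & buf[p + 3]) << 24);
    }

    public static void main(String[] args) {
        // Demo data: magic header followed by one 19-byte event header (values are made up).
        byte[] data = new byte[4 + 19];
        System.arraycopy(BINLOG_MAGIC, 0, data, 0, 4);
        data[4 + 9] = 0x77;   // lowest byte of the length field
        data[4 + 10] = 0x01;  // next byte: 0x0177 = 375
        System.out.println(hasBinlogMagic(data));      // true
        System.out.println(readEventLength(data, 4));  // 375
    }
}
```

Decoding into a long rather than an int keeps lengths with the top bit set positive, which is why the excerpt casts each byte to long before shifting.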
That is all for "A look at canal's LogFetcher". Source link: utcz.com/z/515738.html