A Look at getWithoutAck in SimpleCanalConnector


getWithoutAck

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

public class SimpleCanalConnector implements CanalConnector {

private static final Logger logger = LoggerFactory.getLogger(SimpleCanalConnector.class);

private SocketAddress address;

private String username;

private String password;

private int soTimeout = 60000; // milliseconds

private int idleTimeout = 60 * 60 * 1000; // idle-connection timeout between client and server, 1 hour by default

private String filter; // last submitted filter value, kept so it can be resubmitted on automatic retry

private final ByteBuffer readHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);

private final ByteBuffer writeHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);

private SocketChannel channel;

private ReadableByteChannel readableChannel;

private WritableByteChannel writableChannel;

private List<Compression> supportedCompressions = new ArrayList<Compression>();

private ClientIdentity clientIdentity;

private ClientRunningMonitor runningMonitor; // run control

private ZkClientx zkClientx;

private BooleanMutex mutex = new BooleanMutex(false);

private volatile boolean connected = false; // whether connect() has completed normally; because of HA, this does not mean the client is actively working

private boolean rollbackOnConnect = true; // whether to roll back automatically after a successful connect

private boolean rollbackOnDisConnect = false; // whether to roll back automatically on disconnect (the original comment repeats the one for rollbackOnConnect)

private boolean lazyParseEntry = false; // whether parsing of Entry objects is deferred; deferring it can help when maximizing performance

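// reads and writes use separate locks to keep lock granularity small; reads also need an exclusive lock, because concurrent reads can interleave packets and break deserialization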

private Object readDataLock = new Object();

private Object writeDataLock = new Object();

private volatile boolean running = false;

//......

public Message getWithoutAck(int batchSize) throws CanalClientException {

return getWithoutAck(batchSize, null, null);

}

public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {

waitClientRunning();

if (!running) {

return null;

}

try {

int size = (batchSize <= 0) ? 1000 : batchSize;

long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1 means no timeout control

if (unit == null) {

unit = TimeUnit.MILLISECONDS;

}

writeWithHeader(Packet.newBuilder()

.setType(PacketType.GET)

.setBody(Get.newBuilder()

.setAutoAck(false)

.setDestination(clientIdentity.getDestination())

.setClientId(String.valueOf(clientIdentity.getClientId()))

.setFetchSize(size)

.setTimeout(time)

.setUnit(unit.ordinal())

.build()

.toByteString())

.build()

.toByteArray());

return receiveMessages();

} catch (IOException e) {

throw new CanalClientException(e);

}

}

//......

}

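  • getWithoutAck waits for the client to be running (returning null if it is not), then calls writeWithHeader followed by receiveMessages. The Packet passed to writeWithHeader has type GET; its body sets autoAck to false and carries destination, clientId, fetchSize, timeout and unit. A batchSize <= 0 falls back to 1000, a null or negative timeout becomes -1 (no timeout control), and a null unit becomes TimeUnit.MILLISECONDS.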
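The argument defaulting described above can be isolated into a small sketch. `GetRequestDefaults` and its `of` method are hypothetical names introduced here for illustration and are not part of canal; only the three defaulting expressions are taken from the excerpt.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of canal): mirrors the defaulting that
// getWithoutAck applies to its arguments before building the GET packet.
final class GetRequestDefaults {
    final int fetchSize;   // value that would go into setFetchSize
    final long timeout;    // value that would go into setTimeout
    final int unitOrdinal; // value that would go into setUnit

    private GetRequestDefaults(int fetchSize, long timeout, int unitOrdinal) {
        this.fetchSize = fetchSize;
        this.timeout = timeout;
        this.unitOrdinal = unitOrdinal;
    }

    static GetRequestDefaults of(int batchSize, Long timeout, TimeUnit unit) {
        // Non-positive batch sizes fall back to 1000.
        int size = (batchSize <= 0) ? 1000 : batchSize;
        // A null or negative timeout becomes -1, meaning "no timeout control".
        long time = (timeout == null || timeout < 0) ? -1 : timeout;
        // A missing unit defaults to milliseconds; only its ordinal is sent.
        TimeUnit effectiveUnit = (unit == null) ? TimeUnit.MILLISECONDS : unit;
        return new GetRequestDefaults(size, time, effectiveUnit.ordinal());
    }

    public static void main(String[] args) {
        GetRequestDefaults d = GetRequestDefaults.of(0, null, null);
        // prints "1000 -1 2" (MILLISECONDS is the third TimeUnit constant)
        System.out.println(d.fetchSize + " " + d.timeout + " " + d.unitOrdinal);
    }
}
```

Because the unit travels as an enum ordinal, both ends of the connection have to agree on the ordering of the TimeUnit constants.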

writeWithHeader

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

public class SimpleCanalConnector implements CanalConnector {

//......

private void writeWithHeader(byte[] body) throws IOException {

writeWithHeader(writableChannel, body);

}

private void writeWithHeader(WritableByteChannel channel, byte[] body) throws IOException {

synchronized (writeDataLock) {

writeHeader.clear();

writeHeader.putInt(body.length);

writeHeader.flip();

channel.write(writeHeader);

channel.write(ByteBuffer.wrap(body));

}

}

//......

}

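  • writeWithHeader holds writeDataLock while it puts the body length into the 4-byte big-endian header buffer, writes the header to the channel, and then writes the body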
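The framing that writeWithHeader produces is a plain length prefix: four big-endian bytes holding the body length, followed by the body. Below is a minimal sketch of that layout using only the JDK. `FrameWriterSketch`, `writeFrame`, `writeFully` and `frame` are hypothetical names, the in-memory channel stands in for the socket, and the sketch allocates a fresh header per call instead of sharing one buffer under a lock as the original does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch (not canal code) of the length-prefixed frame layout.
final class FrameWriterSketch {

    // Writes a 4-byte big-endian length header followed by the body.
    static void writeFrame(WritableByteChannel channel, byte[] body) throws IOException {
        ByteBuffer header = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
        header.putInt(body.length);
        header.flip(); // switch the buffer from filling to draining
        writeFully(channel, header);
        writeFully(channel, ByteBuffer.wrap(body));
    }

    // A channel may accept fewer bytes than offered, so loop until drained.
    static void writeFully(WritableByteChannel channel, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    // Convenience for the demo: frames a body into an in-memory byte array.
    static byte[] frame(byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            writeFrame(Channels.newChannel(out), body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] wire = frame("abc".getBytes(StandardCharsets.US_ASCII));
        // prints [0, 0, 0, 3, 97, 98, 99]
        System.out.println(Arrays.toString(wire));
    }
}
```

The `writeFully` loop is a choice made in this sketch. The excerpt calls `channel.write` once per buffer, which writes everything only if the channel does not return after a partial write.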

receiveMessages

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

public class SimpleCanalConnector implements CanalConnector {

//......

private Message receiveMessages() throws IOException {

byte[] data = readNextPacket();

return CanalMessageDeserializer.deserializer(data, lazyParseEntry);

}

private byte[] readNextPacket() throws IOException {

return readNextPacket(readableChannel);

}

private byte[] readNextPacket(ReadableByteChannel channel) throws IOException {

synchronized (readDataLock) {

readHeader.clear();

read(channel, readHeader);

int bodyLen = readHeader.getInt(0);

ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen).order(ByteOrder.BIG_ENDIAN);

read(channel, bodyBuf);

return bodyBuf.array();

}

}

//......

}

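  • receiveMessages calls readNextPacket, which holds readDataLock while it reads the 4-byte header to obtain the body length and then reads exactly that many bytes of body; the returned body is passed to CanalMessageDeserializer.deserializer together with lazyParseEntry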
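The read side mirrors the write side. The excerpt does not show the private `read` helper, so the sketch below assumes it keeps reading until the buffer is full and fails on end of stream. `FrameReaderSketch`, `readFrame`, `readFully` and `decode` are hypothetical names, and an in-memory channel stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch (not canal code) of reading one length-prefixed frame.
final class FrameReaderSketch {

    // Reads the 4-byte big-endian header, then exactly that many body bytes.
    static byte[] readFrame(ReadableByteChannel channel) throws IOException {
        ByteBuffer header = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
        readFully(channel, header);
        // Absolute get: reads from index 0 regardless of position, so no flip().
        int bodyLen = header.getInt(0);
        ByteBuffer body = ByteBuffer.allocate(bodyLen);
        readFully(channel, body);
        return body.array();
    }

    // Assumed behaviour of the unshown read helper: fill the buffer or fail.
    static void readFully(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            if (channel.read(buf) < 0) {
                throw new EOFException("end of stream before the frame was complete");
            }
        }
    }

    // Convenience for the demo: decodes one frame from an in-memory byte array.
    static byte[] decode(byte[] wire) {
        try {
            return readFrame(Channels.newChannel(new ByteArrayInputStream(wire)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = decode(new byte[] {0, 0, 0, 3, 97, 98, 99});
        // prints abc
        System.out.println(new String(body, StandardCharsets.US_ASCII));
    }
}
```

Since the stream carries no delimiter other than the length prefix, two threads reading at once could each consume part of the same frame, which is why the original serializes reads on readDataLock.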

Summary

getWithoutAck first calls writeWithHeader and then receiveMessages. The Packet sent by writeWithHeader has type GET, and its body sets autoAck to false along with destination, clientId, fetchSize, timeout and unit. receiveMessages delegates to readNextPacket, which reads the header to obtain the body length, reads the body, and returns it for deserialization.

doc

  • SimpleCanalConnector

Source link: utcz.com/z/515221.html
