聊聊rocketmq的RemotingSendRequestException

编程

本文主要研究一下rocketmq的RemotingSendRequestException

RemotingSendRequestException

rocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java

/**
 * Exception raised when sending a request to a remote address fails.
 *
 * <p>The exception message always has the form
 * {@code send request to <addr> failed}.
 */
public class RemotingSendRequestException extends RemotingException {

    private static final long serialVersionUID = 5391285827332471674L;

    /**
     * Creates the exception without an underlying cause.
     *
     * @param addr remote address embedded in the exception message
     */
    public RemotingSendRequestException(String addr) {
        this(addr, null);
    }

    /**
     * Creates the exception with an underlying cause.
     *
     * @param addr  remote address embedded in the exception message
     * @param cause failure that prevented the send; may be {@code null}
     */
    public RemotingSendRequestException(String addr, Throwable cause) {
        super(describe(addr), cause);
    }

    /** Builds the fixed-format message for the given address. */
    private static String describe(String addr) {
        return "send request to <" + addr + "> failed";
    }
}

  • RemotingSendRequestException继承了RemotingException,它的构造器要求addr参数

invokeSyncImpl

rocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

public abstract class NettyRemotingAbstract {

    //......

    /**
     * Writes {@code request} to {@code channel} and waits (via
     * {@code ResponseFuture.waitResponse}) up to {@code timeoutMillis} for a response.
     *
     * <p>A {@code ResponseFuture} keyed by the request's opaque id is registered in
     * {@code responseTable} for the duration of the call and is always removed again
     * before this method returns or throws.
     *
     * @param channel       channel the request is written to
     * @param request       command to send; its opaque id is used as the table key
     * @param timeoutMillis value passed to the future and to {@code waitResponse}
     * @return the non-null response command
     * @throws InterruptedException         declared by the method; presumably raised while waiting
     * @throws RemotingSendRequestException if no response was obtained and the write was
     *                                      not marked successful
     * @throws RemotingTimeoutException     if no response was obtained although the write
     *                                      was marked successful
     */
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture pending = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, pending);
            final SocketAddress remote = channel.remoteAddress();

            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture writeResult) throws Exception {
                    final boolean written = writeResult.isSuccess();
                    pending.setSendRequestOK(written);
                    if (written) {
                        return;
                    }

                    // Write failed: unregister, record the cause, then hand a null
                    // response to the future. The cause is set before putResponse so
                    // it is already in place when the null response is consumed.
                    responseTable.remove(opaque);
                    pending.setCause(writeResult.cause());
                    pending.putResponse(null);
                    log.warn("send a request command to channel <" + remote + "> failed.");
                }
            });

            final RemotingCommand reply = pending.waitResponse(timeoutMillis);
            if (reply != null) {
                return reply;
            }

            // No response: distinguish "sent but nothing came back" from "never sent".
            if (pending.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(remote), timeoutMillis,
                    pending.getCause());
            }
            throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(remote), pending.getCause());
        } finally {
            this.responseTable.remove(opaque);
        }
    }

    //......

}

  • invokeSyncImpl方法在responseCommand为null时根据responseFuture.isSendRequestOK()区分两种情况:为true(请求已写出但没有拿到响应)时抛出RemotingTimeoutException,为false(请求写出失败)时抛出RemotingSendRequestException,并把responseFuture.getCause()作为cause传入

invokeAsyncImpl

rocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

public abstract class NettyRemotingAbstract {

    //......

    /**
     * Writes {@code request} to {@code channel} without waiting for the response; the
     * supplied {@code invokeCallback} is attached to the {@code ResponseFuture} that is
     * registered in {@code responseTable} under the request's opaque id.
     *
     * <p>A permit of {@code semaphoreAsync} is acquired first (waiting up to
     * {@code timeoutMillis}); the time spent acquiring it is subtracted from the
     * timeout given to the {@code ResponseFuture}.
     *
     * @param channel        channel the request is written to
     * @param request        command to send; its opaque id is used as the table key
     * @param timeoutMillis  budget for acquiring the permit and for the response future
     * @param invokeCallback callback handed to the {@code ResponseFuture}
     * @throws InterruptedException            declared by the method; {@code tryAcquire} can raise it
     * @throws RemotingTooMuchRequestException if no permit was acquired and
     *                                         {@code timeoutMillis <= 0}
     * @throws RemotingTimeoutException        if no permit was acquired within a positive
     *                                         timeout, or acquiring it took longer than
     *                                         {@code timeoutMillis}
     * @throws RemotingSendRequestException    if {@code writeAndFlush} or
     *                                         {@code addListener} throws
     */
    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                // The whole budget was spent waiting for the permit; give it back.
                once.release();
                throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
            }

            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        }
                        requestFail(opaque);
                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                // The caller is told about the failure through the exception below, so
                // the future must not stay registered: drop it from the table before
                // releasing it, otherwise the entry lingers after this call has failed.
                this.responseTable.remove(opaque);
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

    //......

}

  • invokeAsyncImpl在acquired为true,channel.writeAndFlush(request)同步抛出异常时,会将异常转为RemotingSendRequestException;如果写操作是异步失败(listener中f.isSuccess()为false),则不会抛出该异常,而是调用requestFail(opaque)并打印warn日志

invokeOnewayImpl

rocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

public abstract class NettyRemotingAbstract {

    //......

    /**
     * Marks {@code request} as a oneway RPC and writes it to {@code channel}; no
     * response future is registered for it.
     *
     * <p>A permit of {@code semaphoreOneway} is acquired first (waiting up to
     * {@code timeoutMillis}) and released once the write completes, successfully or
     * not, or when the write throws.
     *
     * @param channel       channel the request is written to
     * @param request       command to send; it is marked oneway before being written
     * @param timeoutMillis maximum time to wait for a semaphore permit
     * @throws InterruptedException            declared by the method; {@code tryAcquire} can raise it
     * @throws RemotingTooMuchRequestException if no permit was acquired and
     *                                         {@code timeoutMillis <= 0}
     * @throws RemotingTimeoutException        if no permit was acquired within a positive timeout
     * @throws RemotingSendRequestException    if {@code writeAndFlush} or
     *                                         {@code addListener} throws
     */
    public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        request.markOnewayRPC();
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        once.release();
                        if (!f.isSuccess()) {
                            // This log line is the only place an asynchronous write
                            // failure surfaces in this method, so attach the cause.
                            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.", f.cause());
                        }
                    }
                });
            } catch (Exception e) {
                once.release();
                log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                // The reported permits belong to semaphoreOneway, so label them as such.
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
                    timeoutMillis,
                    this.semaphoreOneway.getQueueLength(),
                    this.semaphoreOneway.availablePermits()
                );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

    //......

}

  • invokeOnewayImpl在acquired为true,channel.writeAndFlush(request)同步抛出异常时,会将异常转为RemotingSendRequestException;如果写操作是异步失败(listener中f.isSuccess()为false),则只释放信号量并打印warn日志,不会抛出异常

小结

RemotingSendRequestException继承了RemotingException,它的构造器要求addr参数;invokeSyncImpl方法在responseCommand为null且responseFuture.isSendRequestOK()为false的时候抛出RemotingSendRequestException;invokeAsyncImpl在acquired为true,channel.writeAndFlush(request)抛出异常时,会将异常转为RemotingSendRequestException;invokeOnewayImpl在acquired为true,channel.writeAndFlush(request)抛出异常时,会将异常转为RemotingSendRequestException

doc

  • NettyRemotingAbstract

以上是 聊聊rocketmq的RemotingSendRequestException 的全部内容, 来源链接: utcz.com/z/511724.html

回到顶部