聊聊rocketmq的AclClientRPCHook

编程

本文主要研究一下rocketmq的AclClientRPCHook

RPCHook

rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/RPCHook.java

/**
 * Callback pair around a remoting call.
 * NOTE(review): the call sites are not shown here; judging by the method names the
 * first callback presumably runs before a request is sent and the second after the
 * response arrives - confirm against the remoting client.
 */
public interface RPCHook {

// Receives the remote address and the outgoing request; an implementation may modify the request.
void doBeforeRequest(final String remoteAddr, final RemotingCommand request);

// Receives the remote address, the original request and the response.
void doAfterResponse(final String remoteAddr, final RemotingCommand request,

final RemotingCommand response);

}

  • RPCHook定义了doBeforeRequest、doAfterResponse方法

AclClientRPCHook

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclClientRPCHook.java

/**
 * {@link RPCHook} that signs every outgoing request with the configured
 * {@link SessionCredentials}: the signature, the access key and (when present) the
 * security token are attached to the request as ext fields.
 */
public class AclClientRPCHook implements RPCHook {

    private final SessionCredentials sessionCredentials;

    /** Declared fields per header class, already made accessible, so reflection runs once per type. */
    protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
        new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();

    /**
     * @param sessionCredentials credentials used to sign requests
     */
    public AclClientRPCHook(SessionCredentials sessionCredentials) {
        this.sessionCredentials = sessionCredentials;
    }

    /**
     * Computes the signature over the sorted header values plus the request body and
     * adds SIGNATURE, ACCESS_KEY and, if set, SECURITY_TOKEN to the request ext fields.
     */
    @Override
    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
        SortedMap<String, String> signedFields = parseRequestContent(
            request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken());
        byte[] total = AclUtils.combineRequestContent(request, signedFields);

        request.addExtField(SIGNATURE, AclUtils.calSignature(total, sessionCredentials.getSecretKey()));
        request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());

        // The SecurityToken value is unnecessary; the user may choose whether to supply one.
        if (null != sessionCredentials.getSecurityToken()) {
            request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
        }
    }

    /** Nothing to do on the response path. */
    @Override
    public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
    }

    /**
     * Collects the values that take part in the signature into a key-sorted map:
     * the access key, the optional security token, and every non-null, non-synthetic
     * declared field of the request's custom header (stringified with {@code toString()}).
     *
     * @param request       request whose custom header is read
     * @param ak            access key stored under ACCESS_KEY
     * @param securityToken optional token stored under SECURITY_TOKEN when not null
     * @return the sorted map of field name to value
     * @throws RuntimeException wrapping any failure raised while reflecting on the header
     */
    protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) {
        CommandCustomHeader header = request.readCustomHeader();

        // TreeMap keeps the entries ordered by key.
        SortedMap<String, String> sorted = new TreeMap<String, String>();
        sorted.put(ACCESS_KEY, ak);
        if (securityToken != null) {
            sorted.put(SECURITY_TOKEN, securityToken);
        }

        if (header == null) {
            return sorted;
        }

        try {
            for (Field field : accessibleFieldsOf(header.getClass())) {
                Object value = field.get(header);
                if (value == null || field.isSynthetic()) {
                    continue;
                }
                sorted.put(field.getName(), value.toString());
            }
            return sorted;
        } catch (Exception e) {
            throw new RuntimeException("incompatible exception.", e);
        }
    }

    /** Returns the cached declared fields of the header type, reflecting and caching them on first use. */
    private Field[] accessibleFieldsOf(Class<? extends CommandCustomHeader> headerType) {
        Field[] cached = fieldCache.get(headerType);
        if (cached != null) {
            return cached;
        }

        Field[] declared = headerType.getDeclaredFields();
        for (Field declaredField : declared) {
            declaredField.setAccessible(true);
        }

        // If another thread stored an array first, use that one.
        Field[] earlier = fieldCache.putIfAbsent(headerType, declared);
        return earlier != null ? earlier : declared;
    }

    public SessionCredentials getSessionCredentials() {
        return sessionCredentials;
    }
}

  • AclClientRPCHook实现了RPCHook接口,其构造器接收SessionCredentials参数;其doBeforeRequest首先通过parseRequestContent从request读取CommandCustomHeader,将其field连同accessKey、securityToken放到一个SortedMap,再通过AclUtils.combineRequestContent把这些值与请求body合并成待签名的字节内容;然后通过AclUtils.calSignature计算出signature,最后往request的extFields添加SIGNATURE、ACCESS_KEY;若设置了securityToken,则还会往request的extFields添加SECURITY_TOKEN

SessionCredentials

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/SessionCredentials.java

public class SessionCredentials {

public static final Charset CHARSET = Charset.forName("UTF-8");

public static final String ACCESS_KEY = "AccessKey";

public static final String SECRET_KEY = "SecretKey";

public static final String SIGNATURE = "Signature";

public static final String SECURITY_TOKEN = "SecurityToken";

public static final String KEY_FILE = System.getProperty("rocketmq.client.keyFile",

System.getProperty("user.home") + File.separator + "key");

private String accessKey;

private String secretKey;

private String securityToken;

private String signature;

/**
 * Loads credentials from the file named by KEY_FILE on a best-effort basis:
 * if the file cannot be read or yields no properties, the fields are left unset.
 */
public SessionCredentials() {
    String keyContent;
    try {
        keyContent = MixAll.file2String(KEY_FILE);
    } catch (IOException ignore) {
        // An unreadable key file is deliberately tolerated; nothing is loaded.
        return;
    }

    if (keyContent == null) {
        return;
    }

    Properties prop = MixAll.string2Properties(keyContent);
    if (prop != null) {
        this.updateContent(prop);
    }
}

/**
 * Creates credentials from an explicit access key and secret key; the values are
 * stored as given (no trimming) and the security token stays unset.
 */
public SessionCredentials(String accessKey, String secretKey) {

this.accessKey = accessKey;

this.secretKey = secretKey;

}

/**
 * Creates credentials from an explicit access key, secret key and security token;
 * the first two are delegated to the two-argument constructor.
 */
public SessionCredentials(String accessKey, String secretKey, String securityToken) {

this(accessKey, secretKey);

this.securityToken = securityToken;

}

/**
 * Copies the AccessKey, SecretKey and SecurityToken properties into this object.
 * Each value is trimmed; a property that is absent leaves the current field untouched.
 *
 * @param prop properties to read from
 */
public void updateContent(Properties prop) {
    String accessKeyValue = prop.getProperty(ACCESS_KEY);
    if (accessKeyValue != null) {
        this.accessKey = accessKeyValue.trim();
    }

    String secretKeyValue = prop.getProperty(SECRET_KEY);
    if (secretKeyValue != null) {
        this.secretKey = secretKeyValue.trim();
    }

    String securityTokenValue = prop.getProperty(SECURITY_TOKEN);
    if (securityTokenValue != null) {
        this.securityToken = securityTokenValue.trim();
    }
}

//......

}

  • SessionCredentials提供了三个构造器,一个无参构造器从KEY_FILE加载keyContent然后解析为Properties再通过updateContent方法给accessKey、secretKey、securityToken赋值;一个是accessKey, secretKey的构造器;还有一个是accessKey, secretKey, securityToken的构造器

AclUtils

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclUtils.java

public class AclUtils {

private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

/**
 * Builds the byte content that gets signed: the values of {@code fieldsMap}, in the
 * map's iteration order and skipping the Signature entry, concatenated without a
 * separator and encoded with CHARSET, followed by the request body.
 *
 * @param request   request whose body is appended
 * @param fieldsMap sorted field values to concatenate
 * @return the combined bytes
 * @throws RuntimeException wrapping any exception raised while combining
 */
public static byte[] combineRequestContent(RemotingCommand request, SortedMap<String, String> fieldsMap) {
    try {
        StringBuilder joined = new StringBuilder("");
        for (Map.Entry<String, String> field : fieldsMap.entrySet()) {
            // The signature itself never takes part in the signed content.
            if (SessionCredentials.SIGNATURE.equals(field.getKey())) {
                continue;
            }
            joined.append(field.getValue());
        }

        byte[] fieldBytes = joined.toString().getBytes(CHARSET);
        return AclUtils.combineBytes(fieldBytes, request.getBody());
    } catch (Exception e) {
        throw new RuntimeException("Incompatible exception.", e);
    }
}

/**
 * Concatenates two byte arrays into a new array; a {@code null} argument is treated
 * as an empty array.
 *
 * @param b1 first part, may be {@code null}
 * @param b2 second part, may be {@code null}
 * @return a new array holding the bytes of {@code b1} followed by those of {@code b2}
 */
public static byte[] combineBytes(byte[] b1, byte[] b2) {
    int firstLength = null != b1 ? b1.length : 0;
    int secondLength = null != b2 ? b2.length : 0;
    byte[] total = new byte[firstLength + secondLength];

    if (null != b1) {
        System.arraycopy(b1, 0, total, 0, firstLength);
    }
    if (null != b2) {
        // Offset by the computed length instead of b1.length, so a null b1 no longer throws.
        System.arraycopy(b2, 0, total, firstLength, secondLength);
    }
    return total;
}

/**
 * Signs {@code data} with {@code secretKey} by delegating to
 * {@code AclSigner.calSignature(byte[], String)}.
 *
 * @param data      bytes to sign
 * @param secretKey key used for signing
 * @return the signature string produced by AclSigner
 */
public static String calSignature(byte[] data, String secretKey) {
    return AclSigner.calSignature(data, secretKey);
}

//......

}

  • combineRequestContent首先按key的排序顺序将fieldsMap中除Signature以外的各个value直接拼接为字符串并按CHARSET编码为字节,然后通过AclUtils.combineBytes将其与request.getBody()合并在一起;calSignature方法内部是委托给AclSigner.calSignature(data, secretKey)来实现

AclSigner

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclSigner.java

/**
 * Computes request signatures: the data is MAC-signed with the given key and the raw
 * signature is returned Base64-encoded. Defaults are HmacSHA1 and UTF-8.
 */
public class AclSigner {

    public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

    public static final SigningAlgorithm DEFAULT_ALGORITHM = SigningAlgorithm.HmacSHA1;

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_AUTHORIZE_LOGGER_NAME);

    private static final int CAL_SIGNATURE_FAILED = 10015;

    private static final String CAL_SIGNATURE_FAILED_MSG = "[%s:signature-failed] unable to calculate a request signature. error=%s";

    /** Signs string data with the default algorithm and charset. */
    public static String calSignature(String data, String key) throws AclException {
        return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
    }

    /**
     * Signs string data.
     *
     * @param data      text to sign, encoded with {@code charset}
     * @param key       secret key, encoded with {@code charset}
     * @param algorithm MAC algorithm to use
     * @param charset   charset used to turn data and key into bytes
     * @return Base64-encoded signature
     * @throws AclException if the signature cannot be calculated
     */
    public static String calSignature(String data, String key, SigningAlgorithm algorithm,
        Charset charset) throws AclException {
        return signAndBase64Encode(data, key, algorithm, charset);
    }

    /** Signs raw bytes with the default algorithm and charset. */
    public static String calSignature(byte[] data, String key) throws AclException {
        return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
    }

    /**
     * Signs raw bytes.
     *
     * @param data      bytes to sign
     * @param key       secret key, encoded with {@code charset}
     * @param algorithm MAC algorithm to use
     * @param charset   charset used to turn the key into bytes
     * @return Base64-encoded signature
     * @throws AclException if the signature cannot be calculated
     */
    public static String calSignature(byte[] data, String key, SigningAlgorithm algorithm,
        Charset charset) throws AclException {
        return signAndBase64Encode(data, key, algorithm, charset);
    }

    private static String signAndBase64Encode(String data, String key, SigningAlgorithm algorithm, Charset charset)
        throws AclException {
        try {
            byte[] payload = data.getBytes(charset);
            byte[] secret = key.getBytes(charset);
            byte[] signature = sign(payload, secret, algorithm);
            return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
        } catch (Exception e) {
            // Also catches the AclException raised by sign(), so that failure is logged and wrapped again here.
            throw signatureFailure(e);
        }
    }

    private static String signAndBase64Encode(byte[] data, String key, SigningAlgorithm algorithm, Charset charset)
        throws AclException {
        try {
            byte[] secret = key.getBytes(charset);
            byte[] signature = sign(data, secret, algorithm);
            return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
        } catch (Exception e) {
            // Also catches the AclException raised by sign(), so that failure is logged and wrapped again here.
            throw signatureFailure(e);
        }
    }

    /** Runs the MAC named by {@code algorithm} over {@code data} using {@code key}. */
    private static byte[] sign(byte[] data, byte[] key, SigningAlgorithm algorithm) throws AclException {
        try {
            Mac mac = Mac.getInstance(algorithm.toString());
            mac.init(new SecretKeySpec(key, algorithm.toString()));
            return mac.doFinal(data);
        } catch (Exception e) {
            throw signatureFailure(e);
        }
    }

    /** Logs the failure and returns the AclException that reports it, keeping {@code cause} attached. */
    private static AclException signatureFailure(Exception cause) {
        String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, cause.getMessage());
        log.error(message, cause);
        return new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, cause);
    }
}

  • calSignature默认使用的是SigningAlgorithm.HmacSHA1及Charset.forName("UTF-8")字符集来签名;signAndBase64Encode方法首先通过sign方法签名,然后将其转为Base64的字符串

小结

AclClientRPCHook实现了RPCHook接口,其构造器接收SessionCredentials参数;其doBeforeRequest首先通过parseRequestContent从request读取CommandCustomHeader,将其field连同accessKey、securityToken放到一个SortedMap,再通过AclUtils.combineRequestContent把这些值与请求body合并成待签名的字节内容;然后通过AclUtils.calSignature计算出signature,最后往request的extFields添加SIGNATURE、ACCESS_KEY;若设置了securityToken,则还会往request的extFields添加SECURITY_TOKEN

doc

  • AclClientRPCHook

以上是 聊聊rocketmq的AclClientRPCHook 的全部内容, 来源链接: utcz.com/z/510563.html

回到顶部