聊聊rocketmq的AccessValidator
序
本文主要研究一下rocketmq的AccessValidator
AccessValidator
rocketmq/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
/**
 * Access-control contract: parses a remoting request into an {@link AccessResource},
 * validates such a resource, and exposes operations for maintaining the ACL configuration
 * (per-access-key entries, the global white address list and the config version).
 */
public interface AccessValidator { /**
* Parse to get the AccessResource(user, resource, needed permission)
*
* @param request the remoting command to extract the access attributes from
* @param remoteAddr address of the remote peer that sent the request
*                   (NOTE(review): the expected format is not specified by this interface — confirm with implementations)
* @return Plain access resource result,include access key,signature and some other access attributes.
*/
AccessResource parse(RemotingCommand request, String remoteAddr);
/**
* Validate the access resource.
* <p>
* The method returns nothing and declares no checked exception, so how a rejection is
* reported is left to the implementation.
*
* @param accessResource the access resource to validate
*/
void validate(AccessResource accessResource);
/**
* Update the access resource config
*
* @param plainAccessConfig the access config to apply
* @return a boolean outcome (presumably {@code true} when the update succeeded — confirm against the implementation)
*/
boolean updateAccessConfig(PlainAccessConfig plainAccessConfig);
/**
* Delete the access resource config
*
* @param accesskey the access key whose config should be deleted
* @return a boolean outcome (presumably {@code true} when the deletion succeeded — confirm against the implementation)
*/
boolean deleteAccessConfig(String accesskey);
/**
* Get the access resource config version information
*
* @return the version information of the ACL config, as a string
*/
String getAclConfigVersion();
/**
* Update globalWhiteRemoteAddresses in acl yaml config file
*
* @param globalWhiteAddrsList the global white remote addresses to write to the config
* @return a boolean outcome (presumably {@code true} when the update succeeded — confirm against the implementation)
*/
boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);
}
- AccessValidator接口定义了parse、validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法
PlainAccessValidator
rocketmq/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
/**
 * {@link AccessValidator} implementation that builds a {@link PlainAccessResource} from an
 * incoming request and delegates validation and ACL configuration changes to a
 * {@link PlainPermissionManager}.
 */
public class PlainAccessValidator implements AccessValidator {

    private PlainPermissionManager aclPlugEngine;

    public PlainAccessValidator() {
        aclPlugEngine = new PlainPermissionManager();
    }

    /**
     * Builds the access resource describing what the request needs.
     * <p>
     * The white remote address and request code are always set. When the request carries
     * ext fields, the access key, signature and security token are copied from them, the
     * resources/permissions required by the request code are added, and the content to be
     * signed is assembled from all ext fields except the signature itself.
     *
     * @param request the remoting command to parse
     * @param remoteAddr remote peer address; a trailing {@code :port} is removed when present, may be null
     * @return the populated {@link PlainAccessResource}
     * @throws AclException wrapping any failure raised while deriving the required resources
     */
    @Override
    public AccessResource parse(RemotingCommand request, String remoteAddr) {
        PlainAccessResource accessResource = new PlainAccessResource();
        accessResource.setWhiteRemoteAddress(stripPort(remoteAddr));
        accessResource.setRequestCode(request.getCode());

        Map<String, String> extFields = request.getExtFields();
        if (extFields == null) {
            // If request's extFields is null, then return accessResource directly (users can use whiteAddress pattern).
            // The following logic depends on the request's extFields not being null.
            return accessResource;
        }
        accessResource.setAccessKey(extFields.get(SessionCredentials.ACCESS_KEY));
        accessResource.setSignature(extFields.get(SessionCredentials.SIGNATURE));
        accessResource.setSecretToken(extFields.get(SessionCredentials.SECURITY_TOKEN));

        try {
            switch (request.getCode()) {
                case RequestCode.SEND_MESSAGE:
                    accessResource.addResourceAndPerm(extFields.get("topic"), Permission.PUB);
                    break;
                case RequestCode.SEND_MESSAGE_V2:
                    // The V2 send header carries the topic under the key "b".
                    accessResource.addResourceAndPerm(extFields.get("b"), Permission.PUB);
                    break;
                case RequestCode.CONSUMER_SEND_MSG_BACK:
                    accessResource.addResourceAndPerm(extFields.get("originTopic"), Permission.PUB);
                    accessResource.addResourceAndPerm(getRetryTopic(extFields.get("group")), Permission.SUB);
                    break;
                case RequestCode.PULL_MESSAGE:
                    accessResource.addResourceAndPerm(extFields.get("topic"), Permission.SUB);
                    accessResource.addResourceAndPerm(getRetryTopic(extFields.get("consumerGroup")), Permission.SUB);
                    break;
                case RequestCode.QUERY_MESSAGE:
                    accessResource.addResourceAndPerm(extFields.get("topic"), Permission.SUB);
                    break;
                case RequestCode.HEART_BEAT:
                    // Heartbeats list every consumer group and its subscriptions in the body.
                    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
                    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                        accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
                        for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
                            accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
                        }
                    }
                    break;
                case RequestCode.UNREGISTER_CLIENT:
                    final UnregisterClientRequestHeader unregisterClientRequestHeader =
                        (UnregisterClientRequestHeader) request
                            .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
                    accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
                    break;
                case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                    final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
                        (GetConsumerListByGroupRequestHeader) request
                            .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
                    accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
                    break;
                case RequestCode.UPDATE_CONSUMER_OFFSET:
                    final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
                        (UpdateConsumerOffsetRequestHeader) request
                            .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
                    accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
                    accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
                    break;
                default:
                    // Other request codes add no per-resource requirement here.
                    break;
            }
        } catch (Throwable t) {
            throw new AclException(t.getMessage(), t);
        }

        // Content
        accessResource.setContent(AclUtils.combineRequestContent(request, signedFields(extFields)));
        return accessResource;
    }

    /**
     * Removes a trailing {@code :port} from the remote address.
     * <p>
     * Everything before the LAST colon is kept, so a colon-separated IPv6 address followed by
     * a port is not cut off at its first group (which is what splitting on the first colon
     * did). An address without any colon, or a null address, is returned unchanged.
     */
    private static String stripPort(String remoteAddr) {
        if (remoteAddr == null) {
            return null;
        }
        int portSeparator = remoteAddr.lastIndexOf(':');
        if (portSeparator < 0) {
            return remoteAddr;
        }
        return remoteAddr.substring(0, portSeparator);
    }

    /**
     * Copies every ext field except the signature into a key-sorted map; the signature is
     * left out because it is the value computed over this content.
     */
    private static SortedMap<String, String> signedFields(Map<String, String> extFields) {
        SortedMap<String, String> sorted = new TreeMap<String, String>();
        for (Map.Entry<String, String> entry : extFields.entrySet()) {
            if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
                sorted.put(entry.getKey(), entry.getValue());
            }
        }
        return sorted;
    }

    /** Delegates to the permission manager; the resource must be a {@link PlainAccessResource}. */
    @Override
    public void validate(AccessResource accessResource) {
        aclPlugEngine.validate((PlainAccessResource) accessResource);
    }

    /** Delegates to the permission manager. */
    @Override
    public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) {
        return aclPlugEngine.updateAccessConfig(plainAccessConfig);
    }

    /** Delegates to the permission manager. */
    @Override
    public boolean deleteAccessConfig(String accesskey) {
        return aclPlugEngine.deleteAccessConfig(accesskey);
    }

    /** Returns the ACL config data version reported by the permission manager. */
    @Override
    public String getAclConfigVersion() {
        return aclPlugEngine.getAclConfigDataVersion();
    }

    /** Delegates to the permission manager. */
    @Override
    public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
        return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
    }
}
- PlainAccessValidator实现了AccessValidator接口,其构造器创建了PlainPermissionManager;其parse方法解析remotingCommand及remoteAddr,构造plainAccessResource,具体根据不同的RequestCode来设置resourceAndPerm,之后根据request及fieldsMap设置content;其validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法都委托给了PlainPermissionManager
PlainPermissionManager
rocketmq/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
/**
 * Holds the plain ACL state (per-access-key resources and the global white remote address
 * strategies) and validates a request's {@link PlainAccessResource} against it.
 * <p>
 * Parts of the class, including the {@code load()} and {@code watch()} methods invoked by the
 * constructor, are not shown in this excerpt.
 */
public class PlainPermissionManager {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";

    // RocketMQ home: the system property wins, the environment variable is the fallback.
    private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
        System.getenv(MixAll.ROCKETMQ_HOME_ENV));

    // ACL file name, overridable through the "rocketmq.acl.plain.file" system property.
    private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);

    private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();

    private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();

    private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();

    private boolean isWatchStart;

    private final DataVersion dataVersion = new DataVersion();

    public PlainPermissionManager() {
        load();
        watch();
    }

    //......

    /**
     * Validates a request's access resource. Checks run in this order, and the first one
     * that passes or fails decides the outcome:
     * <ol>
     * <li>a match against any global white remote address strategy accepts the request;</li>
     * <li>a missing access key, or an access key without ACL config, is rejected;</li>
     * <li>a match against the access key's own remote address strategy accepts the request;</li>
     * <li>the signature computed from the content and the configured secret key must equal
     * the signature carried by the request;</li>
     * <li>finally {@link #checkPerm} verifies the permission of each requested resource.</li>
     * </ol>
     *
     * @param plainAccessResource the access resource parsed from the request
     * @throws AclException when the request is rejected
     */
    public void validate(PlainAccessResource plainAccessResource) {
        // Check the global white remote addr
        for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
            if (remoteAddressStrategy.match(plainAccessResource)) {
                return;
            }
        }

        if (plainAccessResource.getAccessKey() == null) {
            throw new AclException("No accessKey is configured");
        }

        // One lookup instead of containsKey + get: a missing (or null) entry is reported as
        // a missing config rather than surfacing later as a NullPointerException.
        PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
        if (ownedAccess == null) {
            throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
        }

        // Check the white addr for accesskey
        if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
            return;
        }

        // Check the signature
        String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
        if (!signaturesMatch(signature, plainAccessResource.getSignature())) {
            throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
        }

        // Check perm of each resource
        checkPerm(plainAccessResource, ownedAccess);
    }

    /**
     * Compares the expected and the provided signature without stopping at the first
     * differing character, so the time taken does not reveal how long a matching prefix the
     * caller-supplied signature has. A null on either side never matches.
     */
    private static boolean signaturesMatch(String expected, String provided) {
        if (expected == null || provided == null) {
            return false;
        }
        if (expected.length() != provided.length()) {
            return false;
        }
        int difference = 0;
        for (int i = 0; i < expected.length(); i++) {
            difference |= expected.charAt(i) ^ provided.charAt(i);
        }
        return difference == 0;
    }

    /**
     * Checks that the owned access grants everything the request needs.
     * <p>
     * A request code that needs admin permission is rejected unless the owner is an admin.
     * Then every requested resource is checked against the owner's explicit permission for
     * that resource or, when none is configured, against the owner's default group/topic
     * permission.
     *
     * @param needCheckedAccess what the request needs
     * @param ownedAccess what the access key is configured to own
     * @throws AclException when admin permission or a resource permission is missing
     */
    void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
        if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
            throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
        }
        Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
        Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();

        if (needCheckedPermMap == null) {
            // If the needCheckedPermMap is null,then return
            return;
        }

        if (ownedPermMap == null && ownedAccess.isAdmin()) {
            // If the ownedPermMap is null and it is an admin user, then return
            return;
        }

        for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
            String resource = needCheckedEntry.getKey();
            Byte neededPerm = needCheckedEntry.getValue();
            boolean isGroup = PlainAccessResource.isRetryTopic(resource);

            if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
                // Check the default perm
                byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() :
                    ownedAccess.getDefaultTopicPerm();
                if (!Permission.checkPermission(neededPerm, ownedPerm)) {
                    throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
                }
                continue;
            }
            // An explicit entry exists for this resource, so a failure here is not about the
            // default permission; the message says so to avoid misleading diagnostics.
            if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
                throw new AclException(String.format("No permission for %s", PlainAccessResource.printStr(resource, isGroup)));
            }
        }
    }

    //......
}
- validate方法首先根据globalWhiteRemoteAddressStrategy进行校验,通过则立马返回;之后校验accessKey是否为null,为null则抛出AclException,接着判断该accessKey是否在plainAccessResourceMap里头,不包含则抛出AclException;接着取出该accessKey对应的PlainAccessResource,根据remoteAddressStrategy判断是否在白名单内,在则立马返回
- 之后使用配置的secretKey对plainAccessResource.getContent()进行签名,然后对比plainAccessResource.getSignature(),不一样则抛出AclException;最后是通过checkPerm来校验权限
- checkPerm的入参为needCheckedAccess及ownedAccess,它首先判断是否需要是adminPerm,如果是且ownedAccess.isAdmin()为false则抛出AclException;之后是遍历needCheckedPermMap对每个needCheckedEntry从ownedPermMap取出相应数据判断,这里调用了Permission.checkPermission方法
Permission
rocketmq/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java
/**
 * Permission bit flags used by the ACL checks, together with helpers that compare
 * permissions, parse them from their textual form and tell which request codes need an
 * admin.
 */
public class Permission {

    public static final byte DENY = 1;
    public static final byte ANY = 1 << 1;
    public static final byte PUB = 1 << 2;
    public static final byte SUB = 1 << 3;

    /** Request codes for which {@link #needAdminPerm(Integer)} answers {@code true}. */
    public static final Set<Integer> ADMIN_CODE = new HashSet<Integer>();

    static {
        final int[] adminOnlyCodes = {
            RequestCode.UPDATE_AND_CREATE_TOPIC,
            RequestCode.UPDATE_BROKER_CONFIG,
            RequestCode.DELETE_TOPIC_IN_BROKER,
            RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP,
            RequestCode.DELETE_SUBSCRIPTIONGROUP
        };
        for (int requestCode : adminOnlyCodes) {
            ADMIN_CODE.add(requestCode);
        }
    }

    /**
     * Decides whether the owned permission satisfies the needed one.
     *
     * @param neededPerm the permission bits the request needs
     * @param ownedPerm the permission bits that are owned
     * @return {@code false} whenever the owned bits contain {@link #DENY}; when the needed
     *         bits contain {@link #ANY}, {@code true} if the owner has {@link #PUB} or
     *         {@link #SUB}; otherwise whether the bitwise AND of both values is positive
     */
    public static boolean checkPermission(byte neededPerm, byte ownedPerm) {
        final boolean denied = (ownedPerm & DENY) != 0;
        if (denied) {
            return false;
        }
        final boolean anyIsEnough = (neededPerm & ANY) != 0;
        if (anyIsEnough) {
            return (ownedPerm & (PUB | SUB)) != 0;
        }
        return (neededPerm & ownedPerm) > 0;
    }

    /**
     * Converts the textual form of a permission into its bit flags.
     *
     * @param permString one of {@code PUB}, {@code SUB}, {@code PUB|SUB}, {@code SUB|PUB} or
     *        {@code DENY}; surrounding whitespace is ignored
     * @return the matching flags; {@link #DENY} for null or any unrecognised text
     */
    public static byte parsePermFromString(String permString) {
        if (permString == null) {
            return DENY;
        }
        final String token = permString.trim();
        if ("PUB".equals(token)) {
            return PUB;
        }
        if ("SUB".equals(token)) {
            return SUB;
        }
        if ("PUB|SUB".equals(token) || "SUB|PUB".equals(token)) {
            return (byte) (PUB | SUB);
        }
        // "DENY" and everything unrecognised end up denied.
        return DENY;
    }

    /**
     * Adds the resources described by {@code name=perm} items to the given access resource.
     * Topic names are used as they are; group names are first converted with
     * {@link PlainAccessResource#getRetryTopic}.
     *
     * @param plainAccessResource the access resource receiving the parsed entries
     * @param isTopic whether the items describe topics ({@code true}) or groups
     * @param resources the {@code name=perm} items; null or empty means nothing to do
     * @throws AclException when an item does not split into exactly two parts around {@code =}
     */
    public static void parseResourcePerms(PlainAccessResource plainAccessResource, Boolean isTopic,
        List<String> resources) {
        if (resources == null || resources.isEmpty()) {
            return;
        }
        for (String resource : resources) {
            final String[] parts = StringUtils.split(resource, "=");
            if (parts.length != 2) {
                throw new AclException(String.format("Parse resource permission failed for %s:%s", isTopic ? "topic" : "group", resource));
            }
            final String resourceKey = isTopic
                ? parts[0].trim()
                : PlainAccessResource.getRetryTopic(parts[0].trim());
            final byte perm = parsePermFromString(parts[1].trim());
            plainAccessResource.addResourceAndPerm(resourceKey, perm);
        }
    }

    /**
     * @param code the request code to look up
     * @return whether the code is contained in {@link #ADMIN_CODE}
     */
    public static boolean needAdminPerm(Integer code) {
        return ADMIN_CODE.contains(code);
    }
}
- Permission定义了DENY、ANY、PUB、SUB常量,并初始化了ADMIN_CODE;checkPermission则根据neededPerm及ownedPerm来与DENY、ANY、PUB、SUB常量做&运算来得出是否有权限
小结
AccessValidator接口定义了parse、validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法;PlainAccessValidator实现了AccessValidator接口,其构造器创建了PlainPermissionManager;其parse方法解析remotingCommand及remoteAddr,构造plainAccessResource,具体根据不同的RequestCode来设置resourceAndPerm,之后根据request及fieldsMap设置content;其validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法都委托给了PlainPermissionManager
doc
- AccessValidator
以上是 聊聊rocketmq的AccessValidator 的全部内容, 来源链接: utcz.com/z/510603.html