聊聊rocketmq的AccessValidator

编程

本文主要研究一下rocketmq的AccessValidator

AccessValidator

rocketmq/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java

/**
 * Contract for ACL validation: turn an incoming remoting request into an
 * {@link AccessResource}, validate it, and maintain the underlying ACL configuration.
 */
public interface AccessValidator {

/**
 * Parse to get the AccessResource (user, resource, needed permission).
 * The returned object has the type accepted by {@link #validate(AccessResource)}.
 *
 * @param request the remoting command to extract the access attributes from
 * @param remoteAddr the address of the remote peer that sent the request
 * @return plain access resource result, including access key, signature and some other access attributes
 */

AccessResource parse(RemotingCommand request, String remoteAddr);

/**
 * Validate the access resource.
 * <p>
 * The method returns nothing, so rejection has to be signalled by throwing; the plain
 * implementation shown alongside this interface throws {@code AclException}.
 *
 * @param accessResource the resource to validate, as produced by {@link #parse}
 */

void validate(AccessResource accessResource);

/**
 * Update the access resource config.
 *
 * @param plainAccessConfig the access config to apply
 * @return presumably {@code true} when the update was applied -- TODO confirm against the implementation
 */

boolean updateAccessConfig(PlainAccessConfig plainAccessConfig);

/**
 * Delete the access resource config.
 *
 * @param accesskey the access key whose config should be deleted
 * @return presumably {@code true} when the config was deleted -- TODO confirm against the implementation
 */

boolean deleteAccessConfig(String accesskey);

/**
 * Get the access resource config version information.
 *
 * @return the ACL config version, as a string
 */

String getAclConfigVersion();

/**
 * Update globalWhiteRemoteAddresses in the acl yaml config file.
 *
 * @param globalWhiteAddrsList the global white remote addresses to write
 * @return presumably {@code true} when the update was applied -- TODO confirm against the implementation
 */

boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);

}

  • AccessValidator接口定义了parse、validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法

PlainAccessValidator

rocketmq/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java

/**
 * {@link AccessValidator} implementation that converts a remoting request into a
 * {@link PlainAccessResource} and delegates validation and every config operation to a
 * {@link PlainPermissionManager}.
 */
public class PlainAccessValidator implements AccessValidator {

    private PlainPermissionManager aclPlugEngine;

    public PlainAccessValidator() {
        aclPlugEngine = new PlainPermissionManager();
    }

    /**
     * Builds the {@link PlainAccessResource} describing what the request needs.
     * <p>
     * The remote address (without its port) and the request code are always set. When the
     * request carries ext fields, the access key, signature and security token are copied
     * from them, the topic / retry-topic permissions required by the request code are
     * added, and the content to be signed is assembled from every ext field except the
     * signature itself.
     *
     * @param request the remoting command to inspect
     * @param remoteAddr the peer address, normally {@code host:port}; may be {@code null}
     * @return the populated access resource
     * @throws AclException if resolving the resources required by the request code fails
     */
    @Override
    public AccessResource parse(RemotingCommand request, String remoteAddr) {
        PlainAccessResource accessResource = new PlainAccessResource();
        accessResource.setWhiteRemoteAddress(stripPort(remoteAddr));
        accessResource.setRequestCode(request.getCode());

        Map<String, String> extFields = request.getExtFields();
        if (extFields == null) {
            // If the request's extFields is null, return accessResource directly (users can use the whiteAddress pattern).
            // The logic below depends on the request's extFields not being null.
            return accessResource;
        }

        accessResource.setAccessKey(extFields.get(SessionCredentials.ACCESS_KEY));
        accessResource.setSignature(extFields.get(SessionCredentials.SIGNATURE));
        accessResource.setSecretToken(extFields.get(SessionCredentials.SECURITY_TOKEN));

        try {
            switch (request.getCode()) {
                case RequestCode.SEND_MESSAGE:
                    accessResource.addResourceAndPerm(extFields.get("topic"), Permission.PUB);
                    break;
                case RequestCode.SEND_MESSAGE_V2:
                    // The V2 send header carries the topic under the key "b".
                    accessResource.addResourceAndPerm(extFields.get("b"), Permission.PUB);
                    break;
                case RequestCode.CONSUMER_SEND_MSG_BACK:
                    accessResource.addResourceAndPerm(extFields.get("originTopic"), Permission.PUB);
                    accessResource.addResourceAndPerm(getRetryTopic(extFields.get("group")), Permission.SUB);
                    break;
                case RequestCode.PULL_MESSAGE:
                    accessResource.addResourceAndPerm(extFields.get("topic"), Permission.SUB);
                    accessResource.addResourceAndPerm(getRetryTopic(extFields.get("consumerGroup")), Permission.SUB);
                    break;
                case RequestCode.QUERY_MESSAGE:
                    accessResource.addResourceAndPerm(extFields.get("topic"), Permission.SUB);
                    break;
                case RequestCode.HEART_BEAT:
                    // A heartbeat needs SUB on every consumer group and on every topic it subscribes to.
                    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
                    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                        accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
                        for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
                            accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
                        }
                    }
                    break;
                case RequestCode.UNREGISTER_CLIENT:
                    final UnregisterClientRequestHeader unregisterClientRequestHeader =
                        (UnregisterClientRequestHeader) request
                            .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
                    accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
                    break;
                case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                    final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
                        (GetConsumerListByGroupRequestHeader) request
                            .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
                    accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
                    break;
                case RequestCode.UPDATE_CONSUMER_OFFSET:
                    final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
                        (UpdateConsumerOffsetRequestHeader) request
                            .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
                    accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
                    accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
                    break;
                default:
                    // Other request codes add no resource permissions here.
                    break;
            }
        } catch (Throwable t) {
            // Any failure while decoding the request is reported as an ACL failure, keeping the cause.
            throw new AclException(t.getMessage(), t);
        }

        // Content: every ext field except the signature, in sorted key order.
        SortedMap<String, String> map = new TreeMap<String, String>();
        for (Map.Entry<String, String> entry : extFields.entrySet()) {
            if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
                map.put(entry.getKey(), entry.getValue());
            }
        }
        accessResource.setContent(AclUtils.combineRequestContent(request, map));
        return accessResource;
    }

    /**
     * Removes a trailing {@code :port} from a peer address.
     * <p>
     * Only the text after the LAST colon is dropped, so an IPv6 host whose groups are
     * themselves colon-separated keeps all of its groups. Like the code it replaces, this
     * assumes that an address containing a colon ends with a port.
     *
     * @param remoteAddr the peer address, may be {@code null}
     * @return the address without its port; the input unchanged when it is {@code null}
     *         or contains no colon
     */
    private static String stripPort(String remoteAddr) {
        if (remoteAddr == null) {
            return null;
        }
        int portSeparator = remoteAddr.lastIndexOf(':');
        return portSeparator < 0 ? remoteAddr : remoteAddr.substring(0, portSeparator);
    }

    /**
     * Validates the resource through the permission manager.
     *
     * @param accessResource must be a {@link PlainAccessResource}; it is cast unconditionally
     */
    @Override
    public void validate(AccessResource accessResource) {
        aclPlugEngine.validate((PlainAccessResource) accessResource);
    }

    /** Delegates to {@link PlainPermissionManager#updateAccessConfig}. */
    @Override
    public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) {
        return aclPlugEngine.updateAccessConfig(plainAccessConfig);
    }

    /** Delegates to {@link PlainPermissionManager#deleteAccessConfig}. */
    @Override
    public boolean deleteAccessConfig(String accesskey) {
        return aclPlugEngine.deleteAccessConfig(accesskey);
    }

    /** Delegates to {@link PlainPermissionManager#getAclConfigDataVersion}. */
    @Override
    public String getAclConfigVersion() {
        return aclPlugEngine.getAclConfigDataVersion();
    }

    /** Delegates to {@link PlainPermissionManager#updateGlobalWhiteAddrsConfig}. */
    @Override
    public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
        return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
    }
}

  • PlainAccessValidator实现了AccessValidator接口,其构造器创建了PlainPermissionManager;其parse方法解析remotingCommand及remoteAddr,构造plainAccessResource,具体根据不同的RequestCode来设置resourceAndPerm,之后根据request及fieldsMap设置content;其validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法都委托给了PlainPermissionManager

PlainPermissionManager

rocketmq/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java

/**
 * Holds the plain ACL configuration (per-access-key resources and the global white
 * address strategies) and validates {@link PlainAccessResource} requests against it.
 */
public class PlainPermissionManager {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";

    private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
        System.getenv(MixAll.ROCKETMQ_HOME_ENV));

    private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);

    private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();

    private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();

    private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();

    private boolean isWatchStart;

    private final DataVersion dataVersion = new DataVersion();

    public PlainPermissionManager() {
        load();
        watch();
    }

    //......

    /**
     * Validates a request, in this order:
     * <ol>
     *   <li>a match against any global white remote address strategy passes immediately;</li>
     *   <li>a missing access key, or one with no configured entry, is rejected;</li>
     *   <li>a match against the access key's own remote address strategy passes immediately;</li>
     *   <li>the signature computed from the request content and the configured secret key
     *       must equal the signature carried by the request;</li>
     *   <li>finally every requested resource permission is checked via {@link #checkPerm}.</li>
     * </ol>
     *
     * @param plainAccessResource the resource parsed from the incoming request
     * @throws AclException if any of the checks above rejects the request
     */
    public void validate(PlainAccessResource plainAccessResource) {
        // Check the global white remote addr
        for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
            if (remoteAddressStrategy.match(plainAccessResource)) {
                return;
            }
        }

        final String accessKey = plainAccessResource.getAccessKey();
        if (accessKey == null) {
            throw new AclException("No accessKey is configured");
        }

        // Single lookup: an absent key and a null entry are both rejected with the same
        // error instead of the null entry surfacing later as a NullPointerException.
        PlainAccessResource ownedAccess = plainAccessResourceMap.get(accessKey);
        if (ownedAccess == null) {
            throw new AclException(String.format("No acl config for %s", accessKey));
        }

        // Check the white addr for accesskey
        if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
            return;
        }

        // Check the signature
        String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
        if (!signatureMatches(signature, plainAccessResource.getSignature())) {
            throw new AclException(String.format("Check signature failed for accessKey=%s", accessKey));
        }

        // Check perm of each resource
        checkPerm(plainAccessResource, ownedAccess);
    }

    /**
     * Compares the expected and the request-supplied signature without short-circuiting
     * on the first differing character, so the comparison time does not reveal how much
     * of a guessed signature is correct.
     *
     * @param expected the signature computed on this side
     * @param provided the signature carried by the request, may be {@code null}
     * @return {@code true} only when both are non-null and equal
     */
    private static boolean signatureMatches(String expected, String provided) {
        if (expected == null || provided == null) {
            return false;
        }
        return java.security.MessageDigest.isEqual(
            expected.getBytes(java.nio.charset.StandardCharsets.UTF_8),
            provided.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Checks that {@code ownedAccess} grants everything {@code needCheckedAccess} asks for.
     * <p>
     * Admin-only request codes require an admin owner. After that, each requested resource
     * is checked against the owner's explicit permission for that resource, or, when the
     * owner has none, against the owner's default group or default topic permission.
     *
     * @param needCheckedAccess the permissions required by the request
     * @param ownedAccess the permissions configured for the access key
     * @throws AclException if admin permission is required but missing, or a resource
     *         permission is not granted
     */
    void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
        if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
            throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
        }

        Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
        Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();

        if (needCheckedPermMap == null) {
            // If the needCheckedPermMap is null, then return
            return;
        }

        if (ownedPermMap == null && ownedAccess.isAdmin()) {
            // If the ownedPermMap is null and it is an admin user, then return
            return;
        }

        for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
            String resource = needCheckedEntry.getKey();
            Byte neededPerm = needCheckedEntry.getValue();
            boolean isGroup = PlainAccessResource.isRetryTopic(resource);

            if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
                // Check the default perm
                byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() :
                    ownedAccess.getDefaultTopicPerm();
                if (!Permission.checkPermission(neededPerm, ownedPerm)) {
                    throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
                }
                continue;
            }

            // An explicit permission exists for this resource, so a failure here is not
            // about the default permission and is reported as such.
            if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
                throw new AclException(String.format("No permission for %s", PlainAccessResource.printStr(resource, isGroup)));
            }
        }
    }

    //......

}

  • validate方法首先根据globalWhiteRemoteAddressStrategy进行校验,匹配则立即返回;之后校验accessKey是否为null,为null则抛出AclException;接着判断该accessKey是否在plainAccessResourceMap里头,不包含则抛出AclException;接着取出该accessKey对应的PlainAccessResource,根据其remoteAddressStrategy判断是否在白名单内,在则立即返回
  • 之后使用配置的secretKey对plainAccessResource.getContent()进行签名,然后对比plainAccessResource.getSignature(),不一样则抛出AclException;最后是通过checkPerm来校验权限
  • checkPerm的入参为needCheckedAccess及ownedAccess,它首先判断请求是否需要adminPerm,如果需要且ownedAccess.isAdmin()为false则抛出AclException;之后遍历needCheckedPermMap,对每个needCheckedEntry从ownedPermMap取出相应权限进行判断(ownedPermMap中没有该resource时则使用defaultGroupPerm或defaultTopicPerm),这里调用了Permission.checkPermission方法

Permission

rocketmq/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java

/**
 * Permission bit flags used by the ACL checks, plus helpers for parsing them from
 * configuration strings and for recognising admin-only request codes.
 */
public class Permission {

    public static final byte DENY = 1;
    public static final byte ANY = 1 << 1;
    public static final byte PUB = 1 << 2;
    public static final byte SUB = 1 << 3;

    /** Request codes for which {@link #needAdminPerm(Integer)} returns {@code true}. */
    public static final Set<Integer> ADMIN_CODE = new HashSet<Integer>();

    static {
        // Topic management
        ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_TOPIC);
        // Broker configuration
        ADMIN_CODE.add(RequestCode.UPDATE_BROKER_CONFIG);
        // Topic removal
        ADMIN_CODE.add(RequestCode.DELETE_TOPIC_IN_BROKER);
        // Subscription group management
        ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP);
        // Subscription group removal
        ADMIN_CODE.add(RequestCode.DELETE_SUBSCRIPTIONGROUP);
    }

    /**
     * Decides whether the owned permission bits satisfy the needed ones.
     * <p>
     * An owned {@link #DENY} bit always rejects. A needed {@link #ANY} bit is satisfied by
     * an owned {@link #PUB} or {@link #SUB} bit. Otherwise the two bit sets must overlap.
     *
     * @param neededPerm permission bits required by the request
     * @param ownedPerm permission bits granted to the caller
     * @return {@code true} when the request is allowed
     */
    public static boolean checkPermission(byte neededPerm, byte ownedPerm) {
        final boolean denied = (ownedPerm & DENY) > 0;
        if (denied) {
            return false;
        }

        final boolean anyRequested = (neededPerm & ANY) > 0;
        if (anyRequested) {
            // Either of the two bits is enough.
            return (ownedPerm & (PUB | SUB)) > 0;
        }
        return (neededPerm & ownedPerm) > 0;
    }

    /**
     * Parses a permission string from the ACL configuration.
     *
     * @param permString one of {@code PUB}, {@code SUB}, {@code PUB|SUB}, {@code SUB|PUB}
     *        or {@code DENY}; surrounding whitespace is ignored
     * @return the matching permission bits; {@link #DENY} for {@code null} or any
     *         unrecognised value
     */
    public static byte parsePermFromString(String permString) {
        if (permString == null) {
            return DENY;
        }

        final String normalized = permString.trim();
        if ("PUB".equals(normalized)) {
            return PUB;
        }
        if ("SUB".equals(normalized)) {
            return SUB;
        }
        if ("PUB|SUB".equals(normalized) || "SUB|PUB".equals(normalized)) {
            return PUB | SUB;
        }
        // "DENY" and everything unrecognised end up here.
        return DENY;
    }

    /**
     * Adds every {@code name=perm} entry of {@code resources} to the access resource.
     * Topic names are used as the resource key directly; group names are first passed
     * through {@link PlainAccessResource#getRetryTopic}.
     *
     * @param plainAccessResource the resource to add the permissions to
     * @param isTopic {@code true} when the entries name topics, {@code false} for groups
     * @param resources entries of the form {@code name=perm}; {@code null} or empty is a no-op
     * @throws AclException if an entry does not split into exactly two parts around {@code =}
     */
    public static void parseResourcePerms(PlainAccessResource plainAccessResource, Boolean isTopic,
        List<String> resources) {
        if (resources == null || resources.isEmpty()) {
            return;
        }

        for (String entry : resources) {
            String[] parts = StringUtils.split(entry, "=");
            if (parts.length != 2) {
                throw new AclException(String.format("Parse resource permission failed for %s:%s", isTopic ? "topic" : "group", entry));
            }

            String name = parts[0].trim();
            String resourceKey = isTopic ? name : PlainAccessResource.getRetryTopic(name);
            byte perm = parsePermFromString(parts[1].trim());
            plainAccessResource.addResourceAndPerm(resourceKey, perm);
        }
    }

    /**
     * Tells whether a request code is listed in {@link #ADMIN_CODE}.
     *
     * @param code the request code to look up
     * @return {@code true} when the code requires admin permission
     */
    public static boolean needAdminPerm(Integer code) {
        return ADMIN_CODE.contains(code);
    }
}

  • Permission定义了DENY、ANY、PUB、SUB常量,并初始化了ADMIN_CODE;checkPermission则根据neededPerm及ownedPerm来与DENY、ANY、PUB、SUB常量做&运算来得出是否有权限

小结

AccessValidator接口定义了parse、validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法;PlainAccessValidator实现了AccessValidator接口,其构造器创建了PlainPermissionManager;其parse方法解析remotingCommand及remoteAddr,构造plainAccessResource,具体根据不同的RequestCode来设置resourceAndPerm,之后根据request及fieldsMap设置content;其validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法都委托给了PlainPermissionManager

doc

  • AccessValidator

以上是 聊聊rocketmq的AccessValidator 的全部内容, 来源链接: utcz.com/z/510603.html

回到顶部