系统设计(2)
之前的流程很简单:
通过入参路由到具体的flow(有多个flow,每个flow由不同的handler组成) |
|
|
执行flow中的execute方法(flow负责调度每个handler,其实就是循环调用每个handler的process方法)
|
|
|
执行完所有handler方法后,将结果以mq的形式调用上游系统;如果是同步返回的话,如同责任链模式,直接返回就行了;
但随着业务的发展,对handler有不同维度的新增
public enum HandlerType { /**
* 当handler节点是异步请求外部系统,需接收mq返回的结果,根据结果判断接入到流程的节点被认为是接入节点
*/
ACCESS("接入节点", 1),
/**
* 在每个flow初始化时定义的handler节点
*/
PROCESS("流程节点", 2);
private String desc;
private int code;
HandlerType(String desc, int code) {
this.desc = desc;
this.code = code;
}
}
public enum FeatureType { NORMAL("普通节点", 1), // 根据初始化的顺序已经指定上游节点和下游节点
OVER("结束节点", 2), // 指定节点是结束节点,如需异步请求外部系统节点,等待mq消息返回;或者最后一个节点
CONDITION("条件节点", 3); // 需根据该功能节点产生的数据动态判断下个节点,或者是不是结束节点
private String desc;
private int code;
FeatureType(String desc, int code) {
this.desc = desc;
this.code = code;
}
}
我们为了不影响现有的功能handler节点同时,对功能节点handler进行封装在DefaultHandlerContext内;通过委托方式进行调用handler方法;
同时新增handler的静态特性和动态特性;
- 静态特性:当前handler是属于(普通节点/结束节点/条件节点)、(接入节点/流程节点);
- 动态特性:当前节点是条件节点时,执行条件语句,获取到下个handler节点的beanName;或者执行条件语句后,当前节点变成结束节点;
之前设计的flow是通过while循环调度每个handler执行的顺序,这种设计比较单一,扩展短板比较明显;我们对每个DefaultHandlerContext新增了两个节点,分别是prev和next节点;
- 普通节点:在初始化时即可指定上游节点和下游节点
- 结束节点:下游节点为DefaultHandlerContext.NULL_OBJECT
- 条件节点:通过执行条件语句判断下一个节点beanName
通过以上的方式构造了有状态的DefaultHandlerContext,而且影响内部每个handler的单一功能指责;
DefaultHandlerContext:
public class DefaultHandlerContext<T> extends AbstractHandlerContext { public static final DefaultHandlerContext NULL_OBJECT = new DefaultHandlerContext();
private ProcessHandler<T> handler; // 功能handler
private String beanName; // beanName
protected FeatureType featureType = FeatureType.NORMAL; // 默认普通节点
protected HandlerType handlerType = HandlerType.PROCESS; // 默认流程内节点
protected DefaultHandlerContext prev; // 上一个节点
protected DefaultHandlerContext next; // 下一个节点
protected Function<T, String> conditionFun; // 条件节点判断方法
...
}
每个flow初始化时包含属于自身的handlerPipeLine,并将属于该flow中的DefaultHandlerContext按链路顺序压入;
// 默认初始化pipeLine,每个flow都有相同的start、terminate节点 HandlerContextPipeLine pipeLine = new HandlerContextPipeLine(new DefaultHandlerContext(start), new DefaultHandlerContext(terminate, FeatureType.OVER));
// 条件节点,通过handler结果获取下个节点的beanName
private Function<Boolean, String> inTransitFun = (existOrder) -> existOrder ?
"preMqSend" : "fraudQuery";
pipeLine
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("inTransitOrder"), inTransitFun)) // 条件节点, 条件语句inTransitFun
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("fraudQuery"), FeatureType.OVER)) // 结束节点 —— 调用外部系统接口
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("fraudReceiver"), fraudReceiverFun)) // 条件节点
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("prePaymentQuery"), prePaymentFun))
.add(new DefaultHandlerContext(HandlerUtil.getHandler("quotaMatch"))) // 普通节点
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("discriOver"), discrFun))
.add(new DefaultHandlerContext(HandlerUtil.getHandler("guaranteeQuery"), FeatureType.OVER))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("guaranteeReceive"), guaFun))
.add(new DefaultHandlerContext(HandlerUtil.getHandler("preMqSend")));
HandlerPipeLine:
import java.util.HashMap;import java.util.Map;
import java.util.function.Function;
public class HandlerContextPipeLine {
// 当前flow流程包含的所有DefaultHandlerContext
private Map<String, DefaultHandlerContext> contextMap = new HashMap<String, DefaultHandlerContext>() {{
// 包含NULL_OBJECT
put(null, DefaultHandlerContext.NULL_OBJECT);
}};
// 根据beanName查找DefaultHandlerContext
private Function<String, DefaultHandlerContext> handlerFun =
(beanName) -> contextMap.getOrDefault(beanName, DefaultHandlerContext.NULL_OBJECT);
// 流程中的开始节点
private DefaultHandlerContext head;
// 流程中的结束节点
private DefaultHandlerContext end;
/**
* HandlerContextPipeLine 构造函数
* @param head
* @param end
*/
public HandlerContextPipeLine(DefaultHandlerContext head, DefaultHandlerContext end) {
this.head = head;
this.end = end;
contextMap.put(head.getBeanName(), head);
contextMap.put(end.getBeanName(), end);
head.setNext(end);
end.setPrev(head);
}
/**
* 新增节点时: 当前节点是普通节点时,设置当前节点的next节点为end
* ----------上游节点是普通节点时,设置当前节点为上游节点的next节点
*
* @param handlerContext
* @return
*/
public HandlerContextPipeLine add(DefaultHandlerContext handlerContext) {
DefaultHandlerContext preContext = end.getPrev();
end.setPrev(handlerContext);
if (handlerContext.getFeatureType() == FeatureType.NORMAL) {
handlerContext.setNext(end);
}
handlerContext.setPrev(preContext);
if (preContext.getFeatureType() == FeatureType.NORMAL) {
preContext.setNext(handlerContext);
}
contextMap.put(handlerContext.getBeanName(), handlerContext);
return this;
}
/**
* 流程的开始节点
*
* @param reqParam
* @param dataUtil
*/
public void start(ReqParam reqParam, DataUtil dataUtil) {
head.execute(reqParam, dataUtil, handlerFun);
}
/**
* 根据beanName查询获得DefaultHandlerContext
*
* @param beanName
* @param reqParam
* @param dataUtil
*/
public void access(String beanName, ReqParam reqParam, DataUtil dataUtil) {
DefaultHandlerContext accessHandler = handlerFun.apply(beanName);
accessHandler.execute(reqParam, dataUtil, handlerFun);
}
/**
* mq接收数据接入流程
*
* @param beanName access name
* @param func 要执行的func
* @param reqParam
* @param dataUtil
*/
public void access(String beanName, Function<ReqParam, String> func, ReqParam reqParam, DataUtil dataUtil) {
DefaultHandlerContext<ReqParam> handlerContext = new DefaultHandlerContext<>(func, beanName);
handlerContext.execute(reqParam, dataUtil, handlerFun);
}
}
AbstractHandlerContext类:链路调用,优化之前的while循环调用
@Data@Slf4j
public abstract class AbstractHandlerContext<T> implements ProcessHandler<T> {
...
/**
* 执行DefaultHandlerContext
* 1.当前DefaultHandlerContext是条件节点时,执行功能handler节点;以返回结果作为入参执行条件语句获得next handler的beanName;通过beanName获得next DefaultHandlerContext
* 2.当前不是条件节点时,直接执行功能节点的方法即可
* 3.如果下一个节点是NULL_OBJECT时,则当前节点为结束节点,返回即可
* 4.如果next节点不是结束节点,则执行next节点的execute方法
* @param reqParam
* @param dataUtil
* @param handlerFun
*/
void execute(ReqParam reqParam, DataUtil dataUtil, Function<String, DefaultHandlerContext> handlerFun) {
if (FeatureType.CONDITION == featureType) {
next = handlerFun.apply(conditionFun.apply(this.process(reqParam)));
} else {
this.process(reqParam);
}
...
...
// 判断该handler处理完是否终结
if (next == NULL_OBJECT) {
return;
}
this.next.execute(reqParam, dataUtil, handlerFun);
}
...
}
以上解释了现有系统架构的链路,首先初始化每个flow的所有DefaultContextHandler;并为每个功能DefaultContextHandler增添了静态/动态特性,改变了之前的调度模式,提高扩展能力;
接入节点:
接入节点换一种表达方式,可以称为虚拟节点,它承接着mq异步消息的接收,并对结果判断接入到流程中的具体节点,同时根据不同的flow,处理的逻辑也不尽相同;由此知道,接入的逻辑判断和具体的flow也有关联;每个flow对接入有自身的判断逻辑;但可把默认的接入写在基类中,个性化的,子类自己复写就好;
Map<String, Map<String, Function>> accessMap = new HashMap<String, Map<String, Function>>() {{ // 反欺诈
put("fraudAccess",
new HashMap<String, Function>() {{
put("pass", (obj) -> "fraudReceiver");
}});
// 担保策略
put("guaPloyAccess",
new HashMap<String, Function>() {{
put("pass", (obj) -> "guaranteeReceive");
}});
// 资方策略
put("tenantAccess",
new HashMap<String, Function>() {{
put("pass", (obj) -> "guaranteeQuery");
}});
// 额度匹配(决策、电审、提额、担保)
put("matchAccess",
new HashMap<String, Function>() {{
put("pass", (obj) -> "quotaMatch");
put("reject", (obj) -> "allSceneFinished");
put("cancel", (obj) -> "allSceneFinished");
}});
}};
/**
* 从mq接入流程内
*
* @param access
* @param reqParam
* @param type
*/
@Override
public void access(FlowAccess access, String type, ReqParam reqParam) {
String funcKey = access.getBeanName();
Function func = null;
if (accessMap.containsKey(funcKey)) {
func = accessMap.get(funcKey).get(type);
} else if (access instanceof MatchQuotaAccess) {
func = accessMap.get("matchAccess").get(type);
}
Assert.notNull(func, access.getBeanName() + "func should not null");
pipeLine.access(funcKey, func, reqParam, getDataUtil());
}
接入的类调用自身所属于的flow流程内的access方法,根据type类型pass/reject/cancel获取不同的条件语句;
调用pipeline中的access方法;组装接入节点DefaultHandlerContext;调用DefaultHandlerContext的execute方法;
因为接入节点内的handler是null,所以只需要执行接入节点中的条件语句获取到next节点的beanName,并通过beanName获取到该flow中的DefaultHandlerContext;由此接入流程内;
以上是系统重构后的状态,基本能满足当前业务,并支持一定程度上的扩展;
-------------------------- 问题点 ---------------------------
在重构的过程中遇到的问题,因为每个功能handler bean内都有个属性beanName;是通过实现BeanPostProcess方式进行创建代理注入的;
@Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean.getClass().isAnnotationPresent(Handler.class)) {
Object proxy = new Proxy(bean, beanName).proxy();
HandlerUtil.processHandlerMap.put(beanName, (ProcessHandler) proxy);
return proxy;
}
if (bean.getClass().isAnnotationPresent(Access.class)) {
Object proxy = new Proxy(bean, beanName).proxy();
HandlerUtil.accessMap.put(beanName, (FlowAccess) proxy);
return proxy;
}
return bean;
}
但在baseFlow实现InitializingBean接口中初始化HandlerPipeLine时,需通过beanName获取相应的handler代理类进行组装DefaultHandlerContext;所以在此之前需完成所有实现ProcessHandler接口的功能类实例化,并包含相应的(beanName, handlerProxy)集合;
之前通过在BaseFlow中@Resource注入了head和terminated功能节点实例化HandlerPipeLine;所以会优先初始化head和terminated,创建代理类;但并不能保证全量ProcessHandler接口实现类都已经初始化;这样会导致在组装DefaultHandlerContext时,handler是null;整个链条是无效的;
现在只是偷巧的解决;通过@Autowired方式去注入head和terminated;因为用的是类型注入,所以上下文会保证执行baseflow中的afterPropertiesSet方法时,将所有的processHandler接口实现类都初始化,以此解决上述问题;但并没有从spring加载的链路上去做根本解决;
2.在DefaultHandlerContext定义的泛型是和所包含的handler一致的;导致在扩展的时候出现一定的局限性,后续会想比较好的方式消除;
以上是 系统设计(2) 的全部内容, 来源链接: utcz.com/z/512897.html