系统设计(2)

编程

之前的流程很简单:

通过入参路由到具体的flow(有多个flow,每个flow由不同的handler组成)

|

|

|

执行flow中的execute方法(flow负责调度每个handler,其实就是循环调用每个handler的process方法)

|

|

|

执行完所有handler方法后,将结果以mq的形式调用上游系统;如果是同步返回的话,如同责任链模式,直接返回就行了;

但随着业务的发展,对handler有不同维度的新增

public enum HandlerType {

/**

* 当handler节点是异步请求外部系统,需接收mq返回的结果,根据结果判断接入到流程的节点被认为是接入节点

*/

ACCESS("接入节点", 1),

/**

* 在每个flow初始化时定义的handler节点

*/

PROCESS("流程节点", 2);

private String desc;

private int code;

HandlerType(String desc, int code) {

this.desc = desc;

this.code = code;

}

}

public enum FeatureType {

NORMAL("普通节点", 1), // 根据初始化的顺序已经指定上游节点和下游节点

OVER("结束节点", 2), // 指定节点是结束节点,如需异步请求外部系统节点,等待mq消息返回;或者最后一个节点

CONDITION("条件节点", 3); // 需根据该功能节点产生的数据动态判断下个节点,或者是不是结束节点

private String desc;

private int code;

FeatureType(String desc, int code) {

this.desc = desc;

this.code = code;

}

}

我们为了不影响现有的功能handler节点同时,对功能节点handler进行封装在DefaultHandlerContext内;通过委托方式进行调用handler方法;

同时新增handler的静态特性和动态特性;

  • 静态特性:当前handler是属于(普通节点/结束节点/条件节点)、(接入节点/流程节点);
  • 动态特性:当前节点是条件节点时,执行条件语句,获取到下个handler节点的beanName;或者执行条件语句后,当前节点变成结束节点;

之前设计的flow是通过while循环调度每个handler执行的顺序,这种设计比较单一,扩展短板比较明显;我们对每个DefaultHandlerContext新增了两个节点,分别是prev和next节点;

  • 普通节点:在初始化时即可指定上游节点和下游节点
  • 结束节点:下游节点为DefaultHandlerContext.NULL_OBJECT
  • 条件节点:通过执行条件语句判断下一个节点beanName

通过以上的方式构造了有状态的DefaultHandlerContext,而且影响内部每个handler的单一功能指责;

DefaultHandlerContext:

public class DefaultHandlerContext<T> extends AbstractHandlerContext {

public static final DefaultHandlerContext NULL_OBJECT = new DefaultHandlerContext();

private ProcessHandler<T> handler; // 功能handler

private String beanName; // beanName

protected FeatureType featureType = FeatureType.NORMAL; // 默认普通节点

protected HandlerType handlerType = HandlerType.PROCESS; // 默认流程内节点

protected DefaultHandlerContext prev; // 上一个节点

protected DefaultHandlerContext next; // 下一个节点

protected Function<T, String> conditionFun; // 条件节点判断方法

...

}

每个flow初始化时包含属于自身的handlerPipeLine,并将属于该flow中的DefaultHandlerContext按链路顺序压入;

// 默认初始化pipeLine,每个flow都有相同的start、terminate节点       

HandlerContextPipeLine pipeLine = new HandlerContextPipeLine(new DefaultHandlerContext(start), new DefaultHandlerContext(terminate, FeatureType.OVER));

// 条件节点,通过handler结果获取下个节点的beanName

private Function<Boolean, String> inTransitFun = (existOrder) -> existOrder ?

"preMqSend" : "fraudQuery";

pipeLine

.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("inTransitOrder"), inTransitFun)) // 条件节点, 条件语句inTransitFun

.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("fraudQuery"), FeatureType.OVER)) // 结束节点 —— 调用外部系统接口

.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("fraudReceiver"), fraudReceiverFun)) // 条件节点

.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("prePaymentQuery"), prePaymentFun))

.add(new DefaultHandlerContext(HandlerUtil.getHandler("quotaMatch"))) // 普通节点

.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("discriOver"), discrFun))

.add(new DefaultHandlerContext(HandlerUtil.getHandler("guaranteeQuery"), FeatureType.OVER))

.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("guaranteeReceive"), guaFun))

.add(new DefaultHandlerContext(HandlerUtil.getHandler("preMqSend")));

HandlerPipeLine:

import java.util.HashMap;

import java.util.Map;

import java.util.function.Function;

public class HandlerContextPipeLine {

// 当前flow流程包含的所有DefaultHandlerContext

private Map<String, DefaultHandlerContext> contextMap = new HashMap<String, DefaultHandlerContext>() {{

// 包含NULL_OBJECT

put(null, DefaultHandlerContext.NULL_OBJECT);

}};

// 根据beanName查找DefaultHandlerContext

private Function<String, DefaultHandlerContext> handlerFun =

(beanName) -> contextMap.getOrDefault(beanName, DefaultHandlerContext.NULL_OBJECT);

// 流程中的开始节点

private DefaultHandlerContext head;

// 流程中的结束节点

private DefaultHandlerContext end;

/**

* HandlerContextPipeLine 构造函数

* @param head

* @param end

*/

public HandlerContextPipeLine(DefaultHandlerContext head, DefaultHandlerContext end) {

this.head = head;

this.end = end;

contextMap.put(head.getBeanName(), head);

contextMap.put(end.getBeanName(), end);

head.setNext(end);

end.setPrev(head);

}

/**

* 新增节点时: 当前节点是普通节点时,设置当前节点的next节点为end

* ----------上游节点是普通节点时,设置当前节点为上游节点的next节点

*

* @param handlerContext

* @return

*/

public HandlerContextPipeLine add(DefaultHandlerContext handlerContext) {

DefaultHandlerContext preContext = end.getPrev();

end.setPrev(handlerContext);

if (handlerContext.getFeatureType() == FeatureType.NORMAL) {

handlerContext.setNext(end);

}

handlerContext.setPrev(preContext);

if (preContext.getFeatureType() == FeatureType.NORMAL) {

preContext.setNext(handlerContext);

}

contextMap.put(handlerContext.getBeanName(), handlerContext);

return this;

}

/**

* 流程的开始节点

*

* @param reqParam

* @param dataUtil

*/

public void start(ReqParam reqParam, DataUtil dataUtil) {

head.execute(reqParam, dataUtil, handlerFun);

}

/**

* 根据beanName查询获得DefaultHandlerContext

*

* @param beanName

* @param reqParam

* @param dataUtil

*/

public void access(String beanName, ReqParam reqParam, DataUtil dataUtil) {

DefaultHandlerContext accessHandler = handlerFun.apply(beanName);

accessHandler.execute(reqParam, dataUtil, handlerFun);

}

/**

* mq接收数据接入流程

*

* @param beanName access name

* @param func 要执行的func

* @param reqParam

* @param dataUtil

*/

public void access(String beanName, Function<ReqParam, String> func, ReqParam reqParam, DataUtil dataUtil) {

DefaultHandlerContext<ReqParam> handlerContext = new DefaultHandlerContext<>(func, beanName);

handlerContext.execute(reqParam, dataUtil, handlerFun);

}

}

AbstractHandlerContext类:链路调用,优化之前的while循环调用

@Data

@Slf4j

public abstract class AbstractHandlerContext<T> implements ProcessHandler<T> {

...

/**

* 执行DefaultHandlerContext

* 1.当前DefaultHandlerContext是条件节点时,执行功能handler节点;以返回结果作为入参执行条件语句获得next handler的beanName;通过beanName获得next DefaultHandlerContext

* 2.当前不是条件节点时,直接执行功能节点的方法即可

* 3.如果下一个节点是NULL_OBJECT时,则当前节点为结束节点,返回即可

* 4.如果next节点不是结束节点,则执行next节点的execute方法

* @param reqParam

* @param dataUtil

* @param handlerFun

*/

void execute(ReqParam reqParam, DataUtil dataUtil, Function<String, DefaultHandlerContext> handlerFun) {

if (FeatureType.CONDITION == featureType) {

next = handlerFun.apply(conditionFun.apply(this.process(reqParam)));

} else {

this.process(reqParam);

}

...

...

// 判断该handler处理完是否终结

if (next == NULL_OBJECT) {

return;

}

this.next.execute(reqParam, dataUtil, handlerFun);

}

...

}

以上解释了现有系统架构的链路,首先初始化每个flow的所有DefaultContextHandler;并为每个功能DefaultContextHandler增添了静态/动态特性,改变了之前的调度模式,提高扩展能力;

接入节点:

接入节点换一种表达方式,可以称为虚拟节点,它承接着mq异步消息的接收,并对结果判断接入到流程中的具体节点,同时根据不同的flow,处理的逻辑也不尽相同;由此知道,接入的逻辑判断和具体的flow也有关联;每个flow对接入有自身的判断逻辑;但可把默认的接入写在基类中,个性化的,子类自己复写就好;

    Map<String, Map<String, Function>> accessMap = new HashMap<String, Map<String, Function>>() {{

// 反欺诈

put("fraudAccess",

new HashMap<String, Function>() {{

put("pass", (obj) -> "fraudReceiver");

}});

// 担保策略

put("guaPloyAccess",

new HashMap<String, Function>() {{

put("pass", (obj) -> "guaranteeReceive");

}});

// 资方策略

put("tenantAccess",

new HashMap<String, Function>() {{

put("pass", (obj) -> "guaranteeQuery");

}});

// 额度匹配(决策、电审、提额、担保)

put("matchAccess",

new HashMap<String, Function>() {{

put("pass", (obj) -> "quotaMatch");

put("reject", (obj) -> "allSceneFinished");

put("cancel", (obj) -> "allSceneFinished");

}});

}};

/**

* 从mq接入流程内

*

* @param access

* @param reqParam

* @param type

*/

@Override

public void access(FlowAccess access, String type, ReqParam reqParam) {

String funcKey = access.getBeanName();

Function func = null;

if (accessMap.containsKey(funcKey)) {

func = accessMap.get(funcKey).get(type);

} else if (access instanceof MatchQuotaAccess) {

func = accessMap.get("matchAccess").get(type);

}

Assert.notNull(func, access.getBeanName() + "func should not null");

pipeLine.access(funcKey, func, reqParam, getDataUtil());

}

接入的类调用自身所属于的flow流程内的access方法,根据type类型pass/reject/cancel获取不同的条件语句;

调用pipeline中的access方法;组装接入节点DefaultHandlerContext;调用DefaultHandlerContext的execute方法;

因为接入节点内的handler是null,所以只需要执行接入节点中的条件语句获取到next节点的beanName,并通过beanName获取到该flow中的DefaultHandlerContext;由此接入流程内;

以上是系统重构后的状态,基本能满足当前业务,并支持一定程度上的扩展;

-------------------------- 问题点 ---------------------------

在重构的过程中遇到的问题,因为每个功能handler bean内都有个属性beanName;是通过实现BeanPostProcess方式进行创建代理注入的;

    @Override

public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {

if (bean.getClass().isAnnotationPresent(Handler.class)) {

Object proxy = new Proxy(bean, beanName).proxy();

HandlerUtil.processHandlerMap.put(beanName, (ProcessHandler) proxy);

return proxy;

}

if (bean.getClass().isAnnotationPresent(Access.class)) {

Object proxy = new Proxy(bean, beanName).proxy();

HandlerUtil.accessMap.put(beanName, (FlowAccess) proxy);

return proxy;

}

return bean;

}

但在baseFlow实现InitializingBean接口中初始化HandlerPipeLine时,需通过beanName获取相应的handler代理类进行组装DefaultHandlerContext;所以在此之前需完成所有实现ProcessHandler接口的功能类实例化,并包含相应的(beanName, handlerProxy)集合;

之前通过在BaseFlow中@Resource注入了head和terminated功能节点实例化HandlerPipeLine;所以会优先初始化head和terminated,创建代理类;但并不能保证全量ProcessHandler接口实现类都已经初始化;这样会导致在组装DefaultHandlerContext时,handler是null;整个链条是无效的;

现在只是偷巧的解决;通过@Autowired方式去注入head和terminated;因为用的是类型注入,所以上下文会保证执行baseflow中的afterPropertiesSet方法时,将所有的processHandler接口实现类都初始化,以此解决上述问题;但并没有从spring加载的链路上去做根本解决;

2.在DefaultHandlerContext定义的泛型是和所包含的handler一致的;导致在扩展的时候出现一定的局限性,后续会想比较好的方式消除;

以上是 系统设计(2) 的全部内容, 来源链接: utcz.com/z/512897.html

回到顶部