聊聊carrera的GroovyScriptAction

编程

本文主要研究一下carrera的GroovyScriptAction

Action

DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/Action.java

public interface Action {

enum Status {

FAIL, CONTINUE, FINISH, ASYNCHRONIZED

}

class UnsupportedDataType extends RuntimeException {

}

default Status act(UpstreamJob job) {

Object data = job.getData();

if (data instanceof byte[]) {

return act(job, (byte[]) data);

} else if (data instanceof JSONObject) {

return act(job, (JSONObject) data);

} else {

throw new UnsupportedDataType();

}

}

default Status act(UpstreamJob job, byte[] bytes) {

throw new UnsupportedDataType();

}

default Status act(UpstreamJob job, JSONObject jsonObject) {

throw new UnsupportedDataType();

}

default void shutdown() {

// DO NOTHING BY DEFAULT

}

default void logMetrics() {

// DO NOTHING BY DEFAULT

}

}

  • Action接口定义了Status枚举,也定义了act、shutdown、logMetrics方法

GroovyScriptAction

DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/GroovyScriptAction.java

public class GroovyScriptAction implements Action {

private final static String CARRERA_GROOVY_CONTEXT = "carreraContext";

@SuppressWarnings("rawtypes")

private final static LoadingCache<String, Class> cache = CacheBuilder

.newBuilder()

.expireAfterAccess(1, TimeUnit.HOURS)

.build(new CacheLoader<String, Class>() {

private final AtomicLong al = new AtomicLong(0);

@Override

public Class load(String key) throws Exception {

try (GroovyClassLoader groovyLoader = new GroovyClassLoader()) {

GroovyCodeSource gcs = AccessController.doPrivileged((PrivilegedAction<GroovyCodeSource>) () -> new GroovyCodeSource(key, "Script" + al.getAndIncrement() + ".groovy", "/groovy/shell"));

Class clazz = groovyLoader.parseClass(gcs, false);

return clazz;

} catch (Throwable e) {

LogUtils.logErrorInfo("GroovyScript_error", "[GroovyErr]", e);

return null;

}

}

});

@Override

public Status act(UpstreamJob job, JSONObject jsonObject) {

String groovyText = job.getUpstreamTopic().getGroovyScript();

if (StringUtils.isBlank(groovyText)) {

return Status.FINISH;

}

try {

@SuppressWarnings("rawtypes")

Class groovyScript = cache.get(groovyText);

if (groovyScript == null) {

MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID);

return Status.FINISH;

}

jsonObject.put(CARRERA_GROOVY_CONTEXT, new GroovyContext(job));

Script script = InvokerHelper.createScript(groovyScript, new Binding(jsonObject));

Object scriptRet = script.run();

if (scriptRet instanceof Boolean) {

if ((Boolean) scriptRet) {

jsonObject.remove(CARRERA_GROOVY_CONTEXT);

return Status.CONTINUE;

}

}

} catch (MissingPropertyException e) {

LogUtils.logErrorInfo("GroovyScript_error", "missing property exception, jsonObject:{}, job:{}, e.msg:{}",

JsonUtils.toJsonString(jsonObject), job.info(), e.getMessageWithoutLocationText());

} catch (Throwable e) {

LogUtils.logErrorInfo("GroovyScript_error", "error when running groovy script, job={}, e={}", job, e.getMessage());

}

MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID);

return Status.FINISH;

}

}

  • GroovyScriptAction实现了Action接口,它使用guava的LoadingCache定义了groovy class的缓存,其CacheLoader的load方法会创建GroovyClassLoader,然后解析指定GroovyCodeSource的class;其act方法从job.getUpstreamTopic().getGroovyScript()获取groovyText,然后再根据groovyText从cache获取指定的Class,之后通过InvokerHelper.createScript(groovyScript, new Binding(jsonObject))创建Script,然后执行script.run()获取返回值

小结

GroovyScriptAction实现了Action接口,它使用guava的LoadingCache定义了groovy class的缓存,其CacheLoader的load方法会创建GroovyClassLoader,然后解析指定GroovyCodeSource的class;其act方法从job.getUpstreamTopic().getGroovyScript()获取groovyText,然后再根据groovyText从cache获取指定的Class,之后通过InvokerHelper.createScript(groovyScript, new Binding(jsonObject))创建Script,然后执行script.run()获取返回值

doc

  • GroovyScriptAction

以上是 聊聊carrera的GroovyScriptAction 的全部内容, 来源链接: utcz.com/z/512624.html

回到顶部