聊聊OtterController

编程

本文主要研究一下OtterController

OtterController

otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java

/**
 * Node-side controller that registers itself as a {@code NodeTaskListener} and keeps, per
 * pipeline, the S.E.T.L stage tasks it has been asked to run.
 */
public class OtterController implements NodeTaskListener, OtterControllerMBean {

    private static final Logger logger = LoggerFactory.getLogger(OtterController.class);

    // Outer key is the pipelineId, inner key is the S.E.T.L stage type.
    // The inner map is produced by the function below, one per pipelineId.
    private Map<Long, Map<StageType, GlobalTask>> controllers = OtterMigrateMap.makeComputingMap(new Function<Long, Map<StageType, GlobalTask>>() {

        public Map<StageType, GlobalTask> apply(Long pipelineId) {
            return new MapMaker().makeMap();
        }
    });

    private ConfigClientService       configClientService;
    private ArbitrateManageService    arbitrateManageService;
    private NodeTaskService           nodeTaskService;

    // Resource management
    private DataSourceService         dataSourceService;         // connection pool resources
    private DbDialectFactory          dbDialectFactory;          // database information resources
    private ArbitrateEventService     arbitrateEventService;     // arbitrator resources
    private ExecutorService           executorService;
    private StageAggregationCollector stageAggregationCollector;

    /**
     * Initializes the node id and then registers this controller as a NodeTask listener.
     */
    public void start() throws Throwable {
        // initialize the node
        initNid();
        // register this controller as a NodeTask listener
        nodeTaskService.addListener(this);
    }

    /**
     * Shuts down every task held in {@code controllers} and then tears down the node-level
     * resources in a fixed order: node event, arbitrate tool event, node task service, spring
     * context, zookeeper client.
     *
     * <p>Every step except the final zookeeper one is guarded individually: an
     * {@link Exception} is logged and the remaining steps still run.
     */
    public void stop() throws Throwable {
        for (Map<StageType, GlobalTask> pipelineTasks : controllers.values()) {
            for (GlobalTask stageTask : pipelineTasks.values()) {
                try {
                    stageTask.shutdown();
                } catch (Exception e) {
                    logger.error("##shutdown task error!", e);
                }
            }
        }

        try {
            Long currentNodeId = configClientService.currentNode().getId();
            arbitrateManageService.nodeEvent().destory(Long.valueOf(currentNodeId));
        } catch (Exception e) {
            logger.error("##destory node error!", e);
        }

        try {
            arbitrateEventService.toolEvent().release();
        } catch (Exception e) {
            logger.error("##destory arbitrate error!", e);
        }

        try {
            // notify the manager to stop the current node
            nodeTaskService.stopNode();
        } catch (Exception e) {
            logger.error("##stop node error!", e);
        }

        try {
            OtterContextLocator.close();
        } catch (Exception e) {
            logger.error("##cloes spring error!", e);
        }

        // close zookeeper; this call is not guarded, so a failure propagates to the caller
        ZooKeeperClient.destory();
    }

    //......

}

  • OtterController实现了NodeTaskListener接口,提供了start、stop、process方法;其start方法主要是执行initNid及nodeTaskService.addListener(this);其stop方法则遍历controllers的GlobalTask,挨个执行其shutdown方法,然后执行arbitrateManageService.nodeEvent().destory()、arbitrateEventService.toolEvent().release()、nodeTaskService.stopNode()、OtterContextLocator.close()及ZooKeeperClient.destory();process方法主要是遍历nodeTasks,挨个执行stopPipeline或者startPipeline

initNid

otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java

    /**
     * Reads the node id from the system property named by {@code OtterConstants.NID_NAME},
     * validates it, initializes the node event with it, and registers a session-expired
     * handler with the zookeeper client.
     *
     * @throws ConfigException if the system property is empty
     */
    private void initNid() {
        // read the nid system property
        final String nid = System.getProperty(OtterConstants.NID_NAME);
        if (StringUtils.isEmpty(nid)) {
            throw new ConfigException("nid is not set!");
        }

        logger.info("INFO ## the nodeId = {}", nid);
        checkNidVaild(nid);
        arbitrateManageService.nodeEvent().init(Long.valueOf(nid));

        // add session-expired handling
        NodeSessionExpired expiredHandler = new NodeSessionExpired();
        expiredHandler.setNodeEvent(arbitrateManageService.nodeEvent());
        ZooKeeperClient.registerNotification(expiredHandler);
    }

  • initNid方法主要执行arbitrateManageService.nodeEvent().init(Long.valueOf(nid))及ZooKeeperClient.registerNotification(sessionExpired)

startPipeline

otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java

    /**
     * Applies the stage commands carried by a node task to its pipeline.
     *
     * <p>The pipeline's resources are released first (see {@code releasePipeline}), then the
     * pipeline's task map is fetched from {@code controllers}. Each stage is paired with the
     * event at the same index: a create event starts the stage task, any other event stops it.
     *
     * @param nodeTask the task command; its event list is read at every index of its stage
     *                 list, so it is assumed to hold at least as many entries
     */
    public void startPipeline(NodeTask nodeTask) {
        Pipeline pipeline = nodeTask.getPipeline();
        Long pipelineId = pipeline.getId();

        releasePipeline(pipelineId);
        Map<StageType, GlobalTask> pipelineTasks = controllers.get(pipelineId);

        // handle the individual task commands
        List<StageType> stages = nodeTask.getStage();
        List<TaskEvent> events = nodeTask.getEvent();
        for (int index = 0; index < stages.size(); index++) {
            StageType stageType = stages.get(index);
            TaskEvent command = events.get(index);
            if (!command.isCreate()) {
                stopTask(pipelineTasks, stageType);
                continue;
            }

            startTask(pipeline, pipelineTasks, stageType);
        }
    }

/**
 * Creates, autowires and starts the task for one stage of a pipeline, then records it in
 * {@code tasks} under that stage type.
 *
 * <p>If {@code tasks} already holds an alive task for the stage, a warning is logged and
 * nothing else happens. Starting a second task would overwrite the map entry and leave the
 * first task running with no reference left to shut it down.
 *
 * <p>If the stage type is none of select/extract/transform/load, nothing is started.
 *
 * @param pipeline the pipeline whose id is handed to the new task
 * @param tasks    the pipeline's stage-to-task map; updated in place
 * @param taskType the stage to start
 */
private void startTask(Pipeline pipeline, Map<StageType, GlobalTask> tasks, StageType taskType) {
    // Read the map once so the null check and the alive check see the same task.
    GlobalTask existing = tasks.get(taskType);
    if (existing != null && existing.isAlive()) {
        logger.warn("WARN ## this task = {} has started", taskType);
        return;
    }

    GlobalTask task = null;
    if (taskType.isSelect()) {
        task = new SelectTask(pipeline.getId());
    } else if (taskType.isExtract()) {
        task = new ExtractTask(pipeline.getId());
    } else if (taskType.isTransform()) {
        task = new TransformTask(pipeline.getId());
    } else if (taskType.isLoad()) {
        task = new LoadTask(pipeline.getId());
    }

    if (task != null) {
        OtterContextLocator.autowire(task); // inject the spring-managed resources
        task.start();
        tasks.put(taskType, task);
        logger.info("INFO ## start this task = {} success", taskType.toString());
    }
}

/**
 * Removes the task registered for {@code taskType} from {@code tasks} and shuts it down.
 * When no task is registered for the stage, only an info message is logged.
 *
 * @param tasks    the pipeline's stage-to-task map; the entry is removed in place
 * @param taskType the stage to stop
 */
private void stopTask(Map<StageType, GlobalTask> tasks, StageType taskType) {
    GlobalTask removed = tasks.remove(taskType);
    if (removed == null) {
        logger.info("INFo ## taskName = {} is not started", taskType);
        return;
    }

    removed.shutdown();
    logger.info("INFO ## taskName = {} has shutdown", taskType);
}

  • startPipeline方法先执行releasePipeline,然后遍历指定pipeline的tasks,挨个执行startTask或者stopTask;startTask方法根据taskType的类型创建不同类型的GlobalTask,然后执行task.start();stopTask则从tasks移除指定taskType的GlobalTask,然后执行task的shutdown方法

stopPipeline

otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java

    /**
     * Shuts down every stage task of a pipeline, empties {@code tasks}, waits briefly, and then
     * releases the pipeline's resources and its arbitrate tool event.
     *
     * <p>A failure while shutting down one task is logged and does not stop the remaining tasks
     * from being shut down. If the wait is interrupted, the cleanup still runs and the thread's
     * interrupt flag is restored afterwards.
     *
     * @param pipelineId the pipeline being stopped
     * @param tasks      the pipeline's stage-to-task map; every entry is removed
     */
    private void stopPipeline(Long pipelineId, Map<StageType, GlobalTask> tasks) {
        // Walk an explicit iterator so each entry can be dropped once its task has been handled.
        // Map.remove(Object) expects a key, so passing the task (a value) removes nothing.
        java.util.Iterator<GlobalTask> remaining = tasks.values().iterator();
        while (remaining.hasNext()) {
            GlobalTask task = remaining.next();
            try {
                task.shutdown();
            } catch (Exception e) {
                logger.error("## stop s/e/t/l task error!", e);
            } finally {
                remaining.remove();
            }
        }

        // close other resources.
        boolean interrupted = false;
        try {
            Thread.sleep(1 * 1000); // sleep 1s, giving the S.E.T.L tasks time to release their threads
        } catch (InterruptedException e) {
            logger.error("ERROR ## ", e);
            // Remember the interrupt; it is restored only after cleanup so the release calls
            // below do not run with the flag already set.
            interrupted = true;
        }

        try {
            // release resources
            releasePipeline(pipelineId);
            arbitrateEventService.toolEvent().release(pipelineId);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

/**
 * Releases the per-pipeline resources held by this node: first the data source service's
 * entry for the pipeline, then the db dialect factory's entry.
 *
 * @param pipelineId the pipeline whose resources are released
 */
private void releasePipeline(Long pipelineId) {
    final Long targetPipelineId = pipelineId;
    dataSourceService.destroy(targetPipelineId);
    dbDialectFactory.destory(targetPipelineId);
}

  • stopPipeline方法则遍历tasks挨个执行task.shutdown()及tasks.remove(task),最后执行releasePipeline及arbitrateEventService.toolEvent().release(pipelineId)

小结

OtterController实现了NodeTaskListener接口,提供了start、stop、process方法;其start方法主要是执行initNid及nodeTaskService.addListener(this);其stop方法则遍历controllers的GlobalTask,挨个执行其shutdown方法,然后执行arbitrateManageService.nodeEvent().destory()、arbitrateEventService.toolEvent().release()、nodeTaskService.stopNode()、OtterContextLocator.close()及ZooKeeperClient.destory();process方法主要是遍历nodeTasks,挨个执行stopPipeline或者startPipeline

doc

  • OtterController

以上是 聊聊OtterController 的全部内容, 来源链接: utcz.com/z/517315.html

回到顶部