聊聊canal的ApplicationConfigMonitor

编程

ApplicationConfigMonitor

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationConfigMonitor.java

@Component

public class ApplicationConfigMonitor {

private static final Logger logger = LoggerFactory.getLogger(ApplicationConfigMonitor.class);

@Resource

private ContextRefresher contextRefresher;

@Resource

private CanalAdapterService canalAdapterService;

private FileAlterationMonitor fileMonitor;

@PostConstruct

public void init() {

File confDir = Util.getConfDirPath();

try {

FileAlterationObserver observer = new FileAlterationObserver(confDir,

FileFilterUtils.and(FileFilterUtils.fileFileFilter(),

FileFilterUtils.prefixFileFilter("application"),

FileFilterUtils.suffixFileFilter("yml")));

FileListener listener = new FileListener();

observer.addListener(listener);

fileMonitor = new FileAlterationMonitor(3000, observer);

fileMonitor.start();

} catch (Exception e) {

logger.error(e.getMessage(), e);

}

}

@PreDestroy

public void destroy() {

try {

fileMonitor.stop();

} catch (Exception e) {

logger.error(e.getMessage(), e);

}

}

private class FileListener extends FileAlterationListenerAdaptor {

@Override

public void onFileChange(File file) {

super.onFileChange(file);

try {

// 检查yml格式

new Yaml().loadAs(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8), Map.class);

canalAdapterService.destroy();

// refresh context

contextRefresher.refresh();

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

// ignore

}

canalAdapterService.init();

logger.info("## adapter application config reloaded.");

} catch (Exception e) {

logger.error(e.getMessage(), e);

}

}

}

}

  • ApplicationConfigMonitor在PostConstruct的时候创建FileAlterationObserver,添加FileListener,然后使用FileAlterationObserver创建FileAlterationMonitor,然后执行其start方法;在PreDestroy的时候执行fileMonitor.stop();FileListener继承了FileAlterationListenerAdaptor,其onFileChange方法会检查yml格式,执行canalAdapterService.destroy()、contextRefresher.refresh()、canalAdapterService.init()

FileAlterationMonitor

commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationMonitor.java

public final class FileAlterationMonitor implements Runnable {

private final long interval;

private final List<FileAlterationObserver> observers = new CopyOnWriteArrayList<FileAlterationObserver>();

private Thread thread = null;

private ThreadFactory threadFactory;

private volatile boolean running = false;

public FileAlterationMonitor() {

this(10000);

}

public FileAlterationMonitor(long interval) {

this.interval = interval;

}

public FileAlterationMonitor(long interval, FileAlterationObserver... observers) {

this(interval);

if (observers != null) {

for (FileAlterationObserver observer : observers) {

addObserver(observer);

}

}

}

//......

public synchronized void start() throws Exception {

if (running) {

throw new IllegalStateException("Monitor is already running");

}

for (FileAlterationObserver observer : observers) {

observer.initialize();

}

running = true;

if (threadFactory != null) {

thread = threadFactory.newThread(this);

} else {

thread = new Thread(this);

}

thread.start();

}

/**

* Stop monitoring.

*

* @throws Exception if an error occurs initializing the observer

*/

public synchronized void stop() throws Exception {

stop(interval);

}

/**

* Stop monitoring.

*

* @param stopInterval the amount of time in milliseconds to wait for the thread to finish.

* A value of zero will wait until the thread is finished (see {@link Thread#join(long)}).

* @throws Exception if an error occurs initializing the observer

* @since 2.1

*/

public synchronized void stop(long stopInterval) throws Exception {

if (running == false) {

throw new IllegalStateException("Monitor is not running");

}

running = false;

try {

thread.join(stopInterval);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

for (FileAlterationObserver observer : observers) {

observer.destroy();

}

}

public void run() {

while (running) {

for (FileAlterationObserver observer : observers) {

observer.checkAndNotify();

}

if (!running) {

break;

}

try {

Thread.sleep(interval);

} catch (final InterruptedException ignored) {

}

}

}

}

  • FileAlterationMonitor的start方法会使用自己的runnable创建Thread,然后执行thread.start();其stop方法则执行thread.join(stopInterval),然后遍历observers,执行observer.destroy();其run方法则遍历observers,执行observer.checkAndNotify()

FileAlterationObserver

commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationObserver.java

public class FileAlterationObserver implements Serializable {

private final List<FileAlterationListener> listeners = new CopyOnWriteArrayList<FileAlterationListener>();

private final FileEntry rootEntry;

private final FileFilter fileFilter;

private final Comparator<File> comparator;

//......

public void checkAndNotify() {

/* fire onStart() */

for (FileAlterationListener listener : listeners) {

listener.onStart(this);

}

/* fire directory/file events */

File rootFile = rootEntry.getFile();

if (rootFile.exists()) {

checkAndNotify(rootEntry, rootEntry.getChildren(), listFiles(rootFile));

} else if (rootEntry.isExists()) {

checkAndNotify(rootEntry, rootEntry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);

} else {

// Didn"t exist and still doesn"t

}

/* fire onStop() */

for (FileAlterationListener listener : listeners) {

listener.onStop(this);

}

}

private void checkAndNotify(FileEntry parent, FileEntry[] previous, File[] files) {

int c = 0;

FileEntry[] current = files.length > 0 ? new FileEntry[files.length] : FileEntry.EMPTY_ENTRIES;

for (FileEntry entry : previous) {

while (c < files.length && comparator.compare(entry.getFile(), files[c]) > 0) {

current[c] = createFileEntry(parent, files[c]);

doCreate(current[c]);

c++;

}

if (c < files.length && comparator.compare(entry.getFile(), files[c]) == 0) {

doMatch(entry, files[c]);

checkAndNotify(entry, entry.getChildren(), listFiles(files[c]));

current[c] = entry;

c++;

} else {

checkAndNotify(entry, entry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);

doDelete(entry);

}

}

for (; c < files.length; c++) {

current[c] = createFileEntry(parent, files[c]);

doCreate(current[c]);

}

parent.setChildren(current);

}

//......

}

  • FileAlterationObserver的checkAndNotify方法会遍历之前的FileEntry,然后使用NameFileComparator递归遍历对比文件变化,分别触发doCreate、doMatch、doDelete,他们会回调FileAlterationListener的对应方法

小结

ApplicationConfigMonitor在PostConstruct的时候创建FileAlterationObserver,添加FileListener,然后使用FileAlterationObserver创建FileAlterationMonitor,然后执行其start方法;在PreDestroy的时候执行fileMonitor.stop();FileListener继承了FileAlterationListenerAdaptor,其onFileChange方法会检查yml格式,执行canalAdapterService.destroy()、contextRefresher.refresh()、canalAdapterService.init()

doc

  • ApplicationConfigMonitor

以上是 聊聊canal的ApplicationConfigMonitor 的全部内容, 来源链接: utcz.com/z/515074.html

回到顶部