聊聊debezium的ElapsedTimeStrategy

编程

ElapsedTimeStrategy

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

/**
 * Contract for answering the question "has the configured time period elapsed?".
 * It declares a single abstract method, {@link #hasElapsed()}, so it can be implemented with a lambda.
 */
@FunctionalInterface

public interface ElapsedTimeStrategy {

/**
 * Determine if the time period has elapsed since this method was last called.
 *
 * @return {@code true} if the time period has elapsed, or {@code false} if it has not.
 *         NOTE(review): the earlier wording described the result in terms of the calling thread
 *         sleeping; the strategies shown together with this interface only compare timestamps and
 *         never sleep -- confirm whether any other implementation does.
 */

boolean hasElapsed();

}

  • ElapsedTimeStrategy定义了hasElapsed方法

none

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

	/**
	 * Create a strategy that imposes no waiting period at all.
	 *
	 * @return a strategy whose {@code hasElapsed()} answers {@code true} on every call
	 */
	public static ElapsedTimeStrategy none() {
	    final ElapsedTimeStrategy alwaysElapsed = () -> true;
	    return alwaysElapsed;
	}

  • none方法针对hasElapsed始终返回true

constant

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

    /**
     * Create a strategy whose period has a fixed length.
     * <p>
     * The first call to {@code hasElapsed()} schedules the first deadline one delay after the clock's
     * current time and reports {@code true}. Later calls report {@code true} only once the clock has
     * reached the deadline, at which point the deadline is moved forward by a whole number of delays
     * so that it lies after the current time.
     *
     * @param clock the clock used to read the current time in milliseconds
     * @param delayInMilliseconds the length of each period; must be positive
     * @return the strategy
     * @throws IllegalArgumentException if {@code delayInMilliseconds} is zero or negative
     */
    public static ElapsedTimeStrategy constant(Clock clock, long delayInMilliseconds) {
        if (delayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Initial delay must be positive");
        }
        return new ElapsedTimeStrategy() {
            // 0 doubles as the "not yet initialized" marker.
            private long deadline = 0L;

            @Override
            public boolean hasElapsed() {
                final boolean firstCall = deadline == 0L;
                if (firstCall) {
                    deadline = clock.currentTimeInMillis() + delayInMilliseconds;
                    return true;
                }

                final long now = clock.currentTimeInMillis();
                if (now < deadline) {
                    // Still inside the current period.
                    return false;
                }

                // Skip every period that has already gone by, plus one more.
                do {
                    final long periodsBehind = (now - deadline) / delayInMilliseconds;
                    deadline += (periodsBehind + 1) * delayInMilliseconds;
                } while (now > deadline);
                return true;
            }
        };
    }

  • constant接收clock及delayInMilliseconds参数;其hasElapsed方法在首次调用时将nextTimestamp初始化为当前时间加上delayInMilliseconds并返回true,之后在current小于nextTimestamp时返回false,否则按delayInMilliseconds的整数倍推进nextTimestamp(使其大于current)并返回true

step

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

    /**
     * Create a strategy that uses one delay until a step function reports {@code true} and a second
     * delay from then on.
     * <p>
     * The first call to {@code hasElapsed()} consults the step function, picks the matching delay,
     * schedules the first deadline and reports {@code true}. On later calls the step function is
     * consulted again only while it has not yet reported {@code true}; once it has, the post-step
     * delay is used for every subsequent advance of the deadline. A deadline that was already
     * scheduled is not recomputed when the delay switches.
     *
     * @param clock the clock used to read the current time in milliseconds
     * @param preStepDelayInMilliseconds the delay used while the step function reports {@code false};
     *            must be positive
     * @param stepFunction the function that signals when to switch to the post-step delay
     * @param postStepDelayInMilliseconds the delay used once the step function has reported
     *            {@code true}; must be positive
     * @return the strategy
     * @throws IllegalArgumentException if either delay is zero or negative
     */
    public static ElapsedTimeStrategy step(Clock clock,
                                           long preStepDelayInMilliseconds,
                                           BooleanSupplier stepFunction,
                                           long postStepDelayInMilliseconds) {
        if (preStepDelayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Pre-step delay must be positive");
        }
        if (postStepDelayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Post-step delay must be positive");
        }
        return new ElapsedTimeStrategy() {
            // 0 doubles as the "not yet initialized" marker.
            private long deadline = 0L;
            private boolean stepReached = false;
            private long activeDelay = 0L;

            @Override
            public boolean hasElapsed() {
                if (deadline == 0L) {
                    // First call: choose the delay that matches the step function's current answer.
                    stepReached = stepFunction.getAsBoolean();
                    if (stepReached) {
                        activeDelay = postStepDelayInMilliseconds;
                    }
                    else {
                        activeDelay = preStepDelayInMilliseconds;
                    }
                    deadline = clock.currentTimeInMillis() + activeDelay;
                    return true;
                }

                // Keep polling the step function only until it flips to true.
                if (!stepReached && stepFunction.getAsBoolean()) {
                    stepReached = true;
                    activeDelay = postStepDelayInMilliseconds;
                }

                final long now = clock.currentTimeInMillis();
                if (now < deadline) {
                    return false;
                }

                // Skip every period that has already gone by, plus one more.
                do {
                    assert activeDelay > 0;
                    final long periodsBehind = (now - deadline) / activeDelay;
                    deadline += (periodsBehind + 1) * activeDelay;
                } while (deadline <= now);
                return true;
            }
        };
    }

  • step接收stepFunction方法,其hasElapsed方法的初始nextTimestamp为clock.currentTimeInMillis() + delta,在elapsed为false时通过stepFunction设置elapsed,如果为true则更新delta为postStepDelayInMilliseconds,之后在current小于nextTimestamp返回false,否则更新nextTimestamp,返回true

linear

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

    /**
     * Create a strategy whose periods grow linearly.
     * <p>
     * The first call to {@code hasElapsed()} schedules the first deadline one delay after the clock's
     * current time and reports {@code true}. Each time the deadline is subsequently advanced, an
     * internal counter is incremented (it stops growing at {@code Long.MAX_VALUE}) and the deadline
     * moves forward by {@code delayInMilliseconds * counter}, so the steps are 2x, 3x, 4x ... the
     * delay. The advance repeats until the deadline lies after the current time.
     *
     * @param clock the clock used to read the current time in milliseconds
     * @param delayInMilliseconds the base delay; must be positive
     * @return the strategy
     * @throws IllegalArgumentException if {@code delayInMilliseconds} is zero or negative
     */
    public static ElapsedTimeStrategy linear(Clock clock, long delayInMilliseconds) {
        if (delayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Initial delay must be positive");
        }
        return new ElapsedTimeStrategy() {
            // 0 doubles as the "not yet initialized" marker.
            private long deadline = 0L;
            private long factor = 1L;

            @Override
            public boolean hasElapsed() {
                if (deadline == 0L) {
                    // First call: schedule the first deadline and reset the growth factor.
                    deadline = clock.currentTimeInMillis() + delayInMilliseconds;
                    factor = 1L;
                    return true;
                }

                final long now = clock.currentTimeInMillis();
                if (now < deadline) {
                    return false;
                }

                do {
                    // Grow the factor, but never wrap around.
                    if (factor != Long.MAX_VALUE) {
                        factor++;
                    }
                    final long step = delayInMilliseconds * factor;
                    deadline += step;
                } while (deadline <= now);
                return true;
            }
        };
    }

  • linear通过delayInMilliseconds * counter来递增nextTimestamp

exponential

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

    /**
     * Create a strategy whose periods grow exponentially up to a maximum.
     * <p>
     * The first call to {@code hasElapsed()} schedules the first deadline one initial delay after the
     * clock's current time and reports {@code true}. Each time the deadline is subsequently advanced,
     * the previous delay is multiplied by {@code multiplier} (truncated to a {@code long}); once that
     * product reaches {@code maxDelayInMilliseconds} the delay is pinned to the maximum. The advance
     * repeats until the deadline lies after the current time.
     *
     * @param clock the clock used to read the current time in milliseconds
     * @param initialDelayInMilliseconds the first delay; must be positive and less than the maximum
     * @param maxDelayInMilliseconds the upper bound for the delay
     * @param multiplier the growth factor applied to the previous delay; must be greater than 1
     * @return the strategy
     * @throws IllegalArgumentException if the multiplier is not greater than 1, the initial delay is
     *             zero or negative, or the initial delay is not less than the maximum delay
     */
    public static ElapsedTimeStrategy exponential(Clock clock,
                                                  long initialDelayInMilliseconds,
                                                  long maxDelayInMilliseconds,
                                                  double multiplier) {
        if (multiplier <= 1.0) {
            throw new IllegalArgumentException("Multiplier must be greater than 1");
        }
        if (initialDelayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Initial delay must be positive");
        }
        if (initialDelayInMilliseconds >= maxDelayInMilliseconds) {
            throw new IllegalArgumentException("Maximum delay must be greater than initial delay");
        }
        return new ElapsedTimeStrategy() {
            // 0 doubles as the "not yet initialized" marker.
            private long deadline = 0L;
            private long lastDelay = 0L;

            @Override
            public boolean hasElapsed() {
                if (deadline == 0L) {
                    // First call: start with the initial delay.
                    deadline = clock.currentTimeInMillis() + initialDelayInMilliseconds;
                    lastDelay = initialDelayInMilliseconds;
                    return true;
                }

                final long now = clock.currentTimeInMillis();
                if (now < deadline) {
                    return false;
                }

                do {
                    // Compute how long to delay ...
                    final long grownDelay = (long) (lastDelay * multiplier);
                    if (grownDelay < maxDelayInMilliseconds) {
                        lastDelay = grownDelay;
                    }
                    else {
                        lastDelay = maxDelayInMilliseconds;
                        // If we're not there yet, then we know the increment is linear from here,
                        // so jump over all the whole max-length periods in one go. Note that the
                        // max delay is then added once more by the statement after this branch.
                        if (deadline < now) {
                            final long periodsToSkip = 1 + (now - deadline) / maxDelayInMilliseconds;
                            deadline += periodsToSkip * maxDelayInMilliseconds;
                        }
                    }
                    deadline += lastDelay;
                } while (deadline <= now);
                return true;
            }
        };
    }

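  • exponential每次到期时通过previousDelay * multiplier计算下一次的延迟并累加到nextTimestamp上;当计算出的延迟达到maxDelayInMilliseconds后,延迟固定为maxDelayInMilliseconds,此时若nextTimestamp仍小于current,则先通过multiple * maxDelayInMilliseconds一次性跳过已经过去的周期,再加上maxDelayInMilliseconds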

小结

ElapsedTimeStrategy定义了hasElapsed方法,它提供了none、constant、step、linear、exponential这几种实现

doc

  • ElapsedTimeStrategy

以上是 聊聊debezium的ElapsedTimeStrategy 的全部内容, 来源链接: utcz.com/z/516687.html

回到顶部