




public interface ElapsedTimeStrategy {


* Determine if the time period has elapsed since this method was last called.


* @return {@code true} if this invocation caused the thread to sleep, or {@code false} if this method did not sleep


boolean hasElapsed();


  • ElapsedTimeStrategy定义了hasElapsed方法



	public static ElapsedTimeStrategy none() {

return () -> true;


  • none方法针对hasElapsed始终返回true



    public static ElapsedTimeStrategy constant(Clock clock, long delayInMilliseconds) {

if (delayInMilliseconds <= 0) {

throw new IllegalArgumentException("Initial delay must be positive");


return new ElapsedTimeStrategy() {

private long nextTimestamp = 0L;


public boolean hasElapsed() {

if (nextTimestamp == 0L) {

// Initialize ...

nextTimestamp = clock.currentTimeInMillis() + delayInMilliseconds;

return true;


long current = clock.currentTimeInMillis();

if (current >= nextTimestamp) {

do {

long multiple = 1 + (current - nextTimestamp) / delayInMilliseconds;

nextTimestamp += multiple * delayInMilliseconds;

} while (current > nextTimestamp);

return true;


return false;




  • constant接收clock及delayInMilliseconds参数;其hasElapsed方法在首次调用时将nextTimestamp初始化为当前时间加delayInMilliseconds并返回true,之后在current小于nextTimestamp时返回false,否则将nextTimestamp按delayInMilliseconds的整数倍推进到current之后并返回true



    public static ElapsedTimeStrategy step(Clock clock,

long preStepDelayInMilliseconds,

BooleanSupplier stepFunction,

long postStepDelayInMilliseconds) {

if (preStepDelayInMilliseconds <= 0) {

throw new IllegalArgumentException("Pre-step delay must be positive");


if (postStepDelayInMilliseconds <= 0) {

throw new IllegalArgumentException("Post-step delay must be positive");


return new ElapsedTimeStrategy() {

private long nextTimestamp = 0L;

private boolean elapsed = false;

private long delta = 0L;


public boolean hasElapsed() {

if (nextTimestamp == 0L) {

// Initialize ...

elapsed = stepFunction.getAsBoolean();

delta = elapsed ? postStepDelayInMilliseconds : preStepDelayInMilliseconds;

nextTimestamp = clock.currentTimeInMillis() + delta;

return true;


if (!elapsed) {

elapsed = stepFunction.getAsBoolean();

if (elapsed) {

delta = postStepDelayInMilliseconds;



long current = clock.currentTimeInMillis();

if (current >= nextTimestamp) {

do {

assert delta > 0;

long multiple = 1 + (current - nextTimestamp) / delta;

nextTimestamp += multiple * delta;

} while (nextTimestamp <= current);

return true;


return false;




  • step接收stepFunction方法,其hasElapsed方法的初始nextTimestamp为clock.currentTimeInMillis() + delta,在elapsed为false时通过stepFunction设置elapsed,如果为true则更新delta为postStepDelayInMilliseconds,之后在current小于nextTimestamp返回false,否则更新nextTimestamp,返回true



    public static ElapsedTimeStrategy linear(Clock clock, long delayInMilliseconds) {

if (delayInMilliseconds <= 0) {

throw new IllegalArgumentException("Initial delay must be positive");


return new ElapsedTimeStrategy() {

private long nextTimestamp = 0L;

private long counter = 1L;


public boolean hasElapsed() {

if (nextTimestamp == 0L) {

// Initialize ...

nextTimestamp = clock.currentTimeInMillis() + delayInMilliseconds;

counter = 1L;

return true;


long current = clock.currentTimeInMillis();

if (current >= nextTimestamp) {

do {

if (counter < Long.MAX_VALUE) {



nextTimestamp += (delayInMilliseconds * counter);

} while (nextTimestamp <= current);

return true;


return false;




  • linear通过delayInMilliseconds * counter来递增nextTimestamp



    public static ElapsedTimeStrategy exponential(Clock clock,

long initialDelayInMilliseconds,

long maxDelayInMilliseconds,

double multiplier) {

if (multiplier <= 1.0) {

throw new IllegalArgumentException("Multiplier must be greater than 1");


if (initialDelayInMilliseconds <= 0) {

throw new IllegalArgumentException("Initial delay must be positive");


if (initialDelayInMilliseconds >= maxDelayInMilliseconds) {

throw new IllegalArgumentException("Maximum delay must be greater than initial delay");


return new ElapsedTimeStrategy() {

private long nextTimestamp = 0L;

private long previousDelay = 0L;


public boolean hasElapsed() {

if (nextTimestamp == 0L) {

// Initialize ...

nextTimestamp = clock.currentTimeInMillis() + initialDelayInMilliseconds;

previousDelay = initialDelayInMilliseconds;

return true;


long current = clock.currentTimeInMillis();

if (current >= nextTimestamp) {

do {

// Compute how long to delay ...

long nextDelay = (long) (previousDelay * multiplier);

if (nextDelay >= maxDelayInMilliseconds) {

previousDelay = maxDelayInMilliseconds;

// If we"re not there yet, then we know the increment is linear from here ...

if (nextTimestamp < current) {

long multiple = 1 + (current - nextTimestamp) / maxDelayInMilliseconds;

nextTimestamp += multiple * maxDelayInMilliseconds;



else {

previousDelay = nextDelay;


nextTimestamp += previousDelay;

} while (nextTimestamp <= current);

return true;


return false;




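  • exponential在每次到期时将previousDelay乘以multiplier得到nextDelay:若nextDelay小于maxDelayInMilliseconds,则以nextDelay作为新的延迟来递增nextTimestamp;否则延迟固定为maxDelayInMilliseconds,并在nextTimestamp落后于current时通过multiple * maxDelayInMilliseconds来递增nextTimestamp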




  • ElapsedTimeStrategy定义了hasElapsed方法,并提供了none、constant、step、linear、exponential这几个静态工厂方法来创建不同的实现

以上是 聊聊debezium的ElapsedTimeStrategy 的全部内容, 来源链接: utcz.com/z/516687.html
