Apache FlinkCEP 实现超时状态监控的步骤详解

 

CEP - Complex Event Processing复杂事件处理。

订单下单后超过一定时间还未进行支付确认。

打车订单生成后超过一定时间没有确认上车。

外卖超过预定送达时间一定时限还没有确认送达。

Apache FlinkCEP API

CEPTimeoutEventJob

FlinkCEP源码简析

DataStream和PatternStream

DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。

PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。

CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:

public

static

<IN, OUT>

SingleOutputStreamOperator

<OUT> createPatternStream(...){...}

public

static

<IN, OUT1, OUT2>

SingleOutputStreamOperator

<OUT1> createTimeoutPatternStream(...){...}

final

SingleOutputStreamOperator

<OUT> patternStream;

SingleOutputStreamOperator

@Public

public

class

SingleOutputStreamOperator

<T>

extends

DataStream

<T> {...}

PatternStream的构造方法:

PatternStream

(

final

DataStream

<T> inputStream,

final

Pattern

<T, ?> pattern) {

this

.inputStream = inputStream;

this

.pattern = pattern;

this

.comparator =

null

;

}

PatternStream

(

final

DataStream

<T> inputStream,

final

Pattern

<T, ?> pattern,

final

EventComparator

<T> comparator) {

this

.inputStream = inputStream;

this

.pattern = pattern;

this

.comparator = comparator;

}

Pattern、Quantifier和EventComparator

Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。

如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。

public

class

Pattern

<T, F

extends

T> {

/** 模式名称 */

private

final

String

name;

/** 前面一个模式 */

private

final

Pattern

<T, ?

extends

T> previous;

/** 一个事件如果要被当前模式匹配到,必须满足的约束条件 */

private

IterativeCondition

<F> condition;

/** 时间窗口长度,在时间长度内进行模式匹配 */

private

Time

windowTime;

/** 模式量词,意思是一个模式匹配几个事件等 默认是匹配到一个 */

private

Quantifier

quantifier =

Quantifier

.one(

ConsumingStrategy

.STRICT);

/** 停止将事件收集到循环状态时,事件必须满足的条件 */

private

IterativeCondition

<F> untilCondition;

/**

* 适用于{@code times}模式,用来维护模式里事件可以连续发生的次数

*/

private

Times

times;

// 匹配到事件之后的跳过策略

private

final

AfterMatchSkipStrategy

afterMatchSkipStrategy;

...

}

Quantifier是用来描述具体模式行为的,主要有三大类:

Single-单一匹配、Looping-循环匹配、Times-一定次数或者次数范围内都能匹配到。

每一个模式Pattern可以是optional可选的(单一匹配或循环匹配),并可以设置ConsumingStrategy。

循环和次数也有一个额外的内部ConsumingStrategy,用在模式中接收的事件之间。

public

class

Quantifier

{

...

/**

* 5个属性,可以组合,但并非所有的组合都是有效的

*/

public

enum

QuantifierProperty

{

SINGLE,

LOOPING,

TIMES,

OPTIONAL,

GREEDY

}

/**

* 描述在此模式中匹配哪些事件的策略

*/

public

enum

ConsumingStrategy

{

STRICT,

SKIP_TILL_NEXT,

SKIP_TILL_ANY,

NOT_FOLLOW,

NOT_NEXT

}

/**

* 描述当前模式里事件可以连续发生的次数;举个例子,模式条件无非就是boolean,满足true条件的事件连续出现times次,或者一个次数范围,比如2~4次,2次,3次,4次都会被当前模式匹配出来,因此同一个事件会被重复匹配到

*/

public

static

class

Times

{

private

final

int

from;

private

final

int

to;

private

Times

(

int

from,

int

to) {

Preconditions

.checkArgument(from >

0

,

"The from should be a positive number greater than 0."

);

Preconditions

.checkArgument(to >= from,

"The to should be a number greater than or equal to from: "

+ from +

"."

);

this

.from = from;

this

.to = to;

}

public

int

getFrom() {

return

from;

}

public

int

getTo() {

return

to;

}

// 次数范围

public

static

Times

of(

int

from,

int

to) {

return

new

Times

(from, to);

}

// 指定具体次数

public

static

Times

of(

int

times) {

return

new

Times

(times, times);

}

@Override

public

boolean

equals(

Object

o) {

if

(

this

== o) {

return

true

;

}

if

(o ==

null

|| getClass() != o.getClass()) {

return

false

;

}

Times

times = (

Times

) o;

return

from == times.from &&

to == times.to;

}

@Override

public

int

hashCode() {

return

Objects

.hash(from, to);

}

}

...

}

EventComparator,自定义事件比较器,实现EventComparator接口。

public

interface

EventComparator

<T>

extends

Comparator

<T>,

Serializable

{

long

serialVersionUID =

1L

;

}

NFACompiler和NFA

NFACompiler提供将Pattern编译成NFA或者NFAFactory的方法,使用NFAFactory可以创建多个NFA。

public

class

NFACompiler

{

...

/**

* NFAFactory 创建NFA的接口

*

* @param <T> Type of the input events which are processed by the NFA

*/

public

interface

NFAFactory

<T>

extends

Serializable

{

NFA<T> createNFA();

}

/**

* NFAFactory的具体实现NFAFactoryImpl

*

* <p>The implementation takes the input type serializer, the window time and the set of

* states and their transitions to be able to create an NFA from them.

*

* @param <T> Type of the input events which are processed by the NFA

*/

private

static

class

NFAFactoryImpl

<T>

implements

NFAFactory

<T> {

private

static

final

long

serialVersionUID =

8939783698296714379L

;

private

final

long

windowTime;

private

final

Collection

<

State

<T>> states;

private

final

boolean

timeoutHandling;

private

NFAFactoryImpl

(

long

windowTime,

Collection

<

State

<T>> states,

boolean

timeoutHandling) {

this

.windowTime = windowTime;

this

.states = states;

this

.timeoutHandling = timeoutHandling;

}

@Override

public

NFA<T> createNFA() {

// 一个NFA由状态集合、时间窗口的长度和是否处理超时组成

return

new

NFA<>(states, windowTime, timeoutHandling);

}

}

}

NFA:Non-deterministic finite automaton - 非确定的有限(状态)自动机。

更多内容参见

https://zh.wikipedia.org/wiki/非确定有限状态自动机

public

class

NFA<T> {

/**

* NFACompiler返回的所有有效的NFA状态集合

* These are directly derived from the user-specified pattern.

*/

private

final

Map

<

String

,

State

<T>> states;

/**

* Pattern.within(Time)指定的时间窗口长度

*/

private

final

long

windowTime;

/**

* 一个超时匹配的标记

*/

private

final

boolean

handleTimeout;

...

}

 

PatternSelectFunction和PatternFlatSelectFunction

当一个包含被匹配到的事件的映射能够通过模式名称访问到的时候,PatternSelectFunction的select()方法会被调用。模式名称是由Pattern定义的时候指定的。select()方法恰好返回一个结果,如果需要返回多个结果,则可以实现PatternFlatSelectFunction。

public

interface

PatternSelectFunction

<IN, OUT>

extends

Function

,

Serializable

{

/**

* 从给到的事件映射中生成一个结果。这些事件使用他们关联的模式名称作为唯一标识

*/

OUT select(

Map

<

String

,

List

<IN>> pattern)

throws

Exception

;

}

 

PatternFlatSelectFunction,不是返回一个OUT,而是使用Collector 把匹配到的事件收集起来。

public

interface

PatternFlatSelectFunction

<IN, OUT>

extends

Function

,

Serializable

{

/**

* 生成一个或多个结果

*/

void

flatSelect(

Map

<

String

,

List

<IN>> pattern,

Collector

<OUT> out)

throws

Exception

;

}

SelectTimeoutCepOperator、PatternTimeoutFunction

SelectTimeoutCepOperator是在CEPOperatorUtils中调用createTimeoutPatternStream()方法时创建出来。

SelectTimeoutCepOperator中会被算子迭代调用的方法是processMatchedSequences()和processTimedOutSequences()。

模板方法...对应到抽象类AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。

还有FlatSelectTimeoutCepOperator和对应的PatternFlatTimeoutFunction。

public

class

SelectTimeoutCepOperator

<IN, OUT1, OUT2, KEY>

extends

AbstractKeyedCEPPatternOperator

<IN, KEY, OUT1,

SelectTimeoutCepOperator

.

SelectWrapper

<IN, OUT1, OUT2>> {

private

OutputTag

<OUT2> timedOutOutputTag;

public

SelectTimeoutCepOperator

(

TypeSerializer

<IN> inputSerializer,

boolean

isProcessingTime,

NFACompiler

.

NFAFactory

<IN> nfaFactory,

final

EventComparator

<IN> comparator,

AfterMatchSkipStrategy

skipStrategy,

// 参数命名混淆了flat...包括SelectWrapper类中的成员命名...

PatternSelectFunction

<IN, OUT1> flatSelectFunction,

PatternTimeoutFunction

<IN, OUT2> flatTimeoutFunction,

OutputTag

<OUT2> outputTag,

OutputTag

<IN> lateDataOutputTag) {

super

(

inputSerializer,

isProcessingTime,

nfaFactory,

comparator,

skipStrategy,

new

SelectWrapper

<>(flatSelectFunction, flatTimeoutFunction),

lateDataOutputTag);

this

.timedOutOutputTag = outputTag;

}

...

}

public

interface

PatternTimeoutFunction

<IN, OUT>

extends

Function

,

Serializable

{

OUT timeout(

Map

<

String

,

List

<IN>> pattern,

long

timeoutTimestamp)

throws

Exception

;

}

public

interface

PatternFlatTimeoutFunction

<IN, OUT>

extends

Function

,

Serializable

{

void

timeout(

Map

<

String

,

List

<IN>> pattern,

long

timeoutTimestamp,

Collector

<OUT> out)

throws

Exception

;

}

 

CEP和CEPOperatorUtils

CEP是创建PatternStream的工具类,PatternStream只是DataStream和Pattern的组合。

public

class

CEP {

public

static

<T>

PatternStream

<T> pattern(

DataStream

<T> input,

Pattern

<T, ?> pattern) {

return

new

PatternStream

<>(input, pattern);

}

public

static

<T>

PatternStream

<T> pattern(

DataStream

<T> input,

Pattern

<T, ?> pattern,

EventComparator

<T> comparator) {

return

new

PatternStream

<>(input, pattern, comparator);

}

}

 

CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被调用的时候,去创建SingleOutputStreamOperator(DataStream)。

public

class

CEPOperatorUtils

{

...

private

static

<IN, OUT, K>

SingleOutputStreamOperator

<OUT> createPatternStream(

final

DataStream

<IN> inputStream,

final

Pattern

<IN, ?> pattern,

final

TypeInformation

<OUT> outTypeInfo,

final

boolean

timeoutHandling,

final

EventComparator

<IN> comparator,

final

OperatorBuilder

<IN, OUT> operatorBuilder) {

final

TypeSerializer

<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

// check whether we use processing time

final

boolean

isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() ==

TimeCharacteristic

.

ProcessingTime

;

// compile our pattern into a NFAFactory to instantiate NFAs later on

final

NFACompiler

.

NFAFactory

<IN> nfaFactory =

NFACompiler

.compileFactory(pattern, timeoutHandling);

final

SingleOutputStreamOperator

<OUT> patternStream;

if

(inputStream

instanceof

KeyedStream

) {

KeyedStream

<IN, K> keyedStream = (

KeyedStream

<IN, K>) inputStream;

patternStream = keyedStream.transform(

operatorBuilder.getKeyedOperatorName(),

outTypeInfo,

operatorBuilder.build(

inputSerializer,

isProcessingTime,

nfaFactory,

comparator,

pattern.getAfterMatchSkipStrategy()));

}

else

{

KeySelector

<IN,

Byte

> keySelector =

new

NullByteKeySelector

<>();

patternStream = inputStream.keyBy(keySelector).transform(

operatorBuilder.getOperatorName(),

outTypeInfo,

operatorBuilder.build(

inputSerializer,

isProcessingTime,

nfaFactory,

comparator,

pattern.getAfterMatchSkipStrategy()

)).forceNonParallel();

}

return

patternStream;

}

...

}

FlinkCEP实现步骤

  1. IN: DataSource -> DataStream -> Transformations -> DataStream
  2. Pattern: Pattern.begin.where.next.where...times...
  3. PatternStream: CEP.pattern(DataStream, Pattern)
  4. DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)
  5. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP匹配超时实现步骤

TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,会new一个0字节的Key(上面CEPOperatorUtils源码里有提到)。

KeySelector

<IN,

Byte

> keySelector =

new

NullByteKeySelector

<>();

Pattern最后调用within设置窗口时间。 如果是对主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用PatternStream.select(...)就可以了。

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
  2. Pattern: Pattern.begin.where.next.where...within(Time windowTime)
  3. PatternStream: CEP.pattern(KeyedStream, Pattern)
  4. OutputTag: new OutputTag(...)
  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP超时不足

和Flink窗口聚合类似,如果使用事件时间和依赖事件生成的水印向前推进,需要后续的事件到达,才会触发窗口进行计算和输出结果。

FlinkCEP超时完整demo

public

class

CEPTimeoutEventJob

{

private

static

final

String

LOCAL_KAFKA_BROKER =

"localhost:9092"

;

private

static

final

String

GROUP_ID =

CEPTimeoutEventJob

.

class

.getSimpleName();

private

static

final

String

GROUP_TOPIC = GROUP_ID;

public

static

void

main(

String

[] args)

throws

Exception

{

// 参数

ParameterTool

params =

ParameterTool

.fromArgs(args);

StreamExecutionEnvironment

env =

StreamExecutionEnvironment

.getExecutionEnvironment();

// 使用事件时间

env.setStreamTimeCharacteristic(

TimeCharacteristic

.

EventTime

);

env.enableCheckpointing(

5000

);

env.getCheckpointConfig().enableExternalizedCheckpoints(

CheckpointConfig

.

ExternalizedCheckpointCleanup

.RETAIN_ON_CANCELLATION);

env.getConfig().disableSysoutLogging();

env.getConfig().setRestartStrategy(

RestartStrategies

.fixedDelayRestart(

5

,

10000

));

// 不使用POJO的时间

final

AssignerWithPeriodicWatermarks

extractor =

new

IngestionTimeExtractor

<POJO>();

// 与Kafka Topic的Partition保持一致

env.setParallelism(

3

);

Properties

kafkaProps =

new

Properties

();

kafkaProps.setProperty(

"bootstrap.servers"

, LOCAL_KAFKA_BROKER);

kafkaProps.setProperty(

"group.id"

, GROUP_ID);

// 接入Kafka的消息

FlinkKafkaConsumer011

<POJO> consumer =

new

FlinkKafkaConsumer011

<>(GROUP_TOPIC,

new

POJOSchema

(), kafkaProps);

DataStream

<POJO> pojoDataStream = env.addSource(consumer)

.assignTimestampsAndWatermarks(extractor);

pojoDataStream.print();

// 根据主键aid分组 即对每一个POJO事件进行匹配检测【不同类型的POJO,可以采用不同的within时间】

// 1.

DataStream

<POJO> keyedPojos = pojoDataStream

.keyBy(

"aid"

);

// 从初始化到终态-一个完整的POJO事件序列

// 2.

Pattern

<POJO, POJO> completedPojo =

Pattern

.<POJO>begin(

"init"

)

.where(

new

SimpleCondition

<POJO>() {

private

static

final

long

serialVersionUID = -

6847788055093903603L

;

@Override

public

boolean

filter(POJO pojo)

throws

Exception

{

return

"02"

.equals(pojo.getAstatus());

}

})

.followedBy(

"end"

)

// .next("end")

.where(

new

SimpleCondition

<POJO>() {

private

static

final

long

serialVersionUID = -

2655089736460847552L

;

@Override

public

boolean

filter(POJO pojo)

throws

Exception

{

return

"00"

.equals(pojo.getAstatus()) ||

"01"

.equals(pojo.getAstatus());

}

});

// 找出1分钟内【便于测试】都没有到终态的事件aid

// 如果针对不同类型有不同within时间,比如有的是超时1分钟,有的可能是超时1个小时 则生成多个PatternStream

// 3.

PatternStream

<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(

Time

.minutes(

1

)));

// 定义侧面输出timedout

// 4.

OutputTag

<POJO> timedout =

new

OutputTag

<POJO>(

"timedout"

) {

private

static

final

long

serialVersionUID =

773503794597666247L

;

};

// OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction

// 5.

SingleOutputStreamOperator

<POJO> timeoutPojos = patternStream.flatSelect(

timedout,

new

POJOTimedOut

(),

new

FlatSelectNothing

()

);

// 打印输出超时的POJO

// 6.7.

timeoutPojos.getSideOutput(timedout).print();

timeoutPojos.print();

env.execute(

CEPTimeoutEventJob

.

class

.getSimpleName());

}

/**

* 把超时的事件收集起来

*/

public

static

class

POJOTimedOut

implements

PatternFlatTimeoutFunction

<POJO, POJO> {

private

static

final

long

serialVersionUID = -

4214641891396057732L

;

@Override

public

void

timeout(

Map

<

String

,

List

<POJO>> map,

long

l,

Collector

<POJO> collector)

throws

Exception

{

if

(

null

!= map.get(

"init"

)) {

for

(POJO pojoInit : map.get(

"init"

)) {

System

.out.println(

"timeout init:"

+ pojoInit.getAid());

collector.collect(pojoInit);

}

}

// 因为end超时了,还没收到end,所以这里是拿不到end的

System

.out.println(

"timeout end: "

+ map.get(

"end"

));

}

}

/**

* 通常什么都不做,但也可以把所有匹配到的事件发往下游;如果是宽松临近,被忽略或穿透的事件就没办法选中发往下游了

* 一分钟时间内走完init和end的数据

*

* @param <T>

*/

public

static

class

FlatSelectNothing

<T>

implements

PatternFlatSelectFunction

<T, T> {

private

static

final

long

serialVersionUID = -

3029589950677623844L

;

@Override

public

void

flatSelect(

Map

<

String

,

List

<T>> pattern,

Collector

<T> collector) {

System

.out.println(

"flatSelect: "

+ pattern);

}

}

}

测试结果(followedBy):

3

> POJO{aid=

'ID000-0'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563419728242

, energy=

529.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'02'

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

'ID000-1'

, astyle=

'STYLE000-2'

, aname=

'NAME-1'

, logTime=

1563419728783

, energy=

348.00

, age=

26

, tt=

2019

-

07

-

18

, astatus=

'02'

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

'ID000-0'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563419749259

, energy=

492.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'00'

, createTime=

null

, updateTime=

null

}

flatSelect: {init=[POJO{aid=

'ID000-0'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563419728242

, energy=

529.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'02'

, createTime=

null

, updateTime=

null

}],

end

=[POJO{aid=

'ID000-0'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563419749259

, energy=

492.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'00'

, createTime=

null

, updateTime=

null

}]}

timeout init:ID000-

1

3

> POJO{aid=

'ID000-1'

, astyle=

'STYLE000-2'

, aname=

'NAME-1'

, logTime=

1563419728783

, energy=

348.00

, age=

26

, tt=

2019

-

07

-

18

, astatus=

'02'

, createTime=

null

, updateTime=

null

}

timeout

end

:

null

3

> POJO{aid=

'ID000-2'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563419829639

, energy=

467.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'03'

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

'ID000-2'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563419841394

, energy=

107.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'00'

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

'ID000-3'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563419967721

, energy=

431.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'02'

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

'ID000-3'

, astyle=

'STYLE000-2'

, aname=

'NAME-0'

, logTime=

1563419979567

, energy=

32.00

, age=

26

, tt=

2019

-

07

-

18

, astatus=

'03'

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

'ID000-3'

, astyle=

'STYLE000-2'

, aname=

'NAME-0'

, logTime=

1563419993612

, energy=

542.00

, age=

26

, tt=

2019

-

07

-

18

, astatus=

'01'

, createTime=

null

, updateTime=

null

}

flatSelect: {init=[POJO{aid=

'ID000-3'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563419967721

, energy=

431.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'02'

, createTime=

null

, updateTime=

null

}],

end

=[POJO{aid=

'ID000-3'

, astyle=

'STYLE000-2'

, aname=

'NAME-0'

, logTime=

1563419993612

, energy=

542.00

, age=

26

, tt=

2019

-

07

-

18

, astatus=

'01'

, createTime=

null

, updateTime=

null

}]}

3

> POJO{aid=

'ID000-4'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563420063760

, energy=

122.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'02'

, createTime=

null

, updateTime=

null

}

3

> POJO{aid=

'ID000-4'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563420078008

, energy=

275.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'03'

, createTime=

null

, updateTime=

null

}

timeout init:ID000-

4

3

> POJO{aid=

'ID000-4'

, astyle=

'STYLE000-0'

, aname=

'NAME-0'

, logTime=

1563420063760

, energy=

122.00

, age=

0

, tt=

2019

-

07

-

18

, astatus=

'02'

, createTime=

null

, updateTime=

null

}

timeout

end

:

null

总结

以上所述是小编给大家介绍的Apache FlinkCEP 实现超时状态监控的步骤,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!

以上是 Apache FlinkCEP 实现超时状态监控的步骤详解 的全部内容, 来源链接: utcz.com/p/226619.html

回到顶部