flink使用sql实现kafka生产者和消费者 [数据库教程]

database

1.maven依赖

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<maven.compiler.source>1.8</maven.compiler.source>

<maven.compiler.target>1.8</maven.compiler.target>

<flink.version>1.11.2</flink.version>

<logback.version>1.1.7</logback.version>

<slf4j.version>1.7.25</slf4j.version>

</properties>

<dependencies>

<dependency>

<!-- Used by maven-dependency-plugin -->

<groupId>org.apache.flink</groupId>

<artifactId>flink-json</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-java</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_2.12</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-clients_2.12</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka_2.12</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-wikiedits_2.12</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table-planner_2.12</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table-planner-blink_2.12</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table-api-java-bridge_2.12</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table-api-java</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-api</artifactId>

<version>${slf4j.version}</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-core</artifactId>

<version>${logback.version}</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-classic</artifactId>

<version>${logback.version}</version>

</dependency>

<dependency>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

<version>1.16.18</version>

</dependency>

</dependencies>

2.生产者

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.tuple.Tuple;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

/**

* Hello world!

*/

//@Slf4j

publicclass KafkaTableStreamApiProducerTest {

publicstaticvoid main(String[] args) {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()

.inStreamingMode()

//.useOldPlanner() // flink

.useBlinkPlanner() // blink

.build();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

Long baseTimestamp = 1600855709000L;

DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(

new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),

new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),

new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),

new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)

);

String ddl = "CREATE TABLE CustomerStatusChangedEvent(

" +

"customerId int,

" +

"oldStatus int,

" +

"newStatus int,

" +

"eventTime bigint

" +

") WITH(

" +

"‘connector.type‘=‘kafka‘,

" +

"‘connector.version‘=‘universal‘,

" +

"‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.12.87:9092‘,

" +

"‘connector.topic‘=‘customer_statusChangedEvent‘,

" +

"‘format.type‘=‘json‘

" +

")

"

;

tableEnvironment.executeSql(ddl);

while (true) {

try {

TimeUnit.SECONDS.sleep(3);

int status = (int) (System.currentTimeMillis() % 3);

String insert = "insert into CustomerStatusChangedEvent(customerId,oldStatus,newStatus,eventTime)" +

"values(1001,1," + status + "," + System.currentTimeMillis() + ")";

tableEnvironment.executeSql(insert);

} catch (Exception ex) {

}

}

}

}

 

3.消费者

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**

* Hello world!

*/

//@Slf4j

publicclass KafkaTableStreamApiConsumerTest {

publicstaticvoid main(String[] args) {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()

.inStreamingMode()

//.useOldPlanner() // flink

.useBlinkPlanner() // blink

.build();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

Long baseTimestamp = 1600855709000L;

DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(

new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),

new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),

new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),

new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)

);

String ddl = "CREATE TABLE CustomerStatusChangedEvent(

" +

"customerId int,

" +

"oldStatus int,

" +

"newStatus int,

" +

"eventTime bigint

" +

") WITH(

" +

"‘connector.type‘=‘kafka‘,

" +

"‘connector.version‘=‘universal‘,

" +

"‘connector.properties.group.id‘=‘g2_group‘,

" +

"‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092‘,

" +

"‘connector.topic‘=‘customer_statusChangedEvent‘,

" +

"‘connector.startup-mode‘ = ‘latest-offset‘,

" +

"‘format.type‘=‘json‘

" +

")

";

tableEnvironment.executeSql(ddl);

Table resultTb = tableEnvironment.sqlQuery("select customerId,newStatus as status " +

" from CustomerStatusChangedEvent" +

" where newStatus in(1,2)"

);

/*

DataStream<Tuple2<Boolean, Tuple2<Integer, Integer>>> result = tableEnvironment.toRetractStream(resultTb,

Types.TUPLE(Types.INT, Types.INT));

*/

DataStream<CustomerStatusLog> result = tableEnvironment.toAppendStream(resultTb, CustomerStatusLog.class);

result.print();

try {

env.execute();

} catch (Exception ex) {

}

}

publicstaticclass CustomerStatusLog {

private Long customerId;

private Integer status;

public Long getCustomerId() {

return customerId;

}

publicvoid setCustomerId(Long customerId) {

this.customerId = customerId;

}

public Integer getStatus() {

return status;

}

publicvoid setStatus(Integer newStatus) {

this.status = newStatus;

}

public CustomerStatusLog() {

}

@Override

public String toString() {

return "CustomerStatusLog{" +

"customerId=" + customerId +

", status=" + status +

‘}‘;

}

}

}

 

4.消费者打印

4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=1}
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=2}

flink 使用sql实现kafka生产者和消费者

以上是 flink使用sql实现kafka生产者和消费者 [数据库教程] 的全部内容, 来源链接: utcz.com/z/535230.html

回到顶部