Implementing a Kafka Producer and Consumer with Flink SQL [Database Tutorial]
1. Maven dependencies
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.11.2</flink.version>
<logback.version>1.1.7</logback.version>
<slf4j.version>1.7.25</slf4j.version>
</properties>
<dependencies>
<dependency>
<!-- Used by maven-dependency-plugin -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
</dependencies>
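Both programs below import com.g2.flink.models.CustomerStatusChangedEvent, a model class the original post never shows. A minimal sketch of what it might look like, with field names inferred from the constructor calls (these names are assumptions, not the original class):

// Hypothetical sketch of the event model; field names are inferred from
// calls like new CustomerStatusChangedEvent(1010L, 2, baseTimestamp).
public class CustomerStatusChangedEvent {
    private Long customerId;
    private int newStatus;
    private Long eventTime;

    public CustomerStatusChangedEvent(Long customerId, int newStatus, Long eventTime) {
        this.customerId = customerId;
        this.newStatus = newStatus;
        this.eventTime = eventTime;
    }

    public Long getCustomerId() { return customerId; }

    public int getNewStatus() { return newStatus; }

    public Long getEventTime() { return eventTime; }
}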
2. Producer
import com.g2.flink.models.CustomerStatusChangedEvent;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.api.Expressions.$;

/**
 * Hello world!
 */
//@Slf4j
public class KafkaTableStreamApiProducerTest {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
//.useOldPlanner() // legacy Flink planner
.useBlinkPlanner() // Blink planner
.build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
Long baseTimestamp = 1600855709000L;
// Sample events (not actually used by the SQL inserts below)
DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
);
String ddl = "CREATE TABLE CustomerStatusChangedEvent(
" +
"customerId int,
" +
"oldStatus int,
" +
"newStatus int,
" +
"eventTime bigint
" +
") WITH(
" +
"‘connector.type‘=‘kafka‘,
" +
"‘connector.version‘=‘universal‘,
" +
"‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.12.87:9092‘,
" +
"‘connector.topic‘=‘customer_statusChangedEvent‘,
" +
"‘format.type‘=‘json‘
" +
")
"
;
tableEnvironment.executeSql(ddl);
while (true) {
try {
TimeUnit.SECONDS.sleep(3);
int status = (int) (System.currentTimeMillis() % 3);
String insert = "insert into CustomerStatusChangedEvent(customerId,oldStatus,newStatus,eventTime)" +
"values(1001,1," + status + "," + System.currentTimeMillis() + ")";
tableEnvironment.executeSql(insert);
} catch (Exception ex) {
ex.printStackTrace(); // don't swallow failures silently
}
}
}
}
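Note: the DDL above uses the legacy connector.* option keys, which Flink 1.11 still accepts. The same table can also be declared with the newer option style introduced in Flink 1.11; a sketch of the equivalent definition (same brokers and topic as above):

// Sketch: equivalent table in Flink 1.11's newer connector option style
String newStyleDdl = "CREATE TABLE CustomerStatusChangedEvent(\n" +
"customerId int,\n" +
"oldStatus int,\n" +
"newStatus int,\n" +
"eventTime bigint\n" +
") WITH(\n" +
"'connector'='kafka',\n" +
"'topic'='customer_statusChangedEvent',\n" +
"'properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',\n" +
"'format'='json'\n" +
")";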
3. Consumer
import com.g2.flink.models.CustomerStatusChangedEvent;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Hello world!
 */
//@Slf4j
public class KafkaTableStreamApiConsumerTest {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
//.useOldPlanner() // legacy Flink planner
.useBlinkPlanner() // Blink planner
.build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
Long baseTimestamp = 1600855709000L;
// Sample events (not actually used; the consumer reads from Kafka via the DDL below)
DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
);
String ddl = "CREATE TABLE CustomerStatusChangedEvent(
" +
"customerId int,
" +
"oldStatus int,
" +
"newStatus int,
" +
"eventTime bigint
" +
") WITH(
" +
"‘connector.type‘=‘kafka‘,
" +
"‘connector.version‘=‘universal‘,
" +
"‘connector.properties.group.id‘=‘g2_group‘,
" +
"‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092‘,
" +
"‘connector.topic‘=‘customer_statusChangedEvent‘,
" +
"‘connector.startup-mode‘ = ‘latest-offset‘,
" +
"‘format.type‘=‘json‘
" +
")
";
tableEnvironment.executeSql(ddl);
Table resultTb = tableEnvironment.sqlQuery("select customerId,newStatus as status " +
" from CustomerStatusChangedEvent" +
" where newStatus in(1,2)"
);
/*
DataStream<Tuple2<Boolean, Tuple2<Integer, Integer>>> result = tableEnvironment.toRetractStream(resultTb,
Types.TUPLE(Types.INT, Types.INT));
*/
DataStream<CustomerStatusLog> result = tableEnvironment.toAppendStream(resultTb, CustomerStatusLog.class);
result.print();
try {
env.execute();
} catch (Exception ex) {
ex.printStackTrace(); // surface execution failures
}
}
public static class CustomerStatusLog {
private Long customerId;
private Integer status;
public Long getCustomerId() {
return customerId;
}
public void setCustomerId(Long customerId) {
this.customerId = customerId;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer newStatus) {
this.status = newStatus;
}
public CustomerStatusLog() {
}
@Override
public String toString() {
return "CustomerStatusLog{" +
"customerId=" + customerId +
", status=" + status +
'}';
}
}
}
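toAppendStream works here because the filter query only ever appends rows. A query whose earlier results can be updated, such as a GROUP BY aggregation, must go through toRetractStream instead, as the commented-out block hints. A minimal sketch reusing the table declared above:

// Sketch: an aggregating query emits updates, so it needs a retract stream.
// The Boolean flag marks each record as an insertion (true) or a retraction (false).
Table countsTb = tableEnvironment.sqlQuery(
"select customerId, count(*) as changes from CustomerStatusChangedEvent group by customerId");
DataStream<Tuple2<Boolean, Tuple2<Integer, Long>>> counts =
tableEnvironment.toRetractStream(countsTb, Types.TUPLE(Types.INT, Types.LONG));
counts.print();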
4. Consumer output
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=1}
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=2}
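The leading 4> is the index of the parallel subtask that printed each record.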