【Java】Kafka Stream 简介及基本运用(含案例)

目录

  • 1、Kafka Stream背景
    • 1.1 Kafka Stream是什么
    • 1.2 什么是流式计算
    • 1.3 为什么要有Kafka Stream
  • 2、Kafka Stream如何解决流式系统中关键问题
    • 2.1 KTable和KSteam
    • 2.2 时间
    • 2.3 窗口
  • 3、Kafka Stream应用示例
    • 3.1 案例一:将topicA的数据写入到topicB中(纯复制)
    • 3.2 案例二:将TopicA中的数据实现wordcount写入到TopicB
    • 3.3 示例三:在TopicA中每输入一个值求和并写入到TopicB
    • 3.4 案例四:窗口
      • 3.4.1 每隔2秒钟输出一次过去5秒内topicA里的wordcount,结果写入到TopicB
      • 3.4.2 每隔5秒钟输出一次过去5秒内topicA里的wordcount,结果写入到TopicB
      • 3.4.3 TopicA 15秒内的wordcount,结果写入TopicB
    • 3.5 案例五:将TopicA的某一列扁平化处理写入TopicB
    • 3.6 案例六:将TopicA的多列扁平化处理写入TopicB

学习Kafka Stream,我们需要先了解什么是Kafka Stream ,为什么要使用Kafka Stream,以及我们怎么使用Kafka Stream。

在此推荐Kafka Stream学习博客:https://www.cnblogs.com/warehouse/p/9521382.html

1、Kafka Stream背景

1.1 Kafka Stream是什么

Kafka Streams是一套客户端类库,它可以对存储在Kafka内的数据进行流式处理和分析。

1.2 什么是流式计算

  • 流式计算:输入是持续的,一般先定义目标计算,然后数据到来之后将计算逻辑应用于数据,往往用增量计算代替全量计算。
  • 批量计算:一般先有全量数据集,然后定义计算逻辑,并将计算应用于全量数据。特点是全量计算,并且计算结果一次性全量输出。

1.3 为什么要有Kafka Stream

开源流式处理系统有:Spark Streaming和Apache Storm,它们能与SQL处理集成等优点,功能强大,那为何还需要Kafka Stream呢?

1、使用方便。Spark和Storm都是流式处理框架,而Kafka Stream是基于Kafka的流式处理类库。开发者很难了解框架的具体运行方式,调试成本高,使用受限。而类库直接提供具体的类给开发者使用,整个应用的运行方式主要由开发者控制,方便使用和调试。

2、使用成本低。就流式处理系统而言,基本都支持Kafka作为数据源。Kafka基本上是主流的流式处理系统的标准数据源。大部分流式系统中都部署了Kafka,包括Spark和Storm,此时使用Kafka Stream的成本非常低。

3、省资源。使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,框架本身也占资源。

4、Kafka本身也有优点。由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并发度。

2、Kafka Stream如何解决流式系统中关键问题

2.1 KTable和KSteam

KTable和KSteam是Kafka中非常重要的概念,在此分析一下二者区别。

  • KStream是一个数据流,可以认为所有的记录都通过Insert only的方式插入进这个数据流中。
  • KTable代表一个完整的数据集,可以理解为数据库中的表。每条记录都是KV键值对,key可以理解为数据库中的主键,是唯一的,而value代表一条记录。我们可以认为KTable中的数据时通过Update only的方式进入的。如果是相同的key,会覆盖掉原来的那条记录。
  • 综上来说,KStream是数据流,来多少数据就插入多少数据,是Insert only;KTable是数据集,相同key只允许保留最新的记录,也就是Update only

2.2 时间

Kafka支持三种时间:

  • 事件发生时间:事件发生的时间,包含在数据记录中。发生时间由Producer在构造ProducerRecord时指定。并且需要Broker或者Topic将message.timestamp.type设置为CreateTime(默认值)才能生效。
  • 消息接收时间:也即消息存入Broker的时间。当Broker或Topic将message.timestamp.type设置为LogAppendTime时生效。此时Broker会在接收到消息后,存入磁盘前,将其timestamp属性值设置为当前机器时间。一般消息接收时间比较接近于事件发生时间,部分场景下可代替事件发生时间。
  • 消息处理时间。也即Kafka Stream处理消息时的时间。

2.3 窗口

流式数据在时间上无界的,但是聚合操作只能作用在特定(有界)的数据集,咋整???这时候就有了窗口的概念,在时间无界的数据流中定义一个边界来用于计算。Kafka支持的窗口如下:

  • 1)Hopping Time Window:举一个典型的应用场景,每隔5秒钟输出一次过去1个小时内网站的PV或者UV。里面有两个时间1小时和5秒钟,1小时指定了窗口的大小(Window size),5秒钟定义输出的时间间隔(Advance interval)。
  • 2)Tumbling Time Window:可以认为是Hopping Time Window的一种特例,窗口大小=输出时间间隔,它的特点是各个Window之间完全不相交。
  • 3)Sliding Window该窗口只用于2个KStream进行Join计算时。该窗口的大小定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。假设该窗口的大小为5秒,则参与Join的2个KStream中,记录时间差小于5的记录被认为在同一个窗口中,可以进行Join计算。
  • 4)Session Window该窗口用于对Key做Group后的聚合操作中。它需要对Key做分组,然后对组内的数据根据业务需求定义一个窗口的起始点和结束点。一个典型的案例是,希望通过Session Window计算某个用户访问网站的时间。对于一个特定的用户(用Key表示)而言,当发生登录操作时,该用户(Key)的窗口即开始,当发生退出操作或者超时时,该用户(Key)的窗口即结束。窗口结束时,可计算该用户的访问时间或者点击次数等。

3、Kafka Stream应用示例

添加pom依赖:

`<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.11</artifactId>

<version>2.0.0</version>

</dependency>

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-streams</artifactId>

<version>2.0.0</version>

</dependency>`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

* 9

* 10

3.1 案例一:将topicA的数据写入到topicB中(纯复制)

`import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsBuilder;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class MyStream {

public static void main(String[] args) {

Properties prop =new Properties();

prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream");

prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址

prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //输入key的类型

prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); //输入value的类型

//创建流构造器

StreamsBuilder builder = new StreamsBuilder();

//构建好builder,将myStreamIn topic中的数据写入到myStreamOut topic中

builder.stream("myStreamIn").to("myStreamOut");

final Topology topo=builder.build();

final KafkaStreams streams = new KafkaStreams(topo, prop);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("stream"){

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.exit(0);

}

}`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

* 9

* 10

* 11

* 12

* 13

* 14

* 15

* 16

* 17

* 18

* 19

* 20

* 21

* 22

* 23

* 24

* 25

* 26

* 27

* 28

* 29

* 30

* 31

* 32

* 33

* 34

* 35

* 36

* 37

* 38

* 39

* 40

* 41

* 42

* 43

在这里说明一下prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream"); ,我们将TopicA的数据写入到TopicB中,就相当于这个流在消费TopicA的数据,我们知道一个Topic中,一个消费组里只能有一个消费者去消费它。假设我们将TopicA的数据写入到TopicB的过程中,报错了(比如虚拟机内存满了),这时数据只写了一半,我们清理完内存后,想继续写剩下的数据,再次运行我们发现报错,写不了了。这时候我们需要修改这个参数将mystream改成别的名字,因为同一个消费者组里只能有一个消费者去消费它。

开启zookeeper和Kafka

`# 开启zookeeper

zkServer.sh start

# 后台启动Kafka

kafka-server-start.sh -daemon /opt/kafka/config/server.properties`

* 1

* 2

* 3

* 4

创建topic myStreamIn

`kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic myStreamIn --partitions 1 --replication-factor 1`

* 1

创建topic myStreamOut

`kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic myStreamOut --partitions 1 --replication-factor 1`

* 1

生产消息写入到myStreamIn

`kafka-console-producer.sh --topic myStreamIn --broker-list 192.168.136.20:9092`

* 1

消费myStreamOut里的数据

`kafka-console-consumer.sh --topic myStreamOut --bootstrap-server 192.168.136.20:9092 --from-beginning`

* 1

运行示例代码并在生产者端输入数据,能在消费端看到数据,表明Kafka Stream写入成功。

3.2 案例二:将TopicA中的数据实现wordcount写入到TopicB

工作中不可能像案例一一样将一个Topic的数据原封不动存入另一个Topic,一般是要经过处理,这就需要在流中加上逻辑。

`import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.*;

import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;

import java.util.List;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class WordCountStream {

public static void main(String[] args) {

Properties prop =new Properties();

prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcountstream");

prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址

prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,2000); //提交时间设置为2秒

//prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,""earliest ); //earliest latest none 默认latest

//prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //true(自动提交) false(手动提交)

prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

//创建流构造器

//hello world

//hello java

StreamsBuilder builder = new StreamsBuilder();

KTable<String, Long> count = builder.stream("wordcount-input") //从kafka中一条一条取数据

.flatMapValues( //返回压扁后的数据

(value) -> { //对数据按空格进行切割,返回List集合

String[] split = value.toString().split(" ");

List<String> strings = Arrays.asList(split);

return strings;

}) //null hello,null world,null hello,null java

.map((k, v) -> {

return new KeyValue<String, String>(v,"1");

}).groupByKey().count();

count.toStream().foreach((k,v)->{

//为了测试方便,我们将kv输出到控制台

System.out.println("key:"+k+" "+"value:"+v);

});

count.toStream().map((x,y)->{

return new KeyValue<String,String>(x,y.toString()); //注意转成toString类型,我们前面设置的kv的类型都是string类型

}).to("wordcount-output");

final Topology topo=builder.build();

final KafkaStreams streams = new KafkaStreams(topo, prop);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("stream"){

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.exit(0);

}

}`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

* 9

* 10

* 11

* 12

* 13

* 14

* 15

* 16

* 17

* 18

* 19

* 20

* 21

* 22

* 23

* 24

* 25

* 26

* 27

* 28

* 29

* 30

* 31

* 32

* 33

* 34

* 35

* 36

* 37

* 38

* 39

* 40

* 41

* 42

* 43

* 44

* 45

* 46

* 47

* 48

* 49

* 50

* 51

* 52

* 53

* 54

* 55

* 56

* 57

* 58

* 59

* 60

* 61

* 62

* 63

`# 创建TopicA(wordcount-input)

kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic wordcount-input --partitions 1 --replication-factor 1

# 创建TopicB(wordcount-output)

kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic wordcount-output --partitions 1 --replication-factor 1

# 创建生产者

kafka-console-producer.sh --topic wordcount-input --broker-list 192.168.136.20:9092

# 创建消费者,需要打印出key(--property print.key=true)

kafka-console-consumer.sh --topic wordcount-output --bootstrap-server 192.168.136.20:9092 --from-beginning --property print.key=true`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

运行示例代码并在生产者端输入数据,能在消费端看到数据,表明Kafka Stream写入成功。

3.3 示例三:在TopicA中每输入一个值求和并写入到TopicB

`import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.*;

import org.apache.kafka.streams.kstream.KStream;

import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class SumStream {

public static void main(String[] args) {

Properties prop =new Properties();

prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"sumstream");

prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址

prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,2000); //提交时间设置为2秒

//prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //earliest latest none 默认latest

//prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //true(自动提交) false(手动提交)

prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

//创建流构造器

StreamsBuilder builder = new StreamsBuilder();

KStream<Object, Object> source = builder.stream("suminput");

KTable<String, String> sum1 = source.map((key, value) ->

new KeyValue<String, String>("sum", value.toString())

).groupByKey().reduce((x, y) -> {

System.out.println("x: " + x + " " + "y: "+y);

Integer sum = Integer.valueOf(x) + Integer.valueOf(y);

System.out.println("sum: "+sum);

return sum.toString();

});

sum1.toStream().to("sumout");

final Topology topo=builder.build();

final KafkaStreams streams = new KafkaStreams(topo, prop);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("stream"){

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.exit(0);

}

}`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

* 9

* 10

* 11

* 12

* 13

* 14

* 15

* 16

* 17

* 18

* 19

* 20

* 21

* 22

* 23

* 24

* 25

* 26

* 27

* 28

* 29

* 30

* 31

* 32

* 33

* 34

* 35

* 36

* 37

* 38

* 39

* 40

* 41

* 42

* 43

* 44

* 45

* 46

* 47

* 48

* 49

* 50

* 51

* 52

`# 创建topicA(suminput)

kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic suminput --partitions 1 --replication-factor 1

# 创建生产者

kafka-console-producer.sh --topic suminput --broker-list 192.168.136.20:9092

# 创建topicB(sumout)

kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic sumout --partitions 1 --replication-factor 1

# 创建消费者

kafka-console-consumer.sh --topic sumout --bootstrap-server 192.168.136.20:9092 --from-beginning`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

运行示例代码并在生产者端输入数据,能在消费端看到数据,表明Kafka Stream写入成功。

3.4 案例四:窗口

3.4.1 每隔2秒钟输出一次过去5秒内topicA里的wordcount,结果写入到TopicB

`import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.*;

import org.apache.kafka.streams.kstream.*;

import java.time.Duration;

import java.util.Arrays;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class WindowStream {

public static void main(String[] args) {

Properties prop =new Properties();

prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream");

prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址

prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); //提交时间设置为3秒

prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

KStream<Object, Object> source = builder.stream("topicA");

KTable<Windowed<String>, Long> countKtable = source.flatMapValues(value -> Arrays.asList(value.toString().split("s+")))

.map((x, y) -> {

return new KeyValue<String, String>(y, "1");

}).groupByKey()

//加5秒窗口,按步长2秒滑动 Hopping Time Window

.windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()).advanceBy(Duration.ofSeconds(2).toMillis()))

//.windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis()))

.count();

//为了方便查看,输出到控制台

countKtable.toStream().foreach((x,y)->{

System.out.println("x: "+x+" y: "+y);

});

countKtable.toStream().map((x,y)-> {

return new KeyValue<String, String>(x.toString(), y.toString());

}).to("topicB");

final Topology topo=builder.build();

final KafkaStreams streams = new KafkaStreams(topo, prop);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("stream"){

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.exit(0);

}

}`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

* 9

* 10

* 11

* 12

* 13

* 14

* 15

* 16

* 17

* 18

* 19

* 20

* 21

* 22

* 23

* 24

* 25

* 26

* 27

* 28

* 29

* 30

* 31

* 32

* 33

* 34

* 35

* 36

* 37

* 38

* 39

* 40

* 41

* 42

* 43

* 44

* 45

* 46

* 47

* 48

* 49

* 50

* 51

* 52

* 53

* 54

* 55

* 56

* 57

* 58

3.4.2 每隔5秒钟输出一次过去5秒内topicA里的wordcount,结果写入到TopicB

加5秒窗口,与3.4.1不同的是,前一个5秒与下一个5秒没有任何交叉。

`import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.*;

import org.apache.kafka.streams.kstream.*;

import java.time.Duration;

import java.util.Arrays;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class WindowStream {

public static void main(String[] args) {

Properties prop =new Properties();

prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream");

prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址

prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); //提交时间设置为3秒

prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

KStream<Object, Object> source = builder.stream("topicA");

KTable<Windowed<String>, Long> countKtable = source.flatMapValues(value -> Arrays.asList(value.toString().split("s+")))

.map((x, y) -> {

return new KeyValue<String, String>(y, "1");

}).groupByKey()

//加五秒的窗口(前一个5秒和下一个5秒没有任何交叉) Tumbling Time Window

.windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()))

.count();

countKtable.toStream().foreach((x,y)->{

System.out.println("x: "+x+" y: "+y);

});

countKtable.toStream().map((x,y)-> {

return new KeyValue<String, String>(x.toString(), y.toString());

}).to("topicB");

final Topology topo=builder.build();

final KafkaStreams streams = new KafkaStreams(topo, prop);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("stream"){

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.exit(0);

}

}`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

* 9

* 10

* 11

* 12

* 13

* 14

* 15

* 16

* 17

* 18

* 19

* 20

* 21

* 22

* 23

* 24

* 25

* 26

* 27

* 28

* 29

* 30

* 31

* 32

* 33

* 34

* 35

* 36

* 37

* 38

* 39

* 40

* 41

* 42

* 43

* 44

* 45

* 46

* 47

* 48

* 49

* 50

* 51

* 52

* 53

* 54

* 55

3.4.3 TopicA 15秒内的wordcount,结果写入TopicB

比如登录某app,20分钟内不操作,会自动退出。
一个典型的案例是,希望通过Session Window计算某个用户访问网站的时间。对于一个特定的用户(用Key表示)而言,当发生登录操作时,该用户(Key)的窗口即开始,当发生退出操作或者超时时,该用户(Key)的窗口即结束。窗口结束时,可计算该用户的访问时间或者点击次数等。

`import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.*;

import org.apache.kafka.streams.kstream.*;

import java.time.Duration;

import java.util.Arrays;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class WindowStream {

public static void main(String[] args) {

Properties prop =new Properties();

prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream");

prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址

prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); //提交时间设置为3秒

prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

KStream<Object, Object> source = builder.stream("windowdemo1");

KTable<Windowed<String>, Long> countKtable = source.flatMapValues(value -> Arrays.asList(value.toString().split("s+")))

.map((x, y) -> {

return new KeyValue<String, String>(y, "1");

}).groupByKey()

.windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis()))

.count();

countKtable.toStream().foreach((x,y)->{

System.out.println("x: "+x+" y: "+y);

});

countKtable.toStream().map((x,y)-> {

return new KeyValue<String, String>(x.toString(), y.toString());

}).to("windowDemoOut");

final Topology topo=builder.build();

final KafkaStreams streams = new KafkaStreams(topo, prop);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("stream"){

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.exit(0);

}

}`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

* 9

* 10

* 11

* 12

* 13

* 14

* 15

* 16

* 17

* 18

* 19

* 20

* 21

* 22

* 23

* 24

* 25

* 26

* 27

* 28

* 29

* 30

* 31

* 32

* 33

* 34

* 35

* 36

* 37

* 38

* 39

* 40

* 41

* 42

* 43

* 44

* 45

* 46

* 47

* 48

* 49

* 50

* 51

* 52

* 53

* 54

* 55

3.5 案例五:将TopicA的某一列扁平化处理写入TopicB

现有一张表user_friends,表结构如下,去掉表头,使用flume将内容写进Kafka的TopicA,将第二列扁平化处理写入TopicB。Flume–>Kafka的TopicA见另一篇博客Flume整合Kafka,本文演示从TopicA–>TopicB。
【Java】Kafka Stream 简介及基本运用(含案例)
处理后的数据要求如下,就是将第二列展开。
【Java】Kafka Stream 简介及基本运用(含案例)

`import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.*;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class UserFriendStream {

public static void main(String[] args) {

Properties prop =new Properties();

prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"UserFriendStream1");

prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址

prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

builder.stream("user_friends_raw").flatMap((k,v)->{

List<KeyValue<String,String>> list=new ArrayList<>();

String[] info = v.toString().split(",");

if(info.length==2){

String[] friends = info[1].split("s+");

if (info[0].trim().length()>0){

for (String friend : friends) {

//为了方便测试打印出来

System.out.println(info[0]+" "+friend);

list.add(new KeyValue<String,String>(null,info[0]+","+friend));

}}}

return list;

}).to("user_friends");

final Topology topo=builder.build();

final KafkaStreams streams = new KafkaStreams(topo, prop);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("stream"){

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.exit(0);

}

}`

* 1

* 2

* 3

* 4

* 5

* 6

* 7

* 8

* 9

* 10

* 11

* 12

* 13

* 14

* 15

* 16

* 17

* 18

* 19

* 20

* 21

* 22

* 23

* 24

* 25

* 26

* 27

* 28

* 29

* 30

* 31

* 32

* 33

* 34

* 35

* 36

* 37

* 38

* 39

* 40

* 41

* 42

* 43

* 44

* 45

* 46

* 47

* 48

* 49

* 50

* 51

3.6 案例六:将TopicA的多列扁平化处理写入TopicB

现有一张表event_attendees.csv,内容如下,去掉表头,使用Flume将内容写入Kafka的TopicA,将二三四五列扁平化处理写入TopicB。本案例仅演示TopicA–>TopicB,不演示Flume–>Kafka。

【Java】Kafka Stream 简介及基本运用(含案例)

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.*;

import org.apache.kafka.streams.kstream.KStream;

import java.util.ArrayList;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class EventAttendStream {

public static void main(String[] args) {

Properties prop =new Properties();

prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"UserFriendStream1");

prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址

prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

KStream<Object, Object> ear = builder.stream("event_attendees_raw");

KStream<String, String> eventStream = ear.flatMap((k, v) -> { //event,yes,maybe,invited,no

System.out.println(k + " " + v);

String[] split = v.toString().split(",");

ArrayList<KeyValue<String, String>> list = new ArrayList<>();

if (split.length >= 2 && split[1].trim().length() > 0) {

String[] yes = split[1].split("s+");

for (String y : yes) {

list.add(new KeyValue<String, String>(null, split[0] + "," + y + ",yes"));

}

}

if (split.length >= 3 && split[2].trim().length() > 0) {

String[] maybe = split[2].split("s+");

for (String mb : maybe) {

list.add(new KeyValue<String, String>(null, split[0] + "," + mb + ",maybe"));

}

}

if (split.length >= 4 && split[3].trim().length() > 0) {

String[] invited = split[3].split("s+");

for (String inv : invited) {

list.add(new KeyValue<String, String>(null, split[0] + "," + inv + ",invited"));

}

}

if (split.length >= 5 && split[4].trim().length() > 0) {

String[] no = split[4].split("s+");

for (String n : no) {

list.add(new KeyValue<String, String>(null, split[0] + "," + no + ",no"));

}

}

return list;

});

eventStream.to("events_attend");

final Topology topo=builder.build();

final KafkaStreams streams = new KafkaStreams(topo, prop);

final CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread("stream"){

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.exit(0);

}

}

以上是 【Java】Kafka Stream 简介及基本运用(含案例) 的全部内容, 来源链接: utcz.com/a/92362.html

回到顶部