Kafka Data Source Generator


  • 1. Create the topic: user_behavior

    $ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user_behavior

    WARNING: Due to limitations in metric names, topics with a period (".") or underscore ("_") could collide. To avoid issues it is best to use either, but not both.

    $ bin/kafka-topics.sh --list --bootstrap-server localhost:9092

    test

    user_behavior
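
    Optionally, the new topic's partition and replication settings can be checked with the describe action of the same script:

    $ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic user_behavior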

  • 2. Generator code

    Reference: SourceGenerator

    Java code: MockSourceGenerator

    package cn.rumoss.study.flink;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    public class MockSourceGenerator {

        private static final long SPEED = 10; // default: 10 records per second

        public static void main(String[] args) {
            long speed = SPEED;
            if (args.length > 0) {
                speed = Long.valueOf(args[0]);
            }
            long delay = 1000_000 / speed; // microseconds per record

            // Read the data set above from the classpath, one line at a time
            try (InputStream inputStream = MockSourceGenerator.class.getClassLoader().getResourceAsStream("user_behavior.log");
                 BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
                long start = System.nanoTime();
                while (reader.ready()) {
                    String line = reader.readLine();
                    System.out.println(line);
                    // Throttle: sleep in 1 ms steps until the per-record delay (converted to nanoseconds) has elapsed
                    long end = System.nanoTime();
                    long diff = end - start;
                    while (diff < (delay * 1000)) {
                        Thread.sleep(1);
                        end = System.nanoTime();
                        diff = end - start;
                    }
                    start = end;
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
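
    The generator loads user_behavior.log from the classpath, so in a Maven project it would typically sit under src/main/resources (an assumption; the post does not show the project layout). Since the path target/java-flink-1.0-SNAPSHOT.jar suggests a Maven build, packaging would look roughly like:

    $ mvn clean package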

    After compiling and packaging, test from the command line; the trailing argument sets how many records are emitted per second:

    $ java -cp target/java-flink-1.0-SNAPSHOT.jar cn.rumoss.study.flink.MockSourceGenerator 1

    {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

    {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

    {"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

    ...

  • 3. Pipe the data into Kafka

    Copy the jar built above into the Kafka root directory:

    • Produce data into the topic (a programmatic alternative is sketched after this list):

      $ java -cp java-flink-1.0-SNAPSHOT.jar cn.rumoss.study.flink.MockSourceGenerator 1 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user_behavior

      >>>...

    • Subscribe to the topic and consume the data; records arrive one after another:

      $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_behavior --from-beginning

      {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

      {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

      {"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

      ...
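
    As an alternative to piping the generator's output through kafka-console-producer.sh, the records could be written to the topic directly with the Kafka Java client. A minimal sketch, not part of the original post, assuming the kafka-clients dependency is on the classpath; the class name DirectKafkaWriter and the single hard-coded record are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class DirectKafkaWriter {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // One JSON line per ProducerRecord, sent to the user_behavior topic
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                String line = "{\"user_id\": \"543462\", \"item_id\":\"1715\", \"category_id\": \"1464116\", \"behavior\": \"pv\", \"ts\": \"2017-11-26T01:00:00Z\"}";
                producer.send(new ProducerRecord<>("user_behavior", line));
            }
        }
    }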
