SparkStreaming两种方式连接Flume

database

SparkStreaming 连接Flume的两种方式分别为:Push(推)和Pull(拉)的方式实现,以Spark Streaming的角度来看,Push方式属于推送(由Flume向Spark推送数据);而Pull属于拉取(Spark 拉取 Flume的输出数据);

 Flume向SparkStreaming推送数据没有研究明白,有大佬指点一下吗?

万分感谢!

1.Spark拉取Flume数据:

导入两个jar包到flume/lib下

 否则抛出这两个异常:

org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink

java.lang.IllegalStateException: begin() called when transaction is OPEN!

2.编写flume 工作文件:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# source

a1.sources.r1.type=spooldir

a1.sources.r1.spoolDir=/home/zhuzhu/apps/flumeSpooding

a1.sources.r1.fileHeader=true

# Describe the sink

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink

# 当前主机端口

a1.sinks.k1.hostname = 192.168.137.88

a1.sinks.k1.port = 9999

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3.编写SparkStreaming程序:

package day02

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

import org.apache.spark.{SparkConf, SparkContext}

/**

* @ClassName: StreamingFlume

* @Description TODO 实时监控flume,统计flume数据产生,是Spark

* @Author: Charon

* @Date: 2021/4/7 13:19

* @Version 1.0

**/

object StreamingFlume {

def main(args: Array[String]): Unit = {

//1.创建SparkConf对象

val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlume")

//2.创建SparkContext对象

val sc = new SparkContext(conf)

//设置日志输出格式,只打印异常日志,在这里设置没有用

//sc.setLogLevel("WARN")

//3.创建StreamingContext,Seconds(5):轮询机制,多久执行一次

val ssc = new StreamingContext(sc, Seconds(5))

//4.定义一个flume集合,可以接受多个flume数据,多个用,隔开需要new

val addresses = Seq(new InetSocketAddress("127.0.0.1", 5555))

//5.获取flume中的数据,

val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_2)

// 6.截取flume数据:{"header":xxxxx   "body":xxxxxx}

val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))

lineDstream.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).print()

ssc.start()

ssc.awaitTermination()

}

}

 4。开启flume监控文件,开启SparkStreaming程序:

向指定目录上传文件

 

 

 

以上是 SparkStreaming两种方式连接Flume 的全部内容, 来源链接: utcz.com/z/535403.html

回到顶部