sparksqlcreateOrReplaceTempView的使用

编程

最近项目中使用了spark-sql来进行埋点数据的修复以及转移

通过spark-sql修复埋点中的数据,并且再次写入到kafka中供其他的实时流进行消费

其中使用到了createOrReplaceTempView

spark创建tempView分为两种方式:

1.createOrReplaceTempView

2. createGlobalTempView

createOrReplaceTempView()的使用

 创建或替换本地临时视图。

此视图的生命周期依赖于SparkSession类,如果想drop此视图可采用dropTempView删除

spark.catalog.dropTempView("tempViewName")

或者 stop() 来停掉 session

self.ss = SparkSession(sc)

...

self.ss.stop()

createGlobalTempView使用

createGlobalTempView():创建全局临时视图。

这种视图的生命周期取决于spark application本身。如果想drop此视图可采用dropGlobalTempView删除

spark.catalog.dropGlobalTempView("tempViewName")

或者stop() 将停止

ss = SparkContext(conf=conf, ......)

...

ss.stop()

 本次项目中使用的是createOrReplaceTempView,因为一个spark app里面只创建了一个spark session

创建:

val studentQueResultDf = XlyDbTools.readXlyShardingDbPartitionByColum(spark, ExamConstants.XLY_TABLE_EXAM_STUDENT_QUESTION_RESULT, "ee_id", dataNodeExamIdMapSliceByExamId)

studentQueResultDf.select("ee_id","student_id","q_id","content","content_result","rw","item_score","finish_time")

.createOrReplaceTempView(ExamConstants.XLY_TABLE_EXAM_STUDENT_QUESTION_RESULT)

studentQueResultDf.cache()

使用时直接当作表视图来使用:

val examDfEff = spark.sql(s"select ee_id as examId," +

s"rw as rw," +

s"q_id as qId " +

s"from ${ExamConstants.XLY_TABLE_EXAM_STUDENT_QUESTION_RESULT} where ee_id = ${examId} " );

examDfEff.foreach(row =>{

val examId = row.getAs[java.lang.Long]("examId")

})

println(examDfEff.count());

 

以上是 sparksqlcreateOrReplaceTempView的使用 的全部内容, 来源链接: utcz.com/z/517652.html

回到顶部