sparksqlcreateOrReplaceTempView的使用
最近项目中使用了spark-sql来进行埋点数据的修复以及转移
通过spark-sql修复埋点中的数据,并且再次写入到kafka中供其他的实时流进行消费
其中使用到了createOrReplaceTempView
spark创建tempView分为两种方式:
1.createOrReplaceTempView
2. createGlobalTempView
createOrReplaceTempView()的使用
创建或替换本地临时视图。
此视图的生命周期依赖于SparkSession类,如果想drop此视图可采用dropTempView删除
spark.catalog.dropTempView("tempViewName")
或者 stop() 来停掉 session
self.ss = SparkSession(sc)
...
self.ss.stop()
createGlobalTempView使用
createGlobalTempView():创建全局临时视图。
这种视图的生命周期取决于spark application本身。如果想drop此视图可采用dropGlobalTempView删除
spark.catalog.dropGlobalTempView("tempViewName")
或者stop() 将停止
ss = SparkContext(conf=conf, ......)
...
ss.stop()
本次项目中使用的是createOrReplaceTempView,因为一个spark app里面只创建了一个spark session
创建:
val studentQueResultDf = XlyDbTools.readXlyShardingDbPartitionByColum(spark, ExamConstants.XLY_TABLE_EXAM_STUDENT_QUESTION_RESULT, "ee_id", dataNodeExamIdMapSliceByExamId)
studentQueResultDf.select("ee_id","student_id","q_id","content","content_result","rw","item_score","finish_time")
.createOrReplaceTempView(ExamConstants.XLY_TABLE_EXAM_STUDENT_QUESTION_RESULT)
studentQueResultDf.cache()
使用时直接当作表视图来使用:
val examDfEff = spark.sql(s"select ee_id as examId," +
s"rw as rw," +
s"q_id as qId " +
s"from ${ExamConstants.XLY_TABLE_EXAM_STUDENT_QUESTION_RESULT} where ee_id = ${examId} " );
examDfEff.foreach(row =>{
val examId = row.getAs[java.lang.Long]("examId")
})
println(examDfEff.count());
以上是 sparksqlcreateOrReplaceTempView的使用 的全部内容, 来源链接: utcz.com/z/517652.html