Understanding Spark SQL (Part 3): Spark SQL Program Examples


As noted in the previous post, SQLContext and HiveContext are deprecated in Spark 2.x; instead, SQL statements are executed through the sql function of a SparkSession object. Before executing SQL with this function, you must first register a temporary view by calling createOrReplaceTempView on a DataFrame, so the key step is converting an RDD into a DataFrame. In fact, Spark declares

type DataFrame = Dataset[Row]

so DataFrame is simply an alias for Dataset[Row]. The RDD offers a low-level API, while DataFrame/Dataset offer a high-level API (well suited to SQL and other structured-data scenarios).
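The difference between the two levels can be seen in a minimal sketch (assuming an existing SparkSession named spark and the space-delimited offices.txt file used in the examples below): the RDD version treats every line as an opaque string, while the DataFrame version works with named columns that Spark's Catalyst optimizer understands.

// Low-level RDD API: field positions and types are tracked by hand.
val easternCities = spark.sparkContext
  .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
  .map(_.split(" "))
  .filter(cols => cols(2) == "Eastern")   // column 2 happens to be region
  .map(cols => cols(1))                   // column 1 happens to be city

// High-level DataFrame API: columns are named and typed, so the same
// query reads like SQL and can be optimized by Catalyst.
import spark.implicits._
val officesDF = spark.read
  .option("sep", " ")
  .csv("/user/hive/warehouse/orderdb.db/offices/offices.txt")
  .toDF("office", "city", "region", "mgr", "target", "sales")
val easternCitiesDF = officesDF.filter($"region" === "Eastern").select($"city")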

Below are some example Spark SQL programs.

Example 1: SparkSQLExam.scala

package bruce.bigdata.spark.example

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SparkSQLExam {

  case class offices(office: Int, city: String, region: String, mgr: Int, target: Double, sales: Double)

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder
      .appName("SparkSQLExam")
      .getOrCreate()

    runSparkSQLExam1(spark)
    runSparkSQLExam2(spark)

    spark.stop()
  }

  // Infer the schema by reflection from the offices case class.
  private def runSparkSQLExam1(spark: SparkSession): Unit = {

    import spark.implicits._

    val rddOffices = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
      .map(_.split(" "))
      .map(p => offices(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
    val officesDataFrame = spark.createDataFrame(rddOffices)

    officesDataFrame.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region='Eastern'").map(t => "City: " + t(0)).collect.foreach(println)
  }

  // Specify the schema explicitly with StructType/StructField.
  private def runSparkSQLExam2(spark: SparkSession): Unit = {

    import spark.implicits._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._

    val schema = new StructType(Array(
      StructField("office", IntegerType, false),
      StructField("city", StringType, false),
      StructField("region", StringType, false),
      StructField("mgr", IntegerType, true),
      StructField("target", DoubleType, true),
      StructField("sales", DoubleType, false)))
    val rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
      .map(_.split(" "))
      .map(p => Row(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
    val dataFrame = spark.createDataFrame(rowRDD, schema)

    dataFrame.createOrReplaceTempView("offices2")
    spark.sql("select city from offices2 where region='Western'").map(t => "City: " + t(0)).collect.foreach(println)
  }
}
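One detail worth noting about runSparkSQLExam1: spark.createDataFrame(rddOffices) infers the schema by reflection from the offices case class. Since spark.implicits._ is already imported, an equivalent and arguably more idiomatic spelling is the toDF() conversion; a minimal sketch:

// Equivalent to spark.createDataFrame(rddOffices): with spark.implicits._
// in scope, an RDD of case-class instances converts directly to a DataFrame.
val officesDataFrame = rddOffices.toDF()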

Compile it with the following command:

[root@BruceCentOS4 scala]# scalac SparkSQLExam.scala

Before compiling, add the required paths to the CLASSPATH:

export CLASSPATH=$CLASSPATH:$SPARK_HOME/jars/*:$(/opt/hadoop/bin/hadoop classpath)

Then package it into a jar file:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

Then submit the program to the YARN cluster with spark-submit. To view the results conveniently from the client, it is run here in yarn client mode.

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode client spark_exam_scala.jar

Screenshot of the run results:

 

Example 2: SparkHiveExam.scala (requires the Hive metastore to be running)

package bruce.bigdata.spark.example

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkHiveExam {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .appName("Spark Hive Exam")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Use HQL to inspect the Hive data.
    spark.sql("show databases").collect.foreach(println)
    spark.sql("use orderdb")
    spark.sql("show tables").collect.foreach(println)
    spark.sql("select city from offices where region='Eastern'").map(t => "City: " + t(0)).collect.foreach(println)

    // Save the result of an HQL query into a newly created Hive table:
    // find the products whose order amounts exceed $10,000.
    spark.sql("""create table products_high_sales(mfr_id string, product_id string, description string)
                 ROW FORMAT DELIMITED FIELDS TERMINATED BY " " LINES TERMINATED BY "\n" STORED AS TEXTFILE""")
    spark.sql("""select mfr_id, product_id, description
                 from products a inner join orders b
                 on a.mfr_id=b.mfr and a.product_id=b.product
                 where b.amount>10000""").write.mode(SaveMode.Overwrite).saveAsTable("products_high_sales")

    // Load HDFS file data into a Hive table.
    spark.sql("""CREATE TABLE IF NOT EXISTS offices2 (office int, city string, region string, mgr int, target double, sales double)
                 ROW FORMAT DELIMITED FIELDS TERMINATED BY " " LINES TERMINATED BY "\n" STORED AS TEXTFILE""")
    spark.sql("LOAD DATA INPATH '/user/hive/warehouse/orderdb.db/offices/offices.txt' INTO TABLE offices2")

    spark.stop()
  }
}
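To double-check what saveAsTable wrote, the new table can be read back as a DataFrame without any SQL. A minimal sketch, assuming the same spark session (the table name is qualified with orderdb because it was created while that database was in use):

// Read the Hive table written above back into a DataFrame and inspect it.
val highSales = spark.table("orderdb.products_high_sales")
highSales.printSchema()
highSales.show()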

Compile it with the following command:

[root@BruceCentOS4 scala]# scalac SparkHiveExam.scala

Package it with the following command:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

Run it with the following command:

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkHiveExam --master yarn --deploy-mode client spark_exam_scala.jar

Program output:

 

In addition, after the program above runs, two new tables appear in Hive:

 

 

Example 3: spark_sql_exam.py

from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.sql.types import *


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL exam") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

    schema = StructType([StructField("office", IntegerType(), False), StructField("city", StringType(), False),
                         StructField("region", StringType(), False), StructField("mgr", IntegerType(), True),
                         StructField("target", DoubleType(), True), StructField("sales", DoubleType(), False)])

    rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt") \
        .map(lambda p: p.split(" ")) \
        .map(lambda p: (int(p[0].strip()), p[1], p[2], int(p[3].strip()), float(p[4].strip()), float(p[5].strip())))

    dataFrame = spark.createDataFrame(rowRDD, schema)
    dataFrame.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region='Eastern'").show()

    spark.stop()

Run the program with the following command:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client spark_sql_exam.py

Program output:

 

Example 4: JavaSparkSQLExam.java

package bruce.bigdata.spark.example;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.AnalysisException;


public class JavaSparkSQLExam {
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
            .builder()
            .appName("Java Spark SQL exam")
            .config("spark.some.config.option", "some-value")
            .getOrCreate();

        // Build the schema explicitly, the Java counterpart of StructType/StructField.
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("office", DataTypes.IntegerType, false));
        fields.add(DataTypes.createStructField("city", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("region", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("mgr", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("target", DataTypes.DoubleType, true));
        fields.add(DataTypes.createStructField("sales", DataTypes.DoubleType, false));

        StructType schema = DataTypes.createStructType(fields);

        JavaRDD<String> officesRDD = spark.sparkContext()
            .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt", 1)
            .toJavaRDD();

        JavaRDD<Row> rowRDD = officesRDD.map((Function<String, Row>) record -> {
            String[] attributes = record.split(" ");
            return RowFactory.create(Integer.valueOf(attributes[0].trim()), attributes[1], attributes[2],
                Integer.valueOf(attributes[3].trim()), Double.valueOf(attributes[4].trim()), Double.valueOf(attributes[5].trim()));
        });

        Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema);

        dataFrame.createOrReplaceTempView("offices");
        Dataset<Row> results = spark.sql("select city from offices where region='Eastern'");
        results.collectAsList().forEach(r -> System.out.println(r));

        spark.stop();
    }
}

After compiling and packaging, run it with the following command:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.JavaSparkSQLExam --master yarn --deploy-mode client spark_exam_java.jar

Run results:

 

The above are some examples of Spark SQL programs, written in Scala, Python, and Java respectively. Besides these three languages, Spark also supports writing programs in R; since I am not familiar with it myself, no R example is given. Whichever language you use, the API is essentially the same: you work through the high-level DataFrame and Dataset APIs to invoke and execute SQL. With these APIs you can easily operate on structured data via SQL, and conveniently work with data stored in Hive.
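To round off the point about the high-level APIs: the SQL used throughout these examples can also be expressed directly with DataFrame operations, with no temporary view at all. A minimal Scala sketch, reusing officesDataFrame from Example 1:

// Equivalent to: select city from offices where region='Eastern'
import org.apache.spark.sql.functions.col
officesDataFrame
  .filter(col("region") === "Eastern")
  .select(col("city"))
  .collect()
  .foreach(row => println("City: " + row(0)))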

