博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkSql
阅读量:3963 次
发布时间:2019-05-24

本文共 4884 字,大约阅读时间需要 16 分钟。

介绍

Spark SQL是一个用于结构化数据处理的SPark模块。

Datasets and DataFrames:

DataSet:数据集是数据的分布式集合;它提供了RDDs(强大的类型,能够使用强大的lambda函数)的优点,以及SparkSQL的优化执行引擎的优点

DataFrame是数据集(DataSet)组织成有名字的列;它在概念上相当于关系数据库中的表;只有Scala中才会使用到dataFrame;而在Java中使用的是DataSet

API操作

前期准备

//告诉系统,hadoop在哪System.setProperty("hadoop.home.dir", "E:/帮助文档/大数据/hadoop-3.2.1");    /*      * 返回值是Builder     *  */    var builder = SparkSession.builder();    /* 设置一些参数 */    builder.appName("MySql");        var conf = new SparkConf();    /* 在本地跑 */    conf.setMaster("local[*]");    /* 传入一个config对象 */    builder.config(conf);    /* 单个单个的传 */    //builder.config(key, value)    /* 设置Spark自带的默认的仓库路径 */    //builder.config("hive.metastore.warehouse.dir", "e:/test/warehouse");    /* 初始化SparkSession */    var spark = builder.getOrCreate() ;     /* 获取sparkContext     * 千万不要new一个sparkContext和SparkSession     *  */    var sc = spark.sparkContext ;    /* 完成 */    println("初始化完成");    spark.stop();

主要方法

传入参数为SparkContext、SparkSession

def test01(sc:SparkContext,ss:SparkSession){

加载数据(DataFrame.read.load(path))(默认是parquet格式文件)

在Sparkrepo中的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”找到完整的示例代码,将其拷贝至eclipse项目下。

var path = "./users.parquet"     /*加载数据源      * load加载的是文件路径      * 加载和存储的格式默认就是parquet      * 返回值是users.parquet,类似于Rdd,但是比他高级      * */    var dataframe =  ss.read.load(path)

加载其他数据类型的文件(json)

var path = "./people.json"    //读取json的文件     var dataFrame = ss.read.format("json").load(path)    //也可以这样     var dataFrame =ss.read.json(path)

加载csv的文件

//两种加载文件的方法//第一种var DataFrameReader =  ss.read.format("csv")//第二种var csvDataFrame =ss.read.csv(path)   //csv文件默认是以,分隔,但是如果文件是以;分隔,故需要重新设置   DataFrameReader.option("sep", ";")
处理csv文件需要额外操作

假使什么都不去设置,打印出来就是如下

println("csvDataFrame.printSchema()")    csvDataFrame.printSchema()    |-- _c0: string (nullable = true) |-- _c1: string (nullable = true) |-- _c2: string (nullable = true)    println("csvDataFrame.show")   csvDataFrame.show()   +-----+---+---------+|  _c0|_c1|      _c2|+-----+---+---------+| name|age|      job||Jorge| 30|Developer||  Bob| 32|Developer

所以还需要额外设置

//以上观之,将表头当作数据了,这样不可,需要将表头给拎出来     /* true:自动的推断数据的类型     * false:不自动推荐数据的类型;默认全部是String */   DataFrameReader.option("inferSchema", "true")     /* csv文件的第一行是不是数据;true:表示不是数据,是表头;     * false:表示的是数据,不是表头 */   DataFrameReader.option("header", "true")     var csvDataFrame = DataFrameReader.load(path)     println("csvDataFrame.printSchema()")     csvDataFrame.printSchema()     println("csvDataFrame.show")

打印表的相关内容printSchema、show

println("dataframe" + dataframe)        println("printSchema")    //打印dataframe的结构    dataframe.printSchema()    println("show")    //查看前20条数据    dataframe.show()

只检索自己想要的字段DataFrame.select(字段)

var selectdataframe =  dataframe.select("favorite_numbers")

进行sql操作

与使用ReadAPI将文件加载到DataFrame并对其进行查询不同,还可以直接使用SQL查询该文件

//第一种操作//注意是``,而非'' var sql = "select * from json.`./people.json`"   var SqlDataFrame = ss.sql(sql)           //第二种操作     //使用方法进行加载     var path = "./people.json"    var JsondataFrame =  ss.read.json(path)       //第三种操作    //创建一张临时表    //注意是createOrReplaceTempView   // 而不是createGlobalTempView    JsondataFrame.createOrReplaceTempView("peo")    //sql语句    var Jsonsql = "select name from peo where age > 20"    var JsonsqlDataFrame = ss.sql(Jsonsql)       println("JsonsqlDataFrame.printSchema()")    JsonsqlDataFrame.printSchema()    println("JsonsqlDataFrame.show")    JsonsqlDataFrame.show()

存储数据DataFrame.write.save

//保留数据    var respath = path + "_result"    selectdataframe.write.save(respath)
存储为其他类型文件
//文件存储,以另外一种方式存储     var respath = "./people"     dataFrame.write.format("parquet").save(respath)     //也可这样     dataFrame.write.parquet(respath)

保存方式

保存操作可以选择SaveMode,它指定如何处理现有数据(如果存在)。重要的是要认识到,这些保存模式不使用任何锁定,也不是原子的。此外,在执行Overwrite,在写出新数据之前,数据将被删除。

/* 保存模式     * SaveMode.ErrorIfExists,若文件存在,则报错     * SaveMode.Append 若文件存在,则追加     * SaveMode.Overwrite若文件存在,则覆盖     * SaveMode.Ignore:若文件存在,则忽略     *  */    var respath = csvPath + "_res"    dataFrame.write.mode(SaveMode.Overwrite).json(respath)

保存到持久化表

DataFrames还可以使用saveAsTable命令。注意,现有的Hive部署并不是使用此特性所必需的。spark将为您创建一个默认的本地Hive转移(使用Derby)。不像createOrReplaceTempView命令,saveAsTable将显现DataFrame的内容,并创建一个指向Hive转移区中数据的指针。即使spark重新启动之后,持久化表仍然存在,只要您保持的连接是同一元数据。可以通过调用table方法的SparkSession的table方法。

对于基于文件的数据源,例如文本、拼图、json等,您可以通过path选项,例如df.write.option(“path”, “/some/path”).saveAsTable(“t”)…当表被删除时,自定义表路径将不会被移除,并且表数据仍然存在。如果未指定自定义表路径,SPark将数据写入仓库目录下的默认表路径。当表被删除时,默认的表路径也将被删除。

从Spark2.1开始,持久性数据源表的每个分区元数据都存储在Hive的元数据中。这带来了几个好处:

由于元数据只能返回查询所需的分区,因此不再需要在第一个查询中发现表中的所有分区。

单元DDL,如ALTER TABLE PARTITION … SET LOCATION现在可用于使用Datasource API创建的表。
注意,在创建外部数据源表(那些具有path备选方案)。要同步元数据中的分区信息,可以调用MSCK REPAIR TABLE.

//保存到hive    //dataFrame.write.format("json").saveAsTable("temp")  上面指令先执行,将数据保存在spark-warehouse/temp下

加载数据,就要read(读),然后load(加载),数据被处理为dataframe

保存数据:就要write(写),然后save(保存),生成的是文件夹

转载地址:http://egrzi.baihongyu.com/

你可能感兴趣的文章
java 泛型
查看>>
控制结构
查看>>
标准输入输出
查看>>
运算符
查看>>
数据类型之列表与数组
查看>>
比较字符串
查看>>
Java EE 精萃
查看>>
Open Source 精萃
查看>>
Java EE 简介
查看>>
Weblogic 简介
查看>>
观察者模式 (Observer)
查看>>
Java 集合框架
查看>>
Weblogic 精萃
查看>>
Servlet 精萃
查看>>
XStream 精萃
查看>>
XStream 环境设置
查看>>
Git 分支
查看>>
Git 冲突
查看>>
Git Merging vs. Rebasing
查看>>
[第9课] 箱线图
查看>>