Spark SQL | Spark,从入门到精通
|
发家史 熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。 Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。执行计划生成和优化都由 Catalyst 负责。借助 Scala 的模式匹配等函数式语言特性,利用 Catalyst 开发执行计划优化策略比 Hive 要简洁得多。
![]()
Spark SQL Spark SQL 提供了多种接口:
纯 Sql 文本; 当然,相应的,也会有各种客户端:
sql 文本,可以用 thriftserver/spark-sql; Dataframe/Dataset API 简介 Dataframe/Dataset 也是分布式数据集,但与 RDD 不同的是其带有 schema 信息,类似一张表。 可以用下面一张图详细对比 Dataset/dataframe 和 RDD 的区别:
![]()
Dataset 是在 spark1.6 引入的,目的是提供像 RDD 一样的强类型、使用强大的 lambda 函数,同时使用 Spark SQL 的优化执行引擎。到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为: type DataFrame = Dataset[Row]
![]()
所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。 基本操作
val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”) 分区分桶 排序
分桶排序保存hive表 cube rullup pivot
cube SQL 编程 Spark SQL 允许用户提交 SQL 文本,支持以下三种手段编写 SQL 文本:
1. spark 代码 支持 Spark SQL 自身的语法,同时也兼容 HSQL。 1. 编码 要先声明构建 SQLContext 或者 SparkSession,这个是 SparkSQL 的编码入口。早起的版本使用的是 SQLContext 或者 HiveContext,spark2 以后,建议使用的是 SparkSession。 SQLContext new SQLContext(SparkContext) HiveContext new HiveContext(spark.sparkContext) SparkSession 不使用 hive 元数据:
val spark = SparkSession.builder() 使用 hive 元数据:
val spark = SparkSession.builder() 使用
val df =spark.read.json("examples/src/main/resources/people.json") 2. spark-sql 脚本 spark-sql 启动的时候类似于 spark-submit 可以设置部署模式资源等,可以使用 bin/spark-sql –help 查看配置参数。 需要将 hive-site.xml 放到 ${SPARK_HOME}/conf/ 目录下,然后就可以测试
show tables; 3. thriftserver thriftserver jdbc/odbc 的实现类似于 hive1.2.1 的 hiveserver2,可以使用 spark 的 beeline 命令来测试 jdbc server。 安装部署 /1 开启 hive 的 metastore bin/hive --service metastore /2 将配置文件复制到spark/conf/目录下 /3 thriftserver sbin/start-thriftserver.sh --masteryarn --deploy-mode client 对于 yarn 只支持 client 模式。 /4 启动 bin/beeline /5 连接到 thriftserver !connect jdbc:hive2://localhost:10001 用户自定义函数 1. UDF 定义一个 udf 很简单,例如我们自定义一个求字符串长度的 udf:
val len = udf{(str:String) => str.length} 2. UserDefinedAggregateFunction 定义一个 UDAF
import org.apache.spark.sql.{Row, SparkSession}
object MyAverageUDAF extends UserDefinedAggregateFunction { 使用 UDAF
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json") 3. Aggregator 定义一个 Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession} object MyAverageAggregator extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b 使用
spark.udf.register("myAverage2", MyAverageAggregator) 数据源 1. 通用的 laod/save 函数 可支持多种数据格式:json, parquet, jdbc, orc, libsvm, csv, text
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") 默认的是 parquet,可以通过 spark.sql.sources.default,修改默认配置。 2. Parquet 文件
val parquetFileDF =spark.read.parquet("people.parquet") 3. ORC 文件
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
4. JSON
ds.write.mode("overwrite").json("/opt/outputjson/") 5. Hive 表 spark 1.6 及以前的版本使用 hive 表需要 hivecontext。Spark2 开始只需要创建 sparksession 增加 enableHiveSupport()即可。
val spark = SparkSession spark.sql("select count(*) from student").show() 6. JDBC 写入 mysql
wcdf.repartition(1).write.mode("append").option("user", "root") 从 mysql 里读
val fromMysql = spark.read.option("user", "root") 7. 自定义数据源 自定义 source 比较简单,首先我们要看看 source 加载的方式。指定的目录下,定义一个 DefaultSource 类,在类里面实现自定义 source,就可以实现我们的目标。 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport} class DefaultSource extends DataSourceV2 with ReadSupport {
def createReader(options: DataSourceOptions) = new SimpleDataSourceReader()
import org.apache.spark.sql.Row class SimpleDataSourceReader extends DataSourceReader { def readSchema() = StructType(Array(StructField("value", StringType)))
def createDataReaderFactories = {
import org.apache.spark.sql.Row
class SimpleDataSourceReaderFactory extends var index = 0 def next = index < values.length
def get = {
def close() = Unit 使用
val simpleDf = spark.read simpleDf.show() 优化器及执行计划 1. 流程简介
![]()
总体执行流程如下:从提供的输入 API(SQL,Dataset, dataframe)开始,依次经过 unresolved 逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据 cost based 优化,选取一条物理计划进行执行。 简单化成四个部分: /1 analysis Spark 2.0 以后语法树生成使用的是 antlr4,之前是 scalaparse。 /2 logical optimization 常量合并,谓词下推,列裁剪,boolean 表达式简化,和其它的规则。 /3 physical planning eg:SortExec 。 /4 Codegen codegen 技术是用 scala 的字符串插值特性生成源码,然后使用 Janino 编译成 java字节码,Eg: SortExec。 2. 自定义优化器 /1 实现 继承 Rule[LogicalPlan] object MultiplyOptimizationRule extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case Multiply(left,right) if right.isInstanceOf[Literal] && right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 => println("=========> optimization of one applied") left }
} val multipliedDFWithOptimization = df.selectExpr("amountPaid * 1") println("after optimization") /2 注册 spark.experimental.extraOptimizations= Seq(MultiplyOptimizationRule) /3 使用 selectExpr("amountPaid* 1")
![]()
3. 自定义执行计划 /1 物理计划 继承 SparkLan 实现 doExecute 方法。 /2 逻辑计划 继承 SparkStrategy 实现 apply。 case class FastOperator(output: Seq[Attribute],child:SparkPlan) extends SparkPlan { override def children: Seq[SparkPlan] = Nil
override protected def doExecute(): RDD[InternalRow] = {
def toUnsafeRow(row: org.apache.spark.sql.Row, schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.catalyst.expressions.UnsafeRow = {
def unsafeRowConverter(schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.Row => org.apache.spark.sql.catalyst.expressions.UnsafeRow = {
object TestStrategy extends Strategy { /3 注册到 Spark 执行策略 spark.experimental.extraStrategies =Seq(countStrategy) /4 使用 spark.sql("select count(*) fromtest")
![]() |
时间:2019-01-24 20:58 来源: 转发量:次
声明:本站部分作品是由网友自主投稿和发布、编辑整理上传,对此类作品本站仅提供交流平台,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,不为其版权负责。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。