伦伦影院久久影视,天天操天天干天天射,ririsao久久精品一区 ,一本大道香蕉大久在红桃,999久久久免费精品国产色夜,色悠悠久久综合88,亚洲国产精品久久无套麻豆,亚洲香蕉毛片久久网站,一本一道久久综合狠狠老

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

Spark SQL的概念及查詢方式

數據分析與開發 ? 來源:大數據技術與架構 ? 作者:大數據技術與架構 ? 2021-09-02 15:44 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

一、Spark SQL的概念理解

Spark SQL是spark套件中一個模板,它將數據的計算任務通過SQL的形式轉換成了RDD的計算,類似于Hive通過SQL的形式將數據的計算任務轉換成了MapReduce。

Spark SQL的特點:

和Spark Core的無縫集成,可以在寫整個RDD應用的時候,配置Spark SQL來完成邏輯實現。

統一的數據訪問方式,Spark SQL提供標準化的SQL查詢。

Hive的繼承,Spark SQL通過內嵌的hive或者連接外部已經部署好的hive案例,實現了對hive語法的繼承和操作。

標準化的連接方式,Spark SQL可以通過啟動thrift Server來支持JDBC、ODBC的訪問,將自己作為一個BI Server使用

Spark SQL數據抽象:

RDD(Spark1.0)-》DataFrame(Spark1.3)-》DataSet(Spark1.6)

Spark SQL提供了DataFrame和DataSet的數據抽象

DataFrame就是RDD+Schema,可以認為是一張二維表格,劣勢在于編譯器不進行表格中的字段的類型檢查,在運行期進行檢查

DataSet是Spark最新的數據抽象,Spark的發展會逐步將DataSet作為主要的數據抽象,弱化RDD和DataFrame.DataSet包含了DataFrame所有的優化機制。除此之外提供了以樣例類為Schema模型的強類型

DataFrame=DataSet[Row]

DataFrame和DataSet都有可控的內存管理機制,所有數據都保存在非堆上,都使用了catalyst進行SQL的優化。

Spark SQL客戶端查詢:

可以通過Spark-shell來操作Spark SQL,spark作為SparkSession的變量名,sc作為SparkContext的變量名

可以通過Spark提供的方法讀取json文件,將json文件轉換成DataFrame

可以通過DataFrame提供的API來操作DataFrame里面的數據。

可以通過將DataFrame注冊成為一個臨時表的方式,來通過Spark.sql方法運行標準的SQL語句來查詢。

二、Spark SQL查詢方式

DataFrame查詢方式

DataFrame支持兩種查詢方式:一種是DSL風格,另外一種是SQL風格

(1)、DSL風格:

需要引入import spark.implicit. _ 這個隱式轉換,可以將DataFrame隱式轉換成RDD

(2)、SQL風格:

a、需要將DataFrame注冊成一張表格,如果通過CreateTempView這種方式來創建,那么該表格Session有效,如果通過CreateGlobalTempView來創建,那么該表格跨Session有效,但是SQL語句訪問該表格的時候需要加上前綴global_temp

b、需要通過sparkSession.sql方法來運行你的SQL語句

DataSet查詢方式

定義一個DataSet,先定義一個Case類

三、DataFrame、Dataset和RDD互操作

RDD-》DataFrame

普通方式:例如rdd.map(para(para(0).trim(),para(1).trim().toInt)).toDF(“name”,“age”)

通過反射來設置schema,例如:

#通過反射設置schema,數據集是spark自帶的people.txt,路徑在下面的代碼中case class Person(name:String,age:Int)

val peopleDF=spark.sparkContext.textFile(“file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt”).map(_.split(“,”)).map(para=》Person(para(0).trim,para(1).trim.toInt)).toDF

peopleDF.show

8a20a542-0bb0-11ec-8fb8-12bb97331649.png

#注冊成一張臨時表

peopleDF.createOrReplaceTempView(“persons”)

val teen=spark.sql(“select name,age from persons where age between 13 and 29”)

teen.show

8a301b1c-0bb0-11ec-8fb8-12bb97331649.png

這時teen是一張表,每一行是一個row對象,如果需要訪問Row對象中的每一個元素,可以通過下標 row(0);你也可以通過列名 row.getAs[String](“name”)

8a3be46a-0bb0-11ec-8fb8-12bb97331649.png

也可以使用getAs方法:

8a45a978-0bb0-11ec-8fb8-12bb97331649.png

3、通過編程的方式來設置schema,適用于編譯器不能確定列的情況

val peopleRDD=spark.sparkContext.textFile(“file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt”)

val schemaString=“name age”

val filed=schemaString.split(“ ”).map(filename=》 org.apache.spark.sql.types.StructField(filename,org.apache.spark.sql.types.StringType,nullable = true))

val schema=org.apache.spark.sql.types.StructType(filed)

peopleRDD.map(_.split(“,”)).map(para=》org.apache.spark.sql.Row(para(0).trim,para(1).trim))

val peopleDF=spark.createDataFrame(res6,schema)

peopleDF.show

8a52119a-0bb0-11ec-8fb8-12bb97331649.png

8a5ddf02-0bb0-11ec-8fb8-12bb97331649.png

8a6a9cb0-0bb0-11ec-8fb8-12bb97331649.png

DataFrame-》RDD

dataFrame.rdd

RDD-》DataSet

rdd.map(para=》 Person(para(0).trim(),para(1).trim().toInt)).toDS

DataSet-》DataSet

dataSet.rdd

DataFrame -》 DataSet

dataFrame.to[Person]

DataSet -》 DataFrame

dataSet.toDF

四、用戶自定義函數

用戶自定義UDF函數

通過spark.udf功能用戶可以自定義函數

自定義udf函數:

通過spark.udf.register(name,func)來注冊一個UDF函數,name是UDF調用時的標識符,fun是一個函數,用于處理字段。

需要將一個DF或者DS注冊為一個臨時表

通過spark.sql去運行一個SQL語句,在SQL語句中可以通過name(列名)方式來應用UDF函數

用戶自定義聚合函數

1. 弱類型用戶自定義聚合函數

新建一個Class 繼承UserDefinedAggregateFunction ,然后復寫方法:

//聚合函數需要輸入參數的數據類型

override def inputSchema: StructType = ???

//可以理解為保存聚合函數業務邏輯數據的一個數據結構

override def bufferSchema: StructType = ???

// 返回值的數據類型

override def dataType: DataType = ???

// 對于相同的輸入一直有相同的輸出

override def deterministic: Boolean = true

//用于初始化你的數據結構

override def initialize(buffer: MutableAggregationBuffer): Unit = ???

//用于同分區內Row對聚合函數的更新操作

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???

//用于不同分區對聚合結果的聚合。

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???

//計算最終結果

override def evaluate(buffer: Row): Any = ???

你需要通過spark.udf.resigter去注冊你的UDAF函數。

需要通過spark.sql去運行你的SQL語句,可以通過 select UDAF(列名) 來應用你的用戶自定義聚合函數。

2、強類型用戶自定義聚合函數

新建一個class,繼承Aggregator[Employee, Average, Double],其中Employee是在應用聚合函數的時候傳入的對象,Average是聚合函數在運行的時候內部需要的數據結構,Double是聚合函數最終需要輸出的類型。這些可以根據自己的業務需求去調整。復寫相對應的方法:

//用于定義一個聚合函數內部需要的數據結構

override def zero: Average = ???

//針對每個分區內部每一個輸入來更新你的數據結構

override def reduce(b: Average, a: Employee): Average = ???

//用于對于不同分區的結構進行聚合

override def merge(b1: Average, b2: Average): Average = ???

//計算輸出

override def finish(reduction: Average): Double = ???

//用于數據結構他的轉換

override def bufferEncoder: Encoder[Average] = ???

//用于最終結果的轉換

override def outputEncoder: Encoder[Double] = ???

新建一個UDAF實例,通過DF或者DS的DSL風格語法去應用。

五、Spark SQL和Hive的繼承

1、內置Hive

Spark內置有Hive,Spark2.1.1 內置的Hive是1.2.1。

需要將core-site.xml和hdfs-site.xml 拷貝到spark的conf目錄下。如果Spark路徑下發現metastore_db,需要刪除【僅第一次啟動的時候】。

在你第一次啟動創建metastore的時候,你需要指定spark.sql.warehouse.dir這個參數, 比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse

注意,如果你在load數據的時候,需要將數據放到HDFS上。

2、外部Hive(這里主要使用這個方法)

需要將hive-site.xml 拷貝到spark的conf目錄下。

如果hive的metestore使用的是mysql數據庫,那么需要將mysql的jdbc驅動包放到spark的jars目錄下。

可以通過spark-sql或者spark-shell來進行sql的查詢。完成和hive的連接。

8a76fc9e-0bb0-11ec-8fb8-12bb97331649.png

這就是hive里面的表

8a89228e-0bb0-11ec-8fb8-12bb97331649.png

六、Spark SQL的數據源

1、輸入

對于Spark SQL的輸入需要使用sparkSession.read方法

通用模式 sparkSession.read.format(“json”).load(“path”) 支持類型:parquet、json、text、csv、orc、jdbc

專業模式 sparkSession.read.json、 csv 直接指定類型。

2、輸出

對于Spark SQL的輸出需要使用 sparkSession.write方法

通用模式 dataFrame.write.format(“json”).save(“path”) 支持類型:parquet、json、text、csv、orc

專業模式 dataFrame.write.csv(“path”) 直接指定類型

如果你使用通用模式,spark默認parquet是默認格式、sparkSession.read.load 加載的默認是parquet格式dataFrame.write.save也是默認保存成parquet格式。

如果需要保存成一個text文件,那么需要dataFrame里面只有一列(只需要一列即可)。

七、Spark SQL實戰

1、數據說明

這里有三個數據集,合起來大概有幾十萬條數據,是關于貨品交易的數據集。

8aa849d4-0bb0-11ec-8fb8-12bb97331649.png

2、任務

這里有三個需求:

計算所有訂單中每年的銷售單數、銷售總額

計算所有訂單每年最大金額訂單的銷售額

計算所有訂單中每年最暢銷貨品

3、步驟

1. 加載數據

tbStock.txt

#代碼case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable

val tbStockRdd=spark.sparkContext.textFile(“file:///root/dataset/tbStock.txt”)

val tbStockDS=tbStockRdd.map(_.split(“,”)).map(attr=》tbStock(attr(0),attr(1),attr(2))).toDS

tbStockDS.show()

8ab2028a-0bb0-11ec-8fb8-12bb97331649.png8ac01d7a-0bb0-11ec-8fb8-12bb97331649.png8ad1cfc0-0bb0-11ec-8fb8-12bb97331649.png

8adee8f4-0bb0-11ec-8fb8-12bb97331649.png

tbStockDetail.txt

case class tbStockDetail(ordernumber:String,rownum:Int,itemid:String,number:Int,price:Double,amount:Double) extends Serializable

val tbStockDetailRdd=spark.sparkContext.textFile(“file:///root/dataset/tbStockDetail.txt”)

val tbStockDetailDS=tbStockDetailRdd.map(_.split(“,”)).map(attr=》tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble,attr(5).trim().toDouble)).toDS

tbStockDetailDS.show()

8af1d2de-0bb0-11ec-8fb8-12bb97331649.png8b00fd68-0bb0-11ec-8fb8-12bb97331649.png

8b0bcd88-0bb0-11ec-8fb8-12bb97331649.png

8b305a72-0bb0-11ec-8fb8-12bb97331649.png

tbDate.txt

case class tbDate(dateid:String,years:Int,theyear:Int,month:Int,day:Int,weekday:Int,week:Int,quarter:Int,period:Int,halfmonth:Int) extends Serializable

val tbDateRdd=spark.sparkContext.textFile(“file:///root/dataset/tbDate.txt”)

val tbDateDS=tbDateRdd.map(_.split(“,”)).map(attr=》tbDate(attr(0),attr(1).trim().toInt,attr(2).trim().toInt,attr(3).trim().toInt,attr(4).trim().toInt,attr(5).trim().toInt,attr(6).trim().toInt,attr(7).trim().toInt,attr(8).trim().toInt,attr(9).trim().toInt)).toDS

tbDateDS.show()

8b45157a-0bb0-11ec-8fb8-12bb97331649.png8b5182f6-0bb0-11ec-8fb8-12bb97331649.png

8b638f14-0bb0-11ec-8fb8-12bb97331649.png

8b7541be-0bb0-11ec-8fb8-12bb97331649.png

2. 注冊表

tbStockDS.createOrReplaceTempView(“tbStock”)

tbDateDS.createOrReplaceTempView(“tbDate”)

tbStockDetailDS.createOrReplaceTempView(“tbStockDetail”)

8b829b66-0bb0-11ec-8fb8-12bb97331649.png

3. 解析表

計算所有訂單中每年的銷售單數、銷售總額

#sql語句

select c.theyear,count(distinct a.ordernumber),sum(b.amount)

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear

order by c.theyear

8b8fb152-0bb0-11ec-8fb8-12bb97331649.png

計算所有訂單每年最大金額訂單的銷售額

a、先統計每年每個訂單的銷售額

select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

group by a.dateid,a.ordernumber

8ba32660-0bb0-11ec-8fb8-12bb97331649.png

b、計算最大金額訂單的銷售額

select d.theyear,c.SumOfAmount as SumOfAmount

from

(select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

group by a.dateid,a.ordernumber) c

join tbDate d on c.dateid=d.dateid

group by d.theyear

order by theyear desc

8bb0abfa-0bb0-11ec-8fb8-12bb97331649.png

計算所有訂單中每年最暢銷貨品

a、求出每年每個貨品的銷售額

select c.theyear,b.itemid,sum(b.amount) as SumOfAmount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear,b.itemid

8bc427c0-0bb0-11ec-8fb8-12bb97331649.png

b、在a的基礎上,統計每年單個貨品的最大金額

select d.theyear,max(d.SumOfAmount) as MaxOfAmount

from

(select c.theyear,b.itemid,sum(b.amount) as SumOfAmount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear,b.itemid) d

group by theyear

8bd5482a-0bb0-11ec-8fb8-12bb97331649.png

c、用最大銷售額和統計好的每個貨品的銷售額join,以及用年join,集合得到最暢銷貨品那一行信息

select distinct e.theyear,e.itemid,f.maxofamount

from

(select c.theyear,b.itemid,sum(b.amount) as sumofamount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear,b.itemid) e

join

(select d.theyear,max(d.sumofamount) as maxofamount

from

(select c.theyear,b.itemid,sum(b.amount) as sumofamount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear,b.itemid) d

group by d.theyear) f on e.theyear=f.theyear

and e.sumofamount=f.maxofamount order by e.theyear

8be25894-0bb0-11ec-8fb8-12bb97331649.png

編輯:jq

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 數據
    +關注

    關注

    8

    文章

    7343

    瀏覽量

    94979
  • SQL
    SQL
    +關注

    關注

    1

    文章

    807

    瀏覽量

    46883
  • 函數
    +關注

    關注

    3

    文章

    4419

    瀏覽量

    67772
  • RDD
    RDD
    +關注

    關注

    0

    文章

    7

    瀏覽量

    8184

原文標題:Spark SQL 重點知識總結

文章出處:【微信號:DBDevs,微信公眾號:數據分析與開發】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關推薦
    熱點推薦

    NineData 2026年3月功能上新:支持飛書外部審批,增強慢查詢分析與數據復制能力

    NineData智能數據管理平臺2026年3月新功能發布,圍繞數據庫 DevOps、慢查詢分析、數據歸檔清理與數據復制持續升級:新增飛書 Lark 外部審批和多渠道消息通知,慢查詢分析擴展支持阿里云
    的頭像 發表于 04-10 11:40 ?248次閱讀
    NineData 2026年3月功能上新:支持飛書外部審批,增強慢<b class='flag-5'>查詢</b>分析與數據復制能力

    MySQL慢查詢調優指南

    MySQL慢查詢是數據庫性能問題的最常見原因。當一條SQL語句執行超過1秒時,就可能影響用戶體驗;超過10秒時,通常會收到用戶投訴;而超過30秒的查詢,往往意味著系統存在嚴重的性能問題。本文從實
    的頭像 發表于 04-09 10:01 ?125次閱讀

    MySQL數據庫慢查詢分析與優化實戰

    在討論MySQL慢查詢之前,需要先明確一個關鍵前提:什么是慢查詢? 不同業務場景下,慢查詢的定義差異巨大。一個數據報表后臺的SQL執行30秒可能屬于正常范圍,但一個訂單創建的數據庫操作
    的頭像 發表于 04-02 09:38 ?120次閱讀

    NineData SQL AI 智能補全上線:寫 SQL,不必每次都從頭敲

    NineData推出SQLAI智能補全功能,通過AI技術實現上下文感知的SQL語句智能提示。該功能不僅能補全關鍵字,還能根據當前輸入內容預測后續查詢意圖,顯著提升多表關聯、復雜條件等場景下的編寫效率
    的頭像 發表于 04-01 20:19 ?235次閱讀
    NineData <b class='flag-5'>SQL</b> AI 智能補全上線:寫 <b class='flag-5'>SQL</b>,不必每次都從頭敲

    SQL分析選型:DMS/DAS與NineData該如何選擇

    阿里云 DMS 的慢SQL 趨勢、DAS 的 SQL 審計能力成熟,可滿足阿里云用戶基礎需求。NineData 側重跨云統一工作臺、研發與 DBA 協同,打通慢日志分析、性能診斷、規范審核、索引建議全鏈路,更適配企業級慢查詢持續
    的頭像 發表于 03-25 17:20 ?1501次閱讀
    慢<b class='flag-5'>SQL</b>分析選型:DMS/DAS與NineData該如何選擇

    NineData 社區版的慢SQL分析,比查看日志+看EXPLAIN適合中小團隊

    本文探討 NineData 社區版在 MySQL 慢 SQL 場景對中小團隊的適用性。與 “查看日志 + 看 EXPLAIN” 傳統方式不同,它將慢 SQL 按模板聚合,能從大盤、模板、診斷等多維
    的頭像 發表于 03-17 14:07 ?93次閱讀
    NineData 社區版的慢<b class='flag-5'>SQL</b>分析,比查看日志+看EXPLAIN適合中小團隊

    MySQL 慢 SQL 排查這件事,NineData 社區VS DBeaver/ Navicat 技術分析

    DBeaver Community 和 Navicat Premium Lite 都是很有價值的客戶端工具,在單條 SQL查詢和驗證上,依然是 DBA 最順手的入口。 但 NineData
    的頭像 發表于 03-17 11:53 ?98次閱讀
    MySQL 慢 <b class='flag-5'>SQL</b> 排查這件事,NineData 社區VS DBeaver/ Navicat 技術分析

    首屆中國NVIDIA DGX Spark黑客松大賽開啟報名

    倒計時啟動!首屆中國 NVIDIA DGX Spark 黑客松(Hackathon)將于 3 月 13 日 - 3 月 28 日正式開啟報名!本屆賽事以“算力破局、AI 落地”為核心導向,依托
    的頭像 發表于 03-14 16:39 ?2654次閱讀

    如何在DGX Spark上運行NVIDIA Omniverse

    首先感謝 Vigor 同學第一時間的分享,以下是具體如何在 DGX Spark 上運行 Omniverse 的方法。
    的頭像 發表于 12-17 10:13 ?856次閱讀
    如何在DGX <b class='flag-5'>Spark</b>上運行NVIDIA Omniverse

    NVIDIA DGX Spark快速入門指南

    需要選擇訪問系統的方式,并運行首次設置實用程序來配置所有內容。設置完成后,可以根據喜好選擇不同的方式訪問 DGX Spark。
    的頭像 發表于 11-17 14:11 ?6778次閱讀
    NVIDIA DGX <b class='flag-5'>Spark</b>快速入門指南

    不用編程不用聯網,實現倍福(BECKHOFF)PLC對接SQL數據庫,上報和查詢數據的案例

    ?IGT-DSER智能網關模塊,支持各種PLC、智能儀表、遠程IO與數據庫之間雙向通訊,既可以讀取設備的數據上報到SQL數據庫,也可以從數據庫查詢數據后寫入到設備;數據庫軟件支持MySQL
    發表于 10-10 11:14

    數據庫慢查詢分析與SQL優化實戰技巧

    今天,我將分享我在處理數千次數據庫性能問題中積累的實戰經驗,幫助你系統掌握慢查詢分析與SQL優化的核心技巧。無論你是剛入門的運維新手,還是有一定經驗的工程師,這篇文章都將為你提供實用的解決方案。
    的頭像 發表于 09-08 09:34 ?1170次閱讀

    SQL 通用數據類型

    SQL 通用數據類型 數據庫表中的每個列都要求有名稱和數據類型。Each column in a database table is required to have a name and a
    的頭像 發表于 08-18 09:46 ?803次閱讀

    產品詳情查詢API接口

    ? 在現代電子商務和軟件開發中,產品詳情查詢API接口扮演著至關重要的角色。它允許開發者通過編程方式從遠程服務器獲取產品的詳細信息,如名稱、價格、描述和庫存狀態等。這種接口通常基于RESTful架構
    的頭像 發表于 07-24 14:39 ?671次閱讀
    產品詳情<b class='flag-5'>查詢</b>API接口

    使用NVIDIA GPU加速Apache Spark中Parquet數據掃描

    方式組織數據,這使得 Parquet 在查詢時僅讀取所需的列,而無需掃描整行數據,即可實現高性能的查詢和分析。高效的數據布局使 Parquet 在現代分析生態系統中成為了受歡迎的選擇,尤其是在 Apache
    的頭像 發表于 07-23 10:52 ?1180次閱讀
    使用NVIDIA GPU加速Apache <b class='flag-5'>Spark</b>中Parquet數據掃描