3.8 Spark and R
3.8.1 sparklyr
Spark depends on specific versions of Java and Hadoop, and the three must be mutually compatible.
To set up Java on macOS, note that Spark only supports Java 8 through 11, so install a JDK of a supported version:
# install OpenJDK 11
brew install openjdk@11
# make JDK 11 the system-wide default
sudo ln -sfn /usr/local/opt/openjdk@11/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-11.jdk
# add the Java 11 JDK to .zshrc
export CPPFLAGS="-I/usr/local/opt/openjdk@11/include"
export PATH="/usr/local/opt/openjdk@11/bin:$PATH"
Next, configure R so that it can find the Java environment, then install the rJava package:
# configure R's Java settings
sudo R CMD javareconf
# system dependency needed to build rJava
brew install pcre2
# install rJava from source
Rscript -e 'install.packages("rJava", type="source")'
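To verify that rJava can start the JVM against the configured JDK, a quick sanity check using standard rJava calls (the version printed should be 11.x):

library(rJava)
.jinit()  # initialize the JVM
# ask the JVM for its version string
.jcall("java/lang/System", "S", "getProperty", "java.version")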
Finally, install the sparklyr package and a Spark distribution; the latter can be installed with spark_install(). For example, the following installs Spark 3.0 together with Hadoop 2.7.
install.packages('sparklyr')
sparklyr::spark_install(version = '3.0', hadoop_version = '2.7')
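sparklyr also provides spark_available_versions() to list installable versions and spark_installed_versions() to confirm what is already present locally:

# list locally installed Spark/Hadoop versions and their install paths
sparklyr::spark_installed_versions()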
Alternatively, download the Spark distribution manually, preferably from a nearby mirror; for example, from Beijing the Tsinghua mirror is a good choice:
https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz. This distribution ships with both R and Python interfaces. For sparklyr to use it, first set the SPARK_HOME environment variable to the Spark installation directory, then connect to the standalone local Spark.
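For example, assuming the tarball was unpacked under /opt/spark (the same path used in the SparkR example later in this section), SPARK_HOME can be set from within R:

# point SPARK_HOME at the unpacked Spark distribution; adjust the path as needed
Sys.setenv(SPARK_HOME = "/opt/spark/spark-3.0.1-bin-hadoop2.7")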
# troubleshooting: https://github.com/sparklyr/sparklyr/issues/2827
options(sparklyr.log.console = FALSE)
# connect to Spark
library(sparklyr)
library(ggplot2)
sc <- spark_connect(
  master = "local",
  # config = list(sparklyr.gateway.address = "127.0.0.1"),
  spark_home = Sys.getenv("SPARK_HOME")
)
# import the diamonds dataset into Spark
diamonds_tbl <- copy_to(sc, ggplot2::diamonds, "diamonds")
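copy_to() returns a reference to the remote Spark table rather than a local copy, so computations on it run inside Spark. As a quick check, a sketch using sparklyr's sdf_nrow() to count rows on the Spark side:

# count rows remotely; diamonds has 53940 rows
sdf_nrow(diamonds_tbl)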
There are two ways to compute aggregate statistics. One is the grammar of data manipulation provided by the R package dplyr; counting diamonds by cut below illustrates this approach.
library(dplyr)
# list all tables available at the data source
src_tbls(sc)
diamonds_tbl <- diamonds_tbl %>%
  group_by(cut) %>%
  summarise(cnt = n()) %>%
  collect()
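Note that dplyr verbs on a remote table are lazy: they are translated to Spark SQL and only executed when collect() pulls the result into R. A small sketch, using standard dplyr/sparklyr calls, that prints the generated SQL without running the query:

# build the same pipeline against the remote table and show its SQL translation
tbl(sc, "diamonds") %>%
  group_by(cut) %>%
  summarise(cnt = n()) %>%
  show_query()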
The other is the structured query language SQL, which hardly needs introduction; in most cases it works just like ordinary SQL.
library(DBI)
diamonds_preview <- dbGetQuery(sc, "SELECT count(*) as cnt, cut FROM diamonds GROUP BY cut")
diamonds_preview
## cnt cut
## 1 21551 Ideal
## 2 13791 Premium
## 3 4906 Good
## 4 1610 Fair
## 5 12082 Very Good
# SQL's AVG is analogous to R's mean function
diamonds_price <- dbGetQuery(sc, "SELECT AVG(price) as mean_price, cut FROM diamonds GROUP BY cut")
diamonds_price
## mean_price cut
## 1 3457.542 Ideal
## 2 4584.258 Premium
## 3 3928.864 Good
## 4 4358.758 Fair
## 5 3981.760 Very Good
The same summary computed locally with data.table agrees:
library(ggplot2)
library(data.table)
diamonds <- as.data.table(diamonds)
diamonds[, .(mean_price = mean(price)), by = .(cut)]
## cut mean_price
## 1: Ideal 3457.542
## 2: Premium 4584.258
## 3: Good 3928.864
## 4: Very Good 3981.760
## 5: Fair 4358.758
Present the result with ggplot2:
ggplot(diamonds_preview, aes(cut, cnt)) +
geom_col() +
theme_minimal()
The diamonds dataset contains 53940 rows in total. Below, BUCKET sampling randomly splits the data into 1000 buckets and takes one of them; because the bucketing is random, the result differs on every run. See https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-sampling.html for details.
diamonds_sample <- dbGetQuery(sc, "SELECT * FROM diamonds TABLESAMPLE (BUCKET 1 OUT OF 1000) LIMIT 6")
diamonds_sample
## carat cut color clarity depth table price x y z
## 1 0.60 Ideal F VVS2 62.0 55 2822 5.37 5.40 3.34
## 2 0.77 Ideal E SI2 60.7 55 2834 6.01 5.95 3.63
## 3 0.70 Very Good D VS2 63.1 56 2985 5.62 5.69 3.57
## 4 0.76 Very Good E VS2 61.0 58 3111 5.88 5.93 3.60
## 5 1.00 Good J VS2 62.0 61 3835 6.36 6.45 3.97
## 6 1.01 Good H SI1 64.0 58 4191 6.37 6.31 4.06
Rank the sampled rows with the window function RANK(); see https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html for the syntax, and https://www.cnblogs.com/ZackSun/p/9713435.html for a primer on window functions.
diamonds_rank <- dbGetQuery(sc, "
SELECT cut, price, RANK() OVER (PARTITION BY cut ORDER BY price) AS rank
FROM diamonds TABLESAMPLE (BUCKET 1 OUT OF 1000)
LIMIT 6
")
diamonds_rank
## cut price rank
## 1 Fair 4480 1
## 2 Fair 4600 2
## 3 Good 605 1
## 4 Good 3139 2
## 5 Good 3465 3
## 6 Good 4588 4
LATERAL VIEW expands one column into multiple rows; see https://liam.page/2020/03/09/LATERAL-VIEW-in-Hive-SQL/ and https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-lateral-view.html.
Create a dataset:
# drop the table person if it already exists
dbGetQuery(sc, "DROP TABLE IF EXISTS person")
# create the table person
dbGetQuery(sc, "CREATE TABLE IF NOT EXISTS person (id INT, name STRING, age INT, class INT, address STRING)")
# insert rows into the table person
dbGetQuery(sc, "
INSERT INTO person VALUES
(100, 'John', 30, 1, 'Street 1'),
(200, 'Mary', NULL, 1, 'Street 2'),
(300, 'Mike', 80, 3, 'Street 3'),
(400, 'Dan', 50, 4, 'Street 4')
")
Inspect the dataset:
dbGetQuery(sc, "SELECT * FROM person")
## id name age class address
## 1 100 John 30 1 Street 1
## 2 200 Mary NA 1 Street 2
## 3 300 Mike 80 3 Street 3
## 4 400 Dan 50 4 Street 4
Pivoting between rows and columns (see https://www.cnblogs.com/kimbo/p/6208973.html): expand with LATERAL VIEW.
dbGetQuery(sc,"
SELECT * FROM person
LATERAL VIEW EXPLODE(ARRAY(30, 60)) tableName AS c_age
LATERAL VIEW EXPLODE(ARRAY(40, 80)) AS d_age
LIMIT 6
")
## id name age class address c_age d_age
## 1 100 John 30 1 Street 1 30 40
## 2 100 John 30 1 Street 1 30 80
## 3 100 John 30 1 Street 1 60 40
## 4 100 John 30 1 Street 1 60 80
## 5 200 Mary NA 1 Street 2 30 40
## 6 200 Mary NA 1 Street 2 30 80
Date-related functions are documented at https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions
# today
dbGetQuery(sc, "select current_date")
## current_date()
## 1 2023-05-25
# yesterday
dbGetQuery(sc, "select date_sub(current_date, 1)")
## date_sub(current_date(), 1)
## 1 2023-05-24
# last day of the month containing current_date
dbGetQuery(sc, "select last_day(current_date)")
## last_day(current_date())
## 1 2023-05-31
# day of the week (1 = Sunday)
dbGetQuery(sc, "select dayofweek(current_date)")
## dayofweek(current_date())
## 1 5
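Other date builtins follow the same pattern, e.g. date_add and datediff (both documented Spark SQL functions):

# the date 7 days from today
dbGetQuery(sc, "select date_add(current_date, 7)")
# number of days between today and one week ago
dbGetQuery(sc, "select datediff(current_date, date_sub(current_date, 7))")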
Finally, remember to disconnect from Spark when you are done:
spark_disconnect(sc)
3.8.2 SparkR
SparkR is the R interface bundled with the Spark distribution itself; load it from SPARK_HOME and start a session:
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/opt/spark/spark-3.0.1-bin-hadoop2.7")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
SparkR requires a Java version greater than or equal to 8 and less than 12; installing a newer JDK locally on macOS, such as oracle-jdk 16.0.1, triggers an incompatibility error:
Spark package found in SPARK_HOME: /opt/spark/spark-3.1.1-bin-hadoop3.2
Error in checkJavaVersion() :
Java version, greater than or equal to 8 and less than 12, is required for this package; found version: 16.0.1
Which parameters can be passed via sparkConfig?

Property Name | Property group | spark-submit equivalent
---|---|---
spark.master | Application Properties | --master
spark.kerberos.keytab | Application Properties | --keytab
spark.kerberos.principal | Application Properties | --principal
spark.driver.memory | Application Properties | --driver-memory
spark.driver.extraClassPath | Runtime Environment | --driver-class-path
spark.driver.extraJavaOptions | Runtime Environment | --driver-java-options
spark.driver.extraLibraryPath | Runtime Environment | --driver-library-path
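For example, a sketch passing two of these properties when starting a session (the memory size and JVM flag here are illustrative values, not recommendations):

sparkR.session(
  master = "local[*]",
  sparkConfig = list(
    spark.driver.memory = "4g",                    # Application Properties
    spark.driver.extraJavaOptions = "-XX:+UseG1GC" # Runtime Environment
  )
)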
Convert an R data.frame to a SparkDataFrame:
faithful_sdf <- as.DataFrame(faithful)
View the first few rows of the SparkDataFrame:
head(faithful_sdf)
Inspect its structure:
str(faithful_sdf)
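As with sparklyr's spark_disconnect(), stop the SparkR session when finished:

sparkR.session.stop()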