HUDI 部署与spark集成
有关hudi 的部署 以及 spark hudi 集成的操作文档
# HUDI 部署与spark集成
*平台部署知识*
有关hudi 的部署 以及 spark hudi 集成的操作文档

# 目录
[TOC]
# 介绍
https://hudi.apache.org/
Apache Hudi(Hadoop Upserts Delete and Incremental)是下一代流数据湖平台。Apache Hudi将核心仓库和数据库功能直接引入数据湖。Hudi提供了表、事务、高效的upserts/delete、高级索引、流摄取服务、数据集群/压缩优化和并发,同时保持数据的开源文件格式。
Apache Hudi不仅非常适合于流工作负载,而且还允许创建高效的增量批处理管道。
Apache Hudi可以轻松地在任何云存储平台上使用。Hudi的高级性能优化,使分析工作负载更快的任何流行的查询引擎,包括Apache Spark、Flink、Presto、Trino、Hive等。
## Hudi特性
- 可插拔索引机制支持快速Upsert/Delete。
- 支持增量拉取表变更以进行处理。
- 支持事务提交及回滚,并发控制。
- 支持Spark、Presto、Trino、Hive、Flink等引擎的SQL读写。
- 自动管理小文件,数据聚簇,压缩,清理。
- 流式摄入,内置CDC源和工具。
- 内置可扩展存储访问的元数据跟踪。
- 向后兼容的方式实现表结构变更的支持。
## 使用场景
**近实时写入**
- 减少碎片化工具的使用。
- CDC 增量导入 RDBMS 数据。
- 限制小文件的大小和数量。
**近实时分析**
- 相对于秒级存储(Druid, OpenTSDB),节省资源。
- 提供分钟级别时效性,支撑更高效的查询。
- Hudi作为lib,非常轻量。
**增量** **pipeline**
- 区分arrivetime和event time处理延迟数据。
- 更短的调度interval减少端到端延迟(小时 -\> 分钟) =\> Incremental Processing。
**增量导出**
- 替代部分Kafka的场景,数据导出到在线服务存储 e.g. ES。
# 编译
Hudi并没有提供打好的包,只提供了源码,因此我们需要在这里下载hudi的源码并进行编译打包,因此我们需要进行源码的下载,下面就是源码的存档仓库。
[https://github.com/apache/hudi/tags](https://github.com/apache/hudi/tags)
需要根据spark的版本来进行不同源码的选择
## 服务器环境准备
### Hudi与spark 对照
| **Hudi** | **Supported Spark 3 version** |
| --- | --- |
| 0.14.x | 3.4.x (default build), 3.3.x, 3.2.x, 3.1.x, 3.0.x |
| --- | --- |
| 0.13.x | 3.3.x (default build), 3.2.x, 3.1.x |
| 0.12.x | 3.3.x (default build), 3.2.x, 3.1.x |
| 0.11.x | 3.2.x (default build, Spark bundle only), 3.1.x |
| 0.10.x | 3.1.x (default build), 3.0.x |
| 0.7.0 - 0.9.0 | 3.0.x |
| 0.6.0 and prior | not supported |
在这里我们找到了hudi与spark的适配版本,首先来查看一下服务器中的spark版本,从下面的结果中可以确定spark的版本为3.0.0,且在官网中有针对0.14支持spark3.0.0的说明,因此hudi的源码应该选择高于0.14的版本:[https://github.com/apache/hudi/archive/refs/tags/release-0.14.0.tar.gz](https://github.com/apache/hudi/archive/refs/tags/release-0.14.0.tar.gz)
### Maven安装
由于我们需要在服务器上针对源码进行打包,因此必须将maven安装在服务器上,用于进行依赖获取以及安装操作,下面就是maven的链接。
[https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz](https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz)
#### 下载maven
```
root@liming-virtual-machine:/opt/package# wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz # 下载 maven
--2023-11-23 16:37:12-- https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9506321 (9.1M) [application/x-gzip]
Saving to: 'apache-maven-3.6.3-bin.tar.gz'
apache-maven-3.6.3-bin.tar.gz 100%[======================================================================================================================================================================\>] 9.07M 3.49MB/s in 2.6s
2023-11-23 16:37:16 (3.49 MB/s) - 'apache-maven-3.6.3-bin.tar.gz' saved [9506321/9506321]
上面的日志就是下载maven操作相关的处理,可以在目录中查看到相关的数据。
```

#### 安装maven
首先对maven进行解压,然后对其设置环境变量就可以啦,操作比较简单,在这里进行省略,下面就是安装好maven之后的效果。
```
root@liming-virtual-machine:/opt/software/apache-maven-3.6.3/bin# mvn -version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /opt/software/apache-maven-3.6.3
Java version: 1.8.0\_202, vendor: Oracle Corporation, runtime: /opt/software/jdk1.8.0\_202/jre
Default locale: zh\_CN, platform encoding: UTF-8
OS name: "linux", version: "5.19.0-32-generic", arch: "amd64", family: "unix"
```
### Hudi编译
#### 下载并解压hudi源码
我们可以在上面找到如何获取到hudi0.14的源码,接下来就是解压之后的hudi项目目录,可以看到这是一个maven目录。

#### Pom中的境内镜像配置
针对一个maven工程,我们需要知道其具有自动下载依赖的特点,但是它会在国外的服务器仓库下载依赖,这会导致其变得很慢,因此在这里直接设置国内的镜像,加快速度。
```xml
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
```
值得注意的是 repository 要放在repositories 标签的下面
#### 在pom中配置hadoop和hive的版本
首先我们需要查询一下当前使用的hadoop和hive的版本,可以看到,Hadoop是3.2.1,hive是3.1.2,记住这两个版本号。

回到pom文件中,开始设置 Hadoop和hive依赖的版本,在打包的时候会自动下载对应版本的API,版本与平台中的软件的版本越贴近越不容易出现错误,因此我们需要在这里手动指定版本号。

#### 开始进行编译
输入命令:`mvn clean package -DskipTests -Dspark3.2 -Dflink1.13 -Dscala-2.12 -Dhadoop.version=3.1.3 -Pflink-bundle-shade-hive3`

出现上面的日志就代表 hudi 处理成功啦,下面直接开始进行hudi的启动
### 启动hudi
打包之后,在源码目录中会有一个 hudi-cli 目录,其中有一个文件叫做 hudi-cli.sh,像下面一样进入就可以啦

# Spark集成 – 写数据
## 装载hudi依赖到sparkShell
在官网中可以查询到相关的操作,可以看到仅仅是执行下面的这样一条命令就可以实现自动获取依赖的效果。
此命令输入之后,sparkShell会启动,同时会联网下载依赖jar

启动之后就可以使用spark访问hudi了,这个依赖jar可以被装载到外界的spark工程中。
### 下载hudi与spark的中间组件
输入下面的命令就可以开始进行 HUDI 组件对于spark的装载了、
```
export SPARK_VERSION=3.0
spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
```
接下来是全部的执行日志
```
root@liming-virtual-machine:/opt/src/hudi-release-0.14.0# export SPARK_VERSION=3.0
root@liming-virtual-machine:/opt/src/hudi-release-0.14.0# spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
23/11/23 18:40:29 WARN Utils: Your hostname, liming-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.0.141 instead (on interface ens33)
23/11/23 18:40:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/opt/software/spark-3.0.0-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.hudi#hudi-spark3.0-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c42f7887-5bd6-4365-b692-2badc4d20f9c;1.0
confs: [default]
found org.apache.hudi#hudi-spark3.0-bundle_2.12;0.14.0 in central
downloading https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.0-bundle_2.12/0.14.0/hudi-spark3.0-bundle_2.12-0.14.0.jar ...
[SUCCESSFUL ] org.apache.hudi#hudi-spark3.0-bundle_2.12;0.14.0!hudi-spark3.0-bundle_2.12.jar (72033ms)
:: resolution report :: resolve 7176ms :: artifacts dl 72038ms
:: modules in use:
org.apache.hudi#hudi-spark3.0-bundle_2.12;0.14.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 1 | 1 | 0 || 1 | 1 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-c42f7887-5bd6-4365-b692-2badc4d20f9c
confs: [default]
1 artifacts copied, 0 already retrieved (100934kB/120ms)
23/11/23 18:41:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://liming141:4040
Spark context available as 'sc' (master = local[*], app id = local-1700736116296).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.0
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
```
### 将中间组件装载到外界程序
在刚刚获取jar的时候,应该可以看到这里有一个jar包的链接,是的,将它下载下来就可以装载到项目中了。
[https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.0-bundle\_2.12/0.14.0/hudi-spark3.0-bundle\_2.12-0.14.0.jar](https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.0-bundle_2.12/0.14.0/hudi-spark3.0-bundle_2.12-0.14.0.jar)

### 开始进行spark编程
```scala
package run
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object MyClass {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//关闭日志
session.sparkContext.setLogLevel("Error")
//创建DataFrame
val insertDF: DataFrame = session.read.json("file:///G:\\DataSet\\jsonDataSet\\test.json")
//将结果保存到hudi中
insertDF.write.format("org.apache.hudi") //或者直接写hudi
//设置主键列名称
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
//当数据主键相同时,对比的字段,保存该字段大的数据
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "time")
//并行度设置,默认1500
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
//表名设置
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Overwrite)
//注意:这里要选择hdfs路径存储,不要加上hdfs://mycluster//dir
//将hdfs 中core-site.xml 、hdfs-site.xml放在resource目录下,直接写/dir路径即可,否则会报错:java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie\.temp/20220509164730/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/20220509164730
.save("/hudi_data/person_infos")
}
}
```
### 开始运行
```
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
23/11/23 18:48:12 INFO SparkContext: Running Spark version 3.0.0
23/11/23 18:48:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/23 18:48:12 INFO ResourceUtils: ==============================================================
23/11/23 18:48:12 INFO ResourceUtils: Resources for spark.driver:
23/11/23 18:48:12 INFO ResourceUtils: ==============================================================
23/11/23 18:48:12 INFO SparkContext: Submitted application: insertDataToHudi
23/11/23 18:48:12 INFO SecurityManager: Changing view acls to: zhao
23/11/23 18:48:12 INFO SecurityManager: Changing modify acls to: zhao
23/11/23 18:48:12 INFO SecurityManager: Changing view acls groups to:
23/11/23 18:48:12 INFO SecurityManager: Changing modify acls groups to:
23/11/23 18:48:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(zhao); groups with view permissions: Set(); users with modify permissions: Set(zhao); groups with modify permissions: Set()
23/11/23 18:48:13 INFO Utils: Successfully started service 'sparkDriver' on port 54357.
23/11/23 18:48:13 INFO SparkEnv: Registering MapOutputTracker
23/11/23 18:48:13 INFO SparkEnv: Registering BlockManagerMaster
23/11/23 18:48:13 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/11/23 18:48:13 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/11/23 18:48:13 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/11/23 18:48:13 INFO DiskBlockManager: Created local directory at C:\Users\zhao\AppData\Local\Temp\blockmgr-8ccb32ff-b721-4b06-a89f-167c85b8a9a3
23/11/23 18:48:13 INFO MemoryStore: MemoryStore started with capacity 4.1 GiB
23/11/23 18:48:13 INFO SparkEnv: Registering OutputCommitCoordinator
23/11/23 18:48:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/11/23 18:48:13 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://LimingDeskTop:4040
23/11/23 18:48:13 INFO Executor: Starting executor ID driver on host LimingDeskTop
23/11/23 18:48:13 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54384.
23/11/23 18:48:13 INFO NettyBlockTransferService: Server created on LimingDeskTop:54384
23/11/23 18:48:13 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/11/23 18:48:13 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, LimingDeskTop, 54384, None)
23/11/23 18:48:13 INFO BlockManagerMasterEndpoint: Registering block manager LimingDeskTop:54384 with 4.1 GiB RAM, BlockManagerId(driver, LimingDeskTop, 54384, None)
23/11/23 18:48:13 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, LimingDeskTop, 54384, None)
23/11/23 18:48:13 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, LimingDeskTop, 54384, None)
进程已结束,退出代码0
```
### 踩坑记录
发生如下错误需要配置 dfs.client.block.write.replace-datanode-on-failure.policy

```xml
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
<value>NEVER</value>
</property>
```
### 小例子

# Spark集成 – 读数据
## 在hdfs中查看spark向hudi写的数据
在这里有一个路径,其实就是对应的 hdfs 中的路径,在这里存储的就类似一张表,所以在读取的时候也是需要这个路径

从Hudi表中读取数据,同样采用SparkSQL外部数据源加载数据方式,指定format数据源和相关参数options,命令如下:
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/_/_/_/_")
其中指定Hudi表数据存储路径即可,路径可以使用正则

## 开始进行spark编程
```scala
package run
import org.apache.spark.sql.SparkSession
object MyClass {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//关闭日志
session.sparkContext.setLogLevel("Error")
// 在这里与写数据操作几乎一致 指定 hudi 的源
session.read.format("org.apache.hudi")
// 指定需要被加载的表 TODO 不要忘记hdfs的配置文件
.load("/hudi_data/person_infos")
// 查看表
.show()
}
}
```
## 开始运行
```
+-------------------+--------------------+------------------+----------------------+--------------------+---+---+-----+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|age| id| name| time|
+-------------------+--------------------+------------------+----------------------+--------------------+---+---+-----+-------------------+
| 20231123191531672|20231123191531672...| 7| |703bc44a-8ab6-40c...| 32| 7| Yang|2023-05-06 08:00:06|
| 20231123191531672|20231123191531672...| 1| |703bc44a-8ab6-40c...| 25| 1| Li|2023-05-06 08:00:06|
| 20231123191531672|20231123191531672...| 3| |703bc44a-8ab6-40c...| 29| 3|Zhang|2023-05-06 08:00:06|
| 20231123191531672|20231123191531672...| 5| |703bc44a-8ab6-40c...| 30| 5| Zhao|2023-05-06 08:00:06|
| 20231123191531672|20231123191531672...| 2| |703bc44a-8ab6-40c...| 31| 2| Wang|2023-05-06 08:00:06|
| 20231123191531672|20231123191531672...| 4| |703bc44a-8ab6-40c...| 28| 4| Liu|2023-05-06 08:00:06|
| 20231123191531672|20231123191531672...| 6| |703bc44a-8ab6-40c...| 27| 6| Chen|2023-05-06 08:00:06|
+-------------------+--------------------+------------------+----------------------+--------------------+---+---+-----+-------------------+
```
------
***操作记录***
作者:[root](http://www.lingyuzhao.top/index.html?search=1 "root")
操作时间:2023-12-08 18:46:11 星期五
事件描述备注:保存/发布
[](如果不需要此记录可以手动删除,每次保存都会自动的追加记录)
------
***操作记录***
作者:[root](http://www.lingyuzhao.top//index.html?search=1 "root")
操作时间:2023-12-21 20:04:06 星期四
事件描述备注:保存/发布
[](如果不需要此记录可以手动删除,每次保存都会自动的追加记录)