专业网站建设品牌,十四年专业建站经验,服务6000+客户--广州京杭网络
免费热线:400-683-0016      微信咨询  |  联系我们

SQL之外部数据源如何成为在企业开发中的一把利器_数据库

当前位置:网站建设 > 技术支持
资料来源:网络整理       时间:2023/3/5 21:10:08       共计:3567 浏览

SQL之外部数据源如何成为在企业开发中的一把利器?

一、简介#

1.1 多数据源支持#

Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景。

CSV

JSON

Parquet

ORC

JDBC/ODBC connections

Plain-text files

注:以下所有测试文件均可从本仓库的resources 目录进行下载

1.2 读数据格式#

所有读取 API 遵循以下调用格式:

Copy

// 格式

DataFrameReader.format(...).option("key", "value").schema(...).load()

// 示例

spark.read.format("csv")

.option("mode", "FAILFAST") // 读取模式

.option("inferSchema", "true") // 是否自动推断 schema

.option("path", "path/to/file(s)") // 文件路径

.schema(someSchema) // 使用预定义的 schema

.load()

读取模式有以下三种可选项:

读模式 描述

permissive 当遇到损坏的记录时,将其所有字段设置为 null,并将所有损坏的记录放在名为 _corruption t_record 的字符串列中

dropMalformed 删除格式不正确的行

failFast 遇到格式不正确的数据时立即失败

1.3 写数据格式#

Copy

// 格式

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例

dataframe.write.format("csv")

.option("mode", "OVERWRITE") //写模式

.option("dateFormat", "yyyy-MM-dd") //日期格式

.option("path", "path/to/file(s)")

.save()

写数据模式有以下四种可选项:

Scala/Java 描述

SaveMode.ErrorIfExists 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式

SaveMode.Append 数据以追加的方式写入

SaveMode.Overwrite 数据以覆盖的方式写入

SaveMode.Ignore 如果给定的路径已经存在文件,则不做任何操作

二、CSV#

CSV 是一种常见的文本文件格式,其中每一行表示一条记录,记录中的每个字段用逗号分隔。

2.1 读取CSV文件#

自动推断类型读取读取示例:

Copy

spark.read.format("csv")

.option("header", "false") // 文件中的第一行是否为列的名称

.option("mode", "FAILFAST") // 是否快速失败

.option("inferSchema", "true") // 是否自动推断 schema

.load("/usr/file/csv/dept.csv")

.show()

使用预定义类型:

Copy

import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}

//预定义数据格式

val myManualSchema = new StructType(Array(

StructField("deptno", LongType, nullable = false),

StructField("dname", StringType,nullable = true),

StructField("loc", StringType,nullable = true)

))

spark.read.format("csv")

.option("mode", "FAILFAST")

.schema(myManualSchema)

.load("/usr/file/csv/dept.csv")

.show()

2.2 写入CSV文件#

Copy

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

也可以指定具体的分隔符:

Copy

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")

2.3 可选配置#

为节省主文篇幅,所有读写配置项见文末 9.1 小节。三、JSON#

3.1 读取JSON文件#

Copy

spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

需要注意的是:默认不支持一条数据记录跨越多行 (如下),可以通过配置 multiLine 为 true 来进行更改,其默认值为 false。

Copy

// 默认支持单行

{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}

//默认不支持多行

{

"DEPTNO": 10,

"DNAME": "ACCOUNTING",

"LOC": "NEW YORK"

}

3.2 写入JSON文件#

Copy

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

3.3 可选配置#

为节省主文篇幅,所有读写配置项见文末 9.2 小节。

四、Parquet#

Parquet 是一个开源的面向列的数据存储,它提供了多种存储优化,允许读取单独的列非整个文件,这不仅节省了存储空间而且提升了读取效率,它是 Spark 是默认的文件格式。

4.1 读取Parquet文件#

Copy

spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

2.2 写入Parquet文件#

Copy

df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

2.3 可选配置#

Parquet 文件有着自己的存储规则,因此其可选配置项比较少,常用的有如下两个:

读写操作 配置项 可选值 默认值 描述

Write compression or codec None,

uncompressed,

bzip2,

deflate, gzip,

lz4, or snappy None 压缩文件格式

Read mergeSchema true, false 取决于配置项 spark.sql.parquet.mergeSchema

五、ORC#

ORC 是一种自描述的、类型感知的列文件格式,它针对大型数据的读写进行了优化,也是大数据中常用的文件格式。

5.1 读取ORC文件#

Copy

spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)

4.2 写入ORC文件#

Copy

csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")

六、SQL Databases#

Spark 同样支持与传统的关系型数据库进行数据读写。但是 Spark 程序默认是没有提供数据库驱动的,所以在使用前需要将对应的数据库驱动上传到安装目录下的 jars 目录中。下面示例使用的是 Mysql 数据库,使用前需要将对应的 mysql-connector-java-x.x.x.jar 上传到 jars 目录下。

6.1 读取数据#

读取全表数据示例如下,这里的 help_keyword 是 mysql 内置的字典表,只有 help_keyword_id 和 name 两个字段。

Copy

spark.read

.format("jdbc")

.option("driver", "com.mysql.jdbc.Driver") //驱动

.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //数据库地址

.option("dbtable", "help_keyword") //表名

.option("user", "root").option("password","root").load().show(10)

从查询结果读取数据:

val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""

spark.read.format("jdbc")

.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")

.option("driver", "com.mysql.jdbc.Driver")

.option("user", "root").option("password", "root")

.option("dbtable", pushDownQuery)

.load().show()

//输出

+---------------+-----------+

|help_keyword_id| name|

+---------------+-----------+

| 0| <>|

| 1| ACTION|

| 2| ADD|

| 3|AES_DECRYPT|

| 4|AES_ENCRYPT|

| 5| AFTER|

| 6| AGAINST|

| 7| AGGREGATE|

| 8| ALGORITHM|

| 9| ALL|

| 10| ALTER|

| 11| ANALYSE|

| 12| ANALYZE|

| 13| AND|

| 14| ARCHIVE|

| 15| AREA|

| 16| AS|

| 17| ASBINARY|

| 18| ASC|

| 1

七、Text#

Text 文件在读写性能方面并没有任何优势,且不能表达明确的数据结构,所以其使用的比较少,读写操作如下:

7.1 读取Text数据#

Copy

spark.read.textFile("/usr/file/txt/dept.txt").show()

7.2 写入Text数据#

Copy

df.write.text("/tmp/spark/txt/dept")

八、数据读写高级特性#

8.1 并行读#

多个 Executors 不能同时读取同一个文件,但它们可以同时读取不同的文件。这意味着当您从一个包含多个文件的文件夹中读取数据时,这些文件中的每一个都将成为 DataFrame 中的一个分区,并由可用的 Executors 并行读取。

8.2 并行写#

写入的文件或数据的数量取决于写入数据时 DataFrame 拥有的分区数量。默认情况下,每个数据分区写一个文件。

8.3 分区写入#

分区和分桶这两个概念和 Hive 中分区表和分桶表是一致的。都是将数据按照一定规则进行拆分存储。需要注意的是 partitionBy 指定的分区和 RDD 中分区不是一个概念:这里的分区表现为输出目录的子目录,数据分别存储在对应的子目录中。

Copy

val df = spark.read.format("json").load("/usr/file/json/emp.json")

df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")

输出结果如下:可以看到输出被按照部门编号分为三个子目录,子目录中才是对应的输出文件。

8.3 分桶写入#

分桶写入就是将数据按照指定的列和桶数进行散列,目前分桶写入只支持保存为表,实际上这就是 Hive 的分桶表。

val numberBuckets = 10

val columnToBucketBy = "empno"

df.write.format("parquet").mode("overwrite")

.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

.......

具体介绍来源于https://www.cnblogs.com/heibaiying/p/11347390.html

版权说明:
本网站凡注明“广州京杭 原创”的皆为本站原创文章,如需转载请注明出处!
本网转载皆注明出处,遵循行业规范,如发现作品内容版权或其它问题的,请与我们联系处理!
欢迎扫描右侧微信二维码与我们联系。
·上一条:什么是latin1字符集_数据库 | ·下一条:mysql的访问端口是什么_数据库

Copyright © 广州京杭网络科技有限公司 2005-2025 版权所有    粤ICP备16019765号 

广州京杭网络科技有限公司 版权所有