Prerequisites
- Register an Azure AD application and create a client secret for it
  - Client ID
  - Tenant ID
  - Client secret
- Create a storage account (with the hierarchical namespace option enabled it is ADLS, otherwise plain Blob storage) and grant the Azure application the Storage Blob Data Contributor role on it
  - Storage account name
  - Container name
  - Storage account access key
- Create a Key Vault and add a secret to it whose value is the Azure application's client secret
  - Secret name
  - Vault URL (from the Key Vault properties)
  - Resource ID (from the Key Vault properties)
- Create an Azure Databricks workspace, set up the workspace's secret scope, and record the secret scope name (a quick verification sketch follows this list)
  - Secret scope setting page: https://<databricks-instance>#secrets/createScope
  - Secret scope name
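Once the scope exists, it can be checked from a notebook before attempting the mount. A minimal verification sketch, assuming the scope name azureappblobsecretscopetest and the secret name kvdatabrickstest used in the example later in this document:
# python
# List the secret scopes visible to this workspace and the secrets in the new scope.
print(dbutils.secrets.listScopes())
print(dbutils.secrets.list("azureappblobsecretscopetest"))
# Secret values are always redacted when printed, so this only confirms that the
# scope and key names resolve.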
Mounting ADLS in Azure Databricks
Code template:
# python
######################################################################################
# Set the configurations. Here's what you need:
## 1.) Client ID (a.k.a Application ID)
## 2.) Client Secret (a.k.a. Application Secret)
## 3.) Directory ID
## 4.) File System Name
## 5.) Storage Account Name
## 6.) Mount Name
######################################################################################
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<client-id>",
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"),
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<directory-id>/oauth2/token"}
######################################################################################
# Optionally, you can add <directory-name> to the source URI of your mount point.
######################################################################################
dbutils.fs.mount(
source = "abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/",
mount_point = "/mnt/<mount-name>",
extra_configs = configs)
Parameter notes:
<client-id>: the client ID of the Azure application
<scope-name>: the Azure Databricks secret scope name
<key-name-for-service-credential>: the name of the secret stored in the Azure Key Vault
<directory-id>: the tenant ID
<mount-name>: a DBFS path indicating where the Data Lake Store, or a folder inside it, is mounted in DBFS
<file-system-name>: the container name
<storage-account-name>: the storage account name
Example code:
# python
######################################################################################
# Set the configurations. Here's what you need:
## 1.) Client ID (a.k.a Application ID)
## 2.) Client Secret (a.k.a. Application Secret)
## 3.) Directory ID
## 4.) File System Name
## 5.) Storage Account Name
## 6.) Mount Name
######################################################################################
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "72333195dd7-b3b9-4c9f23-92d2-5eb212393504aba",
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "azureappblobsecretscopetest", key = "kvdatabrickstest"),
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/c3213dcdf2123e3-b2ca-40ea-bd83-1e6b704f52108/oauth2/token"}
######################################################################################
# Optionally, you can add <directory-name> to the source URI of your mount point.
######################################################################################
dbutils.fs.mount(
source = "abfss://databricks@storeaccountnames.dfs.core.windows.net/",
mount_point = "/mnt/databricks",
extra_configs = configs)
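After the mount command succeeds, it is worth confirming that the container is actually reachable before creating tables on it. A minimal sketch, assuming the /mnt/databricks mount point from the example above:
# python
# List all current mount points; /mnt/databricks should appear in the output.
display(dbutils.fs.mounts())

# List the files under the mount point to confirm the OAuth credentials work.
display(dbutils.fs.ls("/mnt/databricks"))

# If the configuration needs to change, unmount first and mount again.
# dbutils.fs.unmount("/mnt/databricks")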
Reading and writing ADLS in Azure Databricks
Create the target table
%sql
create schema if not exists M2D_PRA;
create table if not exists M2D_PRA.kpi_location(
  id int,
  warehouseCode int,
  warehouse string,
  value int,
  created string
)
using delta
location '/mnt/databricks/delta_table/'
Query the target table's location details
%sql
describe detail M2D_PRA.kpi_location
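If the path is needed in code rather than read off a notebook cell, the same details can be fetched through spark.sql. A minimal sketch, assuming the M2D_PRA.kpi_location table created above:
# python
# DESCRIBE DETAIL returns a one-row DataFrame; pick out the Delta location column.
detail = spark.sql("describe detail M2D_PRA.kpi_location")
location = detail.select("location").collect()[0]["location"]
print(location)  # expected to point at /mnt/databricks/delta_table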
Read the CSV data from Blob
# Read the CSV data from Blob - python
csvData = spark.read.format('csv').options(header='true', inferSchema='true').load("/mnt/databricks/kpi_location.csv")
display(csvData)
csvData.printSchema()
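inferSchema='true' requires an extra pass over the file and can guess column types incorrectly; supplying an explicit schema avoids both. A minimal sketch, assuming the CSV columns match the kpi_location table defined above:
# python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema matching the M2D_PRA.kpi_location table definition.
kpiSchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("warehouseCode", IntegerType(), True),
    StructField("warehouse", StringType(), True),
    StructField("value", IntegerType(), True),
    StructField("created", StringType(), True)
])

csvData = spark.read.format('csv').options(header='true').schema(kpiSchema).load("/mnt/databricks/kpi_location.csv")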
Write the data into the target table
# Write the data into the target table - python
csvData.write.format('delta').mode('overwrite').save('/mnt/databricks/delta_table/')
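mode('overwrite') replaces whatever is already at the Delta location on each run. If later loads should accumulate instead, only the write mode changes; whether that is appropriate depends on how kpi_location is refreshed, so treat this as an assumption:
# python
# Append the new rows to the existing Delta table instead of replacing it.
csvData.write.format('delta').mode('append').save('/mnt/databricks/delta_table/')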
Query the data in the target table
%sql
-- Query the data in the target table
select * from M2D_PRA.kpi_location
Convert to a pandas DataFrame and export to Blob
# Convert to a pandas DataFrame and export to Blob - python
pdData = csvData.toPandas()
pdData.to_csv('/dbfs/mnt/databricks/kpi_location_pandas_export.csv', index=False)
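pandas goes through the local file API, which is why the mount appears under the /dbfs prefix instead of /mnt directly. A minimal sketch that reads the exported file back to confirm it landed in the container:
# python
import pandas as pd

# Read the exported CSV back through the local /dbfs path to verify the export.
checkData = pd.read_csv('/dbfs/mnt/databricks/kpi_location_pandas_export.csv')
print(checkData.head())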
Converting between pandas and PySpark DataFrames
- pandas to PySpark
  - df = spark.createDataFrame(pd_df)
- PySpark to pandas
  - df = sp_df.toPandas()
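A minimal round-trip sketch of the two conversions; the small sample DataFrame is invented purely for illustration:
# python
import pandas as pd

# A tiny pandas DataFrame invented for illustration.
pd_df = pd.DataFrame({"id": [1, 2], "warehouse": ["A", "B"]})

# pandas -> PySpark
sp_df = spark.createDataFrame(pd_df)
sp_df.show()

# PySpark -> pandas
pd_back = sp_df.toPandas()
print(pd_back)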