
Mounting and Reading/Writing ADLS with Azure Databricks

Prerequisites

  • Register an Azure application (app registration) and create a client secret for it
    • Client ID
    • Tenant ID
    • Client secret
  • Create a storage account (enabling the hierarchical namespace makes it ADLS; otherwise it is plain Blob storage) and grant the Azure application the Storage Blob Data Contributor role
    • Storage account name
    • Container name
    • Storage account access key
  • Create a Key Vault and add a secret whose value is the Azure application's client secret
    • Secret name
    • The vault url from the Key Vault properties
    • The resource id from the Key Vault properties
  • Create an Azure Databricks workspace, set up a secret scope for the workspace, and record the secret scope name (a quick verification sketch follows this list)
    • Secret scope setup page: https://<databricks-instance>#secrets/createScope
    • Secret scope name
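
Once the secret scope has been created and linked to the Key Vault, a quick way to confirm that Databricks can see it is to list the scopes and their keys from a notebook. A minimal sketch, where <scope-name> and <key-name-for-service-credential> are the same placeholders used below, not real values:

# verify the secret scope - python
print(dbutils.secrets.listScopes())          # should include <scope-name>
print(dbutils.secrets.list("<scope-name>"))  # should include <key-name-for-service-credential>
# The secret value itself is redacted if printed in a notebook.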

Mounting ADLS in Azure Databricks

Code template:

# python
######################################################################################
# Set the configurations. Here's what you need:
## 1.) Client ID (a.k.a Application ID)
## 2.) Client Secret (a.k.a. Application Secret)
## 3.) Directory ID
## 4.) File System Name
## 5.) Storage Account Name
## 6.) Mount Name
######################################################################################
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<client-id>",
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<directory-id>/oauth2/token"}

######################################################################################
# Optionally, you can add <directory-name> to the source URI of your mount point.
######################################################################################
dbutils.fs.mount(
  source = "abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/",
  mount_point = "/mnt/<mount-name>",
  extra_configs = configs)

Parameter notes:

  • <client-id>: the client ID of the Azure application
  • <scope-name>: the Azure Databricks secret scope name
  • <key-name-for-service-credential>: the name of the secret in Azure Key Vault
  • <directory-id>: the tenant ID
  • <mount-name>: the DBFS path where the Data Lake Store (or a folder inside it) is mounted
  • <file-system-name>: the container name
  • <storage-account-name>: the storage account name

Example:

# python
######################################################################################
# Set the configurations. Here's what you need:
## 1.) Client ID (a.k.a Application ID)
## 2.) Client Secret (a.k.a. Application Secret)
## 3.) Directory ID
## 4.) File System Name
## 5.) Storage Account Name
## 6.) Mount Name
######################################################################################
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "72333195dd7-b3b9-4c9f23-92d2-5eb212393504aba",
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "azureappblobsecretscopetest", key = "kvdatabrickstest"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/c3213dcdf2123e3-b2ca-40ea-bd83-1e6b704f52108/oauth2/token"}

######################################################################################
# Optionally, you can add <directory-name> to the source URI of your mount point.
######################################################################################
dbutils.fs.mount(
  source = "abfss://databricks@storeaccountnames.dfs.core.windows.net/",
  mount_point = "/mnt/databricks",
  extra_configs = configs)
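
Once the mount command succeeds, it is worth checking that the mount point is actually visible before reading from it. The sketch below only uses standard dbutils calls (dbutils.fs.mounts, dbutils.fs.ls, dbutils.fs.unmount) and the /mnt/databricks mount name from the example:

# verify the mount - python
display(dbutils.fs.mounts())               # the list should include /mnt/databricks
display(dbutils.fs.ls("/mnt/databricks"))  # browse the container through the mount point
# To re-mount with different settings, unmount first:
# dbutils.fs.unmount("/mnt/databricks")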

Reading and Writing ADLS in Azure Databricks

Create the target table

%sql

create schema if not exists M2D_PRA;

create table if not exists M2D_PRA.kpi_location(
    id int,
    warehouseCode int,
    warehouse string,
    value int,
    created string
)
using delta
location '/mnt/databricks/delta_table/'
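
The same DDL can also be issued from a Python cell via spark.sql, which is handy when the schema name or location needs to be parameterized. A sketch equivalent to the %sql cell above, using the same /mnt/databricks mount:

# create the target table from Python (equivalent to the %sql cell above)
spark.sql("create schema if not exists M2D_PRA")
spark.sql("""
create table if not exists M2D_PRA.kpi_location(
    id int,
    warehouseCode int,
    warehouse string,
    value int,
    created string
)
using delta
location '/mnt/databricks/delta_table/'
""")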

Inspect the target table's path details

%sql
describe detail M2D_PRA.kpi_location
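
If the storage location is needed programmatically (for example to pass the path to spark.read), the same DESCRIBE DETAIL output can be collected from Python. A small sketch, assuming the table created above:

# fetch the Delta table's storage location - python
detail = spark.sql("describe detail M2D_PRA.kpi_location")
print(detail.select("location").first()["location"])  # e.g. dbfs:/mnt/databricks/delta_table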

Read the CSV data from Blob storage

# Read the CSV data from Blob storage - python
csvData = spark.read.format('csv').options(header='true', inferSchema='true').load("/mnt/databricks/kpi_location.csv")
display(csvData)
csvData.printSchema()

Write the data to the target table

# Write the data to the target table - python
csvData.write.format('delta').mode('overwrite').save('/mnt/databricks/delta_table/')
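
To confirm the write without switching to SQL, the Delta location can be read back directly. A minimal sketch using the same path as above:

# read the Delta table back to confirm the write - python
deltaData = spark.read.format('delta').load('/mnt/databricks/delta_table/')
display(deltaData)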

Query the data in the target table

%sql
-- Query the data in the target table
select * from M2D_PRA.kpi_location

Convert to a pandas DataFrame and export to Blob storage

# Convert the Spark DataFrame to pandas and write the CSV back through the /dbfs local-file view of the mount - python
pdData = csvData.toPandas()
pdData.to_csv('/dbfs/mnt/databricks/kpi_location_pandas_export.csv', index=False)

Converting between pandas and PySpark DataFrames

  • pandas to PySpark
    • df = spark.createDataFrame(pd_df)
  • PySpark to pandas
    • df = sp_df.toPandas()
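
A short round trip tying the two conversions together (a sketch; the sample data and column names are made up for illustration):

# pandas <-> PySpark round trip - python
import pandas as pd

pd_df = pd.DataFrame({"id": [1, 2], "warehouse": ["A", "B"]})  # hypothetical sample data
sp_df = spark.createDataFrame(pd_df)  # pandas -> PySpark
pd_back = sp_df.toPandas()            # PySpark -> pandas
print(pd_back)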