注册服务连接

本主题介绍如何向 Snowflake 或第三方服务(例如 Apache Spark™)注册服务连接凭据。Snowflake Open Catalog 管理员负责注册服务连接。

本主题中的示例代码展示如何在 Spark 中注册服务连接,该示例代码位于 PySpark 中。

先决条件

在注册服务连接之前,需要先配置服务连接。有关说明,请参阅 配置服务连接

注册服务连接

以下示例代码用于注册单个服务连接。

注意

也可以注册多个服务连接;请参阅 示例 2:注册两个服务连接

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,<maven_coordinate>') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://<open_catalog_account_identifier>.snowflakecomputing.cn/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.credential','<client_id>:<client_secret>') \
    .config('spark.sql.catalog.opencatalog.warehouse','<catalog_name>') \
    .config('spark.sql.catalog.opencatalog.scope','PRINCIPAL_ROLE:<principal_role_name>') \
    .getOrCreate()
Copy

参数

参数

描述

<catalog_name>

指定要连接的目录的名称。

*重要
<catalog_name> 区分大小写。

<maven_coordinate>

指定外部云存储提供商的 maven 坐标:

  • **S3: software.amazon.awssdk:bundle:2.20.160
  • **云存储(来自 Google): org.apache.iceberg:iceberg-gcp-bundle:1.5.2
  • **Azure: org.apache.iceberg:iceberg-azure-bundle:1.5.2

<client_id>

指定服务主体要使用的客户端 ID。

<client_secret>

指定服务主体要使用的客户密钥。

<open_catalog_account_identifier>

指定 Open Catalog 账户的账户标识符。根据账户所在区域和云平台的不同,此标识符可能本身就是账户定位器(例如,xy12345),也可能包含其他分段。有关更多信息,请参阅 使用账户定位器作为标识符

<principal_role_name>

指定授予服务主体的主体角色。

注册跨区域服务连接(仅限 Amazon S3)

以下示例代码用于在以下情况为 True 时注册服务连接:

  • 您的 Open Catalog 账户托管在 Amazon S3 上。

  • 您的外部存储提供商是 Amazon S3。

  • 您的 Open Catalog 账户托管在一个 S3 区域,该区域与包含您的 Apache Tables™ 表的存储桶所在的 S3 区域不同。

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://<open_catalog_account_identifier>.snowflakecomputing.cn/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.credential','<client_id>:<client_secret>') \
    .config('spark.sql.catalog.opencatalog.warehouse','<catalog_name>') \
    .config('spark.sql.catalog.opencatalog.client.region','<target_s3_region>') \
    .config('spark.sql.catalog.opencatalog.scope','PRINCIPAL_ROLE:<principal_role_name>') \
    .getOrCreate()
Copy

参数

参数

描述

<catalog_name>

指定要连接的目录的名称。

*重要
<catalog_name> 区分大小写。

<client_id>

指定服务主体要使用的客户端 ID。

<client_secret>

指定服务主体要使用的客户密钥。

<open_catalog_account_identifier>

指定 Open Catalog 账户的账户标识符。根据账户所在区域和云平台的不同,此标识符可能本身就是账户定位器(例如,xy12345),也可能包含其他分段。有关更多信息,请参阅 使用账户定位器作为标识符

<target_s3_region>

指定包含 Apache Iceberg 表的 S3 桶所在的区域代码。有关区域代码,请参阅 AWS 服务端点 (https://docs.aws.amazon.com/general/latest/gr/s3.html#s3_region),并参阅表中的区域列。

<principal_role_name>

指定授予服务主体的主体角色。

示例

本节包含在 Spark 中注册服务连接的示例。

示例 1:注册单个服务连接 (S3)

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.credential','000000000000000000000000000=:1111111111111111111111111111111111111111111=') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .config('spark.sql.catalog.opencatalog.scope','PRINCIPAL_ROLE:data_engineer') \
    .getOrCreate()
Copy

示例 2:注册两个服务连接 (S3)

重要

注册多个服务连接时,必须将第一个连接代码中的 opencatalog 实例更改为每个后续连接代码中的唯一文本。例如,在以下代码中,第一个连接的 opencatalog 实例更改为第二个连接的 opencatalog1

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.credential','000000000000000000000000000=:1111111111111111111111111111111111111111111=') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .config('spark.sql.catalog.opencatalog.scope','PRINCIPAL_ROLE:data_scientist') \
    .config('spark.sql.catalog.opencatalog1', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog1.type', 'rest') \
    .config('spark.sql.catalog.opencatalog1.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog1.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog1.credential','222222222222222222222222222=:3333333333333333333333333333333333333333333=') \
    .config('spark.sql.catalog.opencatalog1.warehouse','Catalog2') \
    .config('spark.sql.catalog.opencatalog1.scope','PRINCIPAL_ROLE:data_scientist') \
    .getOrCreate()
Copy

示例 3:注册单个服务连接(Google 的云存储)

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-gcp-bundle:1.5.2') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.credential','000000000000000000000000000=:1111111111111111111111111111111111111111111=') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .config('spark.sql.catalog.opencatalog.scope','PRINCIPAL_ROLE:data_engineer') \
    .getOrCreate()
Copy

示例 4:注册单个服务连接 (Azure)

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-azure-bundle:1.5.2') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.cn/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.credential','000000000000000000000000000=:1111111111111111111111111111111111111111111=') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .config('spark.sql.catalog.opencatalog.scope','PRINCIPAL_ROLE:data_engineer') \
    .getOrCreate()
Copy

验证与 Open Catalog 的连接

要验证 Spark 是否已连接到 Open Catalog,应列出目录的命名空间。有关更多信息,请参阅 列出命名空间

语言: 中文