使用 Python SDK 处理 Dataverse 数据

本文演示了使用 SDK 处理 Dataverse 数据和元数据的示例代码。 在继续之前,请务必阅读 入门

基本操作

下面是一些操作帐户表的示例代码。

from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient

# Replace <myorg> with the name of a valid environment.
base_url = "https://<myorg>.crm.dynamics.com"
client = DataverseClient(base_url=base_url, credential=InteractiveBrowserCredential())

# Create a record
account_id = client.records.create("account", {"name": "Contoso Ltd"})

# Read a record
account = client.records.retrieve("account", account_id)
print(account["name"])

# Read with expand fetches a related record in the same HTTP request
account = client.records.retrieve(
    "account", account_id,
    select=["name"],
    expand=["primarycontactid"],
)
contact = (account.get("primarycontactid") or {})
print(contact.get("fullname"))

# Update a record
client.records.update("account", account_id, {"telephone1": "555-0199"})

# Delete a record
client.records.delete("account", account_id)

上下文管理器

上下文管理器负责自动清理和 HTTP 连接池管理。 使用以下语法来利用上下文管理器。

with DataverseClient("https://<myorg>.crm.dynamics.com", credential) as client:

以下工作代码演示如何使用上下文管理器。

from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient

# Connect to Dataverse
credential = InteractiveBrowserCredential()

with DataverseClient("https://<myorg>.crm.dynamics.com", credential) as client:

    # Create a contact
    contact_id = client.records.create("contact", {"firstname": "John", "lastname": "Doe"})

    # Read the contact back
    contact = client.records.retrieve("contact", contact_id, select=["firstname", "lastname"])
    print(f"Created: {contact['firstname']} {contact['lastname']}")

    # Clean up
    client.records.delete("contact", contact_id)

# Session closed, caches cleared automatically

批量操作

下面是执行批量操作的几个示例。

# Bulk create
payloads = [
    {"name": "Company A"},
    {"name": "Company B"},
    {"name": "Company C"}
]
ids = client.records.create("account", payloads)

# Bulk update (broadcast same change to all)
client.records.update("account", ids, {"industry": "Technology"})

# Bulk delete
client.records.delete("account", ids, use_bulk_delete=True)

以下示例创建多个帐户。 传递有效负载列表到 create(logical_name, payloads) 以调用集合绑定的 Microsoft.Dynamics.CRM.CreateMultiple 操作。 该方法返回 list[str] 已创建的记录 ID。

# Bulk create accounts (returns list of GUIDs)
payloads = [
    {"name": "Contoso"},
    {"name": "Fabrikam"},
    {"name": "Northwind"},
]
ids = client.records.create("account", payloads)
assert isinstance(ids, list) and all(isinstance(x, str) for x in ids)
print({"created_ids": ids})

有关批量操作的详细信息:

  • 返回 None (与单个更新相同)以保持语义一致。
  • 广播与按记录处理取决于 changes 参数是字典还是列表。
  • 构造 UpdateMultiple 操作 目标时,会自动注入主键属性。
  • 如果任何负载数据省略了 @odata.type,SDK 会自动补上它(通过缓存的逻辑名称查找)。
  • 响应仅包含 ID - SDK 返回这些 GUID 字符串。
  • 单记录创建返回包含单个元素的 GUID 列表。
  • 每个实体集中的@odata.type元数据查找被执行一次,结果缓存于内存中。

Upsert(创建和更新)

常见的数据访问序列是首先检查表行是否存在。 如果该行存在,请更新它。 否则,创建该行。 通过使用 Upsert 操作的单个 API 调用,可以提高此序列的效率。

有关详细信息,请参阅 使用 Upsert 创建或更新记录

Important

该表必须在 Dataverse 中为 alternate_key 中使用的列配置了替代键。 通过 Power Apps maker 门户或 Dataverse API 调用来在表元数据中定义备用键。 如果没有配置的备用密钥,Dataverse 将拒绝出现 400 错误的 upsert 请求。

使用 client.records.upsert() 创建或更新由替代键标识的记录。 当键与现有记录匹配时,该方法将更新记录。 否则,它将创建该记录。 单个项使用 PATCH 请求,而多个项则使用 UpsertMultiple 批量操作。

from PowerPlatform.Dataverse.models.upsert import UpsertItem

# Upsert a single record
client.records.upsert("account", [
    UpsertItem(
        alternate_key={"accountnumber": "ACC-001"},
        record={"name": "Contoso Ltd", "telephone1": "555-0100"},
    )
])

# Upsert multiple records (uses UpsertMultiple bulk action)
client.records.upsert("account", [
    UpsertItem(
        alternate_key={"accountnumber": "ACC-001"},
        record={"name": "Contoso Ltd"},
    ),
    UpsertItem(
        alternate_key={"accountnumber": "ACC-002"},
        record={"name": "Fabrikam Inc"},
    ),
])

# Composite alternate key (multiple columns identify the record)
client.records.upsert("account", [
    UpsertItem(
        alternate_key={"accountnumber": "ACC-001", "address1_postalcode": "98052"},
        record={"name": "Contoso Ltd"},
    )
])

# Plain dict syntax (no import needed)
client.records.upsert("account", [
    {
        "alternate_key": {"accountnumber": "ACC-001"},
        "record": {"name": "Contoso Ltd"},
    }
])

数据帧

SDK 通过 client.dataframe 命名空间为所有 CRUD 操作提供 pandas 封装。 这些整合程序使用 pandas 的 DataFrame 和 Series API 进行输入和输出。

注释

client.dataframe.get() 已弃用。 使用以下部分所示的 GA 模式。

import pandas as pd
from PowerPlatform.Dataverse.models.filters import col

# Query records as a single DataFrame (GA builder pattern)
df = (client.query.builder("account")
      .select("name", "telephone1")
      .where(col("statecode") == 0)
      .execute()
      .to_dataframe())
print(f"Found {len(df)} accounts")

# Limit results with top for large tables
df = client.query.builder("account").select("name").top(100).execute().to_dataframe()

# Create records from a DataFrame (returns a Series of GUIDs)
new_accounts = pd.DataFrame([
    {"name": "Contoso", "telephone1": "555-0100"},
    {"name": "Fabrikam", "telephone1": "555-0200"},
])
new_accounts["accountid"] = client.dataframe.create("account", new_accounts)

# Update records from a DataFrame (id_column identifies the GUID column)
new_accounts["telephone1"] = ["555-0199", "555-0299"]
client.dataframe.update("account", new_accounts, id_column="accountid")

# Clear a field by setting clear_nulls=True (by default, NaN/None fields are skipped)
df = pd.DataFrame([{"accountid": new_accounts["accountid"].iloc[0], "websiteurl": None}])
client.dataframe.update("account", df, id_column="accountid", clear_nulls=True)

# Delete records by passing a Series of GUIDs
client.dataframe.delete("account", new_accounts["accountid"])

# SQL query directly to DataFrame (supports JOINs, aggregates, GROUP BY)
df = client.dataframe.sql(
    "SELECT a.name, COUNT(c.contactid) as contacts "
    "FROM account a "
    "JOIN contact c ON a.accountid = c.parentcustomerid "
    "GROUP BY a.name"
)

将文件上传到 Dataverse

以下示例演示如何将名为document.pdf的文件上传到账户记录中名为new_Document文件列。 用于Python的 SDK 会自动处理大于 128 MB 的文件区块。

# Upload a file to a record
client.files.upload(
    "account",
    account_id,
    "new_Document",
    "/path/to/document.pdf",
)

Tip

如果文件列不存在,SDK 会自动创建它。

批处理操作

使用 client.batch 在一个 HTTP 请求中发送多个操作。 批处理命名空间映射到 client.recordsclient.tablesclient.query

# Build a batch request and add operations
batch = client.batch.new()
batch.records.create("account", {"name": "Contoso"})
batch.records.create("account", [{"name": "Fabrikam"}, {"name": "Woodgrove"}])
batch.records.update("account", account_id, {"telephone1": "555-0100"})
batch.records.delete("account", old_id)
batch.records.retrieve("account", account_id, select=["name"], expand=["primarycontactid"])  # single record with expand
batch.records.list(                                                # multi-record, single page
    "account",
    filter="statecode eq 0",
    select=["name"],
    orderby=["name asc"],
    top=50,
)

result = batch.execute()
for item in result.responses:
    if item.is_success:
        print(f"[OK] {item.status_code} entity_id={item.entity_id}")
    else:
        print(f"[ERR] {item.status_code}: {item.error_message}")

事务性变更集

变更集中的所有操作要么全部成功,要么全部回滚。

batch = client.batch.new()
with batch.changeset() as cs:
    lead_ref = cs.records.create("lead", {"firstname": "Ada"})
    contact_ref = cs.records.create("contact", {"firstname": "Ada"})
    cs.records.create("account", {
        "name": "Babbage & Co.",
        "[email protected]": lead_ref,
        "[email protected]": contact_ref,
    })
result = batch.execute()
print(f"Created {len(result.entity_ids)} records atomically")

批处理中的表元数据和 SQL 查询

batch = client.batch.new()
batch.tables.create("new_Product", {"new_Price": "decimal", "new_InStock": "bool"})
batch.tables.add_columns("new_Product", {"new_Rating": "int"})
batch.tables.get("new_Product")
batch.query.sql("SELECT TOP 5 name FROM account")

result = batch.execute()

在错误情况下继续

即使一个操作失败,也尝试所有操作。

result = batch.execute(continue_on_error=True)
print(f"Succeeded: {len(result.succeeded)}, Failed: {len(result.failed)}")
for item in result.failed:
    print(f"[ERR] {item.status_code}: {item.error_message}")

DataFrame 集成

将 pandas DataFrames 直接作为批处理输入。

import pandas as pd

batch = client.batch.new()

# Create records from a DataFrame
df = pd.DataFrame([{"name": "Contoso"}, {"name": "Fabrikam"}])
batch.dataframe.create("account", df)

# Update records from a DataFrame
updates = pd.DataFrame([
    {"accountid": id1, "telephone1": "555-0100"},
    {"accountid": id2, "telephone1": "555-0200"},
])
batch.dataframe.update("account", updates, id_column="accountid")

# Delete records from a Series
batch.dataframe.delete("account", pd.Series([id1, id2]))

result = batch.execute()

有关完整的批处理示例,请参阅 examples/advanced/batch.py

另请参阅