本文演示了使用 SDK 处理 Dataverse 数据和元数据的示例代码。 在继续之前,请务必阅读 入门。
基本操作
下面是一些操作帐户表的示例代码。
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
# Replace <myorg> with the name of a valid environment.
base_url = "https://<myorg>.crm.dynamics.com"
client = DataverseClient(base_url=base_url, credential=InteractiveBrowserCredential())
# Create a record
account_id = client.records.create("account", {"name": "Contoso Ltd"})
# Read a record
account = client.records.retrieve("account", account_id)
print(account["name"])
# Read with expand fetches a related record in the same HTTP request
account = client.records.retrieve(
"account", account_id,
select=["name"],
expand=["primarycontactid"],
)
contact = (account.get("primarycontactid") or {})
print(contact.get("fullname"))
# Update a record
client.records.update("account", account_id, {"telephone1": "555-0199"})
# Delete a record
client.records.delete("account", account_id)
上下文管理器
上下文管理器负责自动清理和 HTTP 连接池管理。 使用以下语法来利用上下文管理器。
with DataverseClient("https://<myorg>.crm.dynamics.com", credential) as client:
以下工作代码演示如何使用上下文管理器。
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
# Connect to Dataverse
credential = InteractiveBrowserCredential()
with DataverseClient("https://<myorg>.crm.dynamics.com", credential) as client:
# Create a contact
contact_id = client.records.create("contact", {"firstname": "John", "lastname": "Doe"})
# Read the contact back
contact = client.records.retrieve("contact", contact_id, select=["firstname", "lastname"])
print(f"Created: {contact['firstname']} {contact['lastname']}")
# Clean up
client.records.delete("contact", contact_id)
# Session closed, caches cleared automatically
批量操作
下面是执行批量操作的几个示例。
# Bulk create
payloads = [
{"name": "Company A"},
{"name": "Company B"},
{"name": "Company C"}
]
ids = client.records.create("account", payloads)
# Bulk update (broadcast same change to all)
client.records.update("account", ids, {"industry": "Technology"})
# Bulk delete
client.records.delete("account", ids, use_bulk_delete=True)
以下示例创建多个帐户。 传递有效负载列表到 create(logical_name, payloads) 以调用集合绑定的 Microsoft.Dynamics.CRM.CreateMultiple 操作。 该方法返回 list[str] 已创建的记录 ID。
# Bulk create accounts (returns list of GUIDs)
payloads = [
{"name": "Contoso"},
{"name": "Fabrikam"},
{"name": "Northwind"},
]
ids = client.records.create("account", payloads)
assert isinstance(ids, list) and all(isinstance(x, str) for x in ids)
print({"created_ids": ids})
有关批量操作的详细信息:
- 返回
None(与单个更新相同)以保持语义一致。 - 广播与按记录处理取决于
changes参数是字典还是列表。 - 构造 UpdateMultiple 操作 目标时,会自动注入主键属性。
- 如果任何负载数据省略了 @odata.type,SDK 会自动补上它(通过缓存的逻辑名称查找)。
- 响应仅包含 ID - SDK 返回这些 GUID 字符串。
- 单记录创建返回包含单个元素的 GUID 列表。
- 每个实体集中的
@odata.type元数据查找被执行一次,结果缓存于内存中。
Upsert(创建和更新)
常见的数据访问序列是首先检查表行是否存在。 如果该行存在,请更新它。 否则,创建该行。 通过使用 Upsert 操作的单个 API 调用,可以提高此序列的效率。
有关详细信息,请参阅 使用 Upsert 创建或更新记录。
Important
该表必须在 Dataverse 中为 alternate_key 中使用的列配置了替代键。 通过 Power Apps maker 门户或 Dataverse API 调用来在表元数据中定义备用键。 如果没有配置的备用密钥,Dataverse 将拒绝出现 400 错误的 upsert 请求。
使用 client.records.upsert() 创建或更新由替代键标识的记录。 当键与现有记录匹配时,该方法将更新记录。 否则,它将创建该记录。 单个项使用 PATCH 请求,而多个项则使用 UpsertMultiple 批量操作。
from PowerPlatform.Dataverse.models.upsert import UpsertItem
# Upsert a single record
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001"},
record={"name": "Contoso Ltd", "telephone1": "555-0100"},
)
])
# Upsert multiple records (uses UpsertMultiple bulk action)
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001"},
record={"name": "Contoso Ltd"},
),
UpsertItem(
alternate_key={"accountnumber": "ACC-002"},
record={"name": "Fabrikam Inc"},
),
])
# Composite alternate key (multiple columns identify the record)
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001", "address1_postalcode": "98052"},
record={"name": "Contoso Ltd"},
)
])
# Plain dict syntax (no import needed)
client.records.upsert("account", [
{
"alternate_key": {"accountnumber": "ACC-001"},
"record": {"name": "Contoso Ltd"},
}
])
数据帧
SDK 通过 client.dataframe 命名空间为所有 CRUD 操作提供 pandas 封装。 这些整合程序使用 pandas 的 DataFrame 和 Series API 进行输入和输出。
注释
client.dataframe.get() 已弃用。 使用以下部分所示的 GA 模式。
import pandas as pd
from PowerPlatform.Dataverse.models.filters import col
# Query records as a single DataFrame (GA builder pattern)
df = (client.query.builder("account")
.select("name", "telephone1")
.where(col("statecode") == 0)
.execute()
.to_dataframe())
print(f"Found {len(df)} accounts")
# Limit results with top for large tables
df = client.query.builder("account").select("name").top(100).execute().to_dataframe()
# Create records from a DataFrame (returns a Series of GUIDs)
new_accounts = pd.DataFrame([
{"name": "Contoso", "telephone1": "555-0100"},
{"name": "Fabrikam", "telephone1": "555-0200"},
])
new_accounts["accountid"] = client.dataframe.create("account", new_accounts)
# Update records from a DataFrame (id_column identifies the GUID column)
new_accounts["telephone1"] = ["555-0199", "555-0299"]
client.dataframe.update("account", new_accounts, id_column="accountid")
# Clear a field by setting clear_nulls=True (by default, NaN/None fields are skipped)
df = pd.DataFrame([{"accountid": new_accounts["accountid"].iloc[0], "websiteurl": None}])
client.dataframe.update("account", df, id_column="accountid", clear_nulls=True)
# Delete records by passing a Series of GUIDs
client.dataframe.delete("account", new_accounts["accountid"])
# SQL query directly to DataFrame (supports JOINs, aggregates, GROUP BY)
df = client.dataframe.sql(
"SELECT a.name, COUNT(c.contactid) as contacts "
"FROM account a "
"JOIN contact c ON a.accountid = c.parentcustomerid "
"GROUP BY a.name"
)
将文件上传到 Dataverse
以下示例演示如何将名为document.pdf的文件上传到账户记录中名为new_Document的文件列。 用于Python的 SDK 会自动处理大于 128 MB 的文件区块。
# Upload a file to a record
client.files.upload(
"account",
account_id,
"new_Document",
"/path/to/document.pdf",
)
Tip
如果文件列不存在,SDK 会自动创建它。
批处理操作
使用 client.batch 在一个 HTTP 请求中发送多个操作。 批处理命名空间映射到 client.records、client.tables 和 client.query。
# Build a batch request and add operations
batch = client.batch.new()
batch.records.create("account", {"name": "Contoso"})
batch.records.create("account", [{"name": "Fabrikam"}, {"name": "Woodgrove"}])
batch.records.update("account", account_id, {"telephone1": "555-0100"})
batch.records.delete("account", old_id)
batch.records.retrieve("account", account_id, select=["name"], expand=["primarycontactid"]) # single record with expand
batch.records.list( # multi-record, single page
"account",
filter="statecode eq 0",
select=["name"],
orderby=["name asc"],
top=50,
)
result = batch.execute()
for item in result.responses:
if item.is_success:
print(f"[OK] {item.status_code} entity_id={item.entity_id}")
else:
print(f"[ERR] {item.status_code}: {item.error_message}")
事务性变更集
变更集中的所有操作要么全部成功,要么全部回滚。
batch = client.batch.new()
with batch.changeset() as cs:
lead_ref = cs.records.create("lead", {"firstname": "Ada"})
contact_ref = cs.records.create("contact", {"firstname": "Ada"})
cs.records.create("account", {
"name": "Babbage & Co.",
"[email protected]": lead_ref,
"[email protected]": contact_ref,
})
result = batch.execute()
print(f"Created {len(result.entity_ids)} records atomically")
批处理中的表元数据和 SQL 查询
batch = client.batch.new()
batch.tables.create("new_Product", {"new_Price": "decimal", "new_InStock": "bool"})
batch.tables.add_columns("new_Product", {"new_Rating": "int"})
batch.tables.get("new_Product")
batch.query.sql("SELECT TOP 5 name FROM account")
result = batch.execute()
在错误情况下继续
即使一个操作失败,也尝试所有操作。
result = batch.execute(continue_on_error=True)
print(f"Succeeded: {len(result.succeeded)}, Failed: {len(result.failed)}")
for item in result.failed:
print(f"[ERR] {item.status_code}: {item.error_message}")
DataFrame 集成
将 pandas DataFrames 直接作为批处理输入。
import pandas as pd
batch = client.batch.new()
# Create records from a DataFrame
df = pd.DataFrame([{"name": "Contoso"}, {"name": "Fabrikam"}])
batch.dataframe.create("account", df)
# Update records from a DataFrame
updates = pd.DataFrame([
{"accountid": id1, "telephone1": "555-0100"},
{"accountid": id2, "telephone1": "555-0200"},
])
batch.dataframe.update("account", updates, id_column="accountid")
# Delete records from a Series
batch.dataframe.delete("account", pd.Series([id1, id2]))
result = batch.execute()
有关完整的批处理示例,请参阅 examples/advanced/batch.py。