本文提供了在使用 mssql-django 后端和 SQL Server 时优化 Django 应用程序性能的指导。
连接优化
通过调整连接池、持久连接和超时设置,减少连接开销。
启用连接池
默认情况下,连接池处于启用状态。 确认它未在您的 settings.py 中被禁用:
# Keep this True (or omit it entirely) for best connection performance
DATABASE_CONNECTION_POOLING = True
使用CONN_MAX_AGE
设置为 CONN_MAX_AGE 使数据库连接跨请求保持打开状态,避免为每个请求建立新连接开销:
DATABASES = {
"default": {
"ENGINE": "mssql",
"NAME": "<your-database>",
"USER": "<your-username>",
"PASSWORD": "<your-password>",
"HOST": "<your-server>",
"PORT": "1433",
"CONN_MAX_AGE": 600, # Keep connections open for 10 minutes
"OPTIONS": {
"driver": "ODBC Driver 18 for SQL Server",
},
},
}
设置查询超时时间
防止长时间运行的查询无限期地消耗资源:
DATABASES = {
"default": {
"ENGINE": "mssql",
"NAME": "<your-database>",
"USER": "<your-username>",
"PASSWORD": "<your-password>",
"HOST": "<your-server>",
"PORT": "1433",
"OPTIONS": {
"driver": "ODBC Driver 18 for SQL Server",
"query_timeout": 30,
},
},
}
查询优化
使用这些 ORM 技术减少数据库往返次数和查询计数。
避免 N+1 查询模式
对外键关系使用 select_related(单个 JOIN 查询),对多对多或反向关系使用 prefetch_related(使用 IN 子句的单独查询):
# Bad: N+1 queries
orders = Order.objects.all()
for order in orders:
print(order.customer.name) # Each access triggers a query
# Good: Single JOIN query
orders = Order.objects.select_related("customer").all()
for order in orders:
print(order.customer.name) # No additional queries
# Good: Two queries instead of N+1
orders = Order.objects.prefetch_related("items").all()
for order in orders:
for item in order.items.all(): # Uses prefetched data
print(item.name)
仅使用()和延迟()
在不需要所有字段时限制检索的列:
# Retrieve only specific fields
products = Product.objects.only("name", "price").all()
# Defer loading of large fields
products = Product.objects.defer("description", "metadata").all()
使用 values() 和 values_list()
如果不需要模型实例,请使用 values() 或 values_list() 用于较轻的查询:
# Returns dictionaries instead of model instances
prices = Product.objects.values("name", "price")
# Returns tuples
names = Product.objects.values_list("name", flat=True)
在 2,100 个参数限制内工作
SQL Server将每个查询限制为 2,100 个参数。 Django 生成参数化查询,因此生成大型 IN 子句或批量值列表的操作可以达到此限制。
大型 IN 子句的自动优化:
filter(field__in=list)当调用具有超过 2,048 个值时,mssql-django后端会自动将值插入临时表中(批为 1,000 个),并将查询重写为WHERE field IN (SELECT params FROM #Temp_params)。 此优化可避免参数限制,而无需更改任何代码。 它适用于所有 __in 查找,包括由 prefetch_related() 生成的查找。 后端将设置 max_in_list_size() 2,048 阈值,以安全地保持在SQL Server的 2,100 个参数限制之下。
此重写具有成本:创建和填充 #Temp_params 增加了额外的往返和 tempdb 活动。 对于接近阈值的列表,请对工作负荷中的这两种方法进行基准测试。
当仍需要手动干预时:
自动临时表优化可以处理 __in 查找,但这些操作仍可能触及 2,100 个参数的上限,因为每个字段值都会作为单独的参数:
-
bulk_create()或bulk_update()具有多个对象和多个字段 - 具有许多链接条件的复杂
Q()表达式 - 想要避免为填充
#Temp_params而进行往返请求的情况(例如,当使用较小的列表和普通的IN (...)会更快时)
解决方法:
在批量操作中使用
batch_size,将每批控制在限制以内:# Backend cap with 10 fields: min(1000, 2050 // 10 // 2) = 102 rows per batch # The backend applies the conservative // 2 divisor for both bulk_create and bulk_update. Product.objects.bulk_create(products, batch_size=100)如果您想绕过自动临时表机制,请对大型
IN查询进行分块处理:from itertools import islice def chunked_filter(queryset, field, values, chunk_size=2000): """Filter a queryset in chunks to stay within the 2,100 parameter limit.""" results = [] it = iter(values) while chunk := list(islice(it, chunk_size)): results.extend(queryset.filter(**{f"{field}__in": chunk})) return results # Returns a list of model instances, not a QuerySet products = chunked_filter(Product.objects, "pk", large_id_list)使用子查询 而不是具体化 ID 列表:
# Instead of: Order.objects.filter(product_id__in=list(Product.objects.values_list("id", flat=True))) # Use a subquery (Django generates a single SQL statement with no parameter explosion) Order.objects.filter(product__in=Product.objects.filter(active=True))将
Prefetch与筛选后的查询集一起使用,以限制传递给prefetch_related()的 ID 数量:from django.db.models import Prefetch orders = Order.objects.prefetch_related( Prefetch("items", queryset=OrderItem.objects.select_related("product")) )[:500] # Limit parent queryset size
批量操作
使用批量操作减少数据库往返次数:
from decimal import Decimal
from myapp.models import Product
# Bulk create
new_products = [Product(name=f"Item {i}", price=Decimal("1.99") * i) for i in range(1000)]
Product.objects.bulk_create(new_products, batch_size=500)
# Bulk update: refetch so each instance has a primary key
products = list(Product.objects.filter(name__startswith="Item "))
for product in products:
product.price *= Decimal("1.10")
Product.objects.bulk_update(products, ["price"], batch_size=500)
Important
使用 bulk_create 或 bulk_update 时,请根据每个对象的字段数量设置 batch_size。 后端的 bulk_batch_size() 将每个批次限制为 1,000 行,并对 两者bulk_create 和 bulk_update 采用较为保守的 2050 / (fields * 2) 参数限制。 额外的 / 2 是为 bulk_update 每个字段使用的两个参数预留的(一个用于 CASE 匹配,另一个用于值),并且对 bulk_create 也应用了相同的除数,因此同一代码路径对这两种操作都是安全的。
如果省略 batch_size,后端会自动计算安全值。 还可以指定 batch_size,后端会进一步将其限制在安全上限内。
有关 return_rows_bulk_insert 和 default 参数的更多信息,请参阅 mssql-django 中的批量操作。
索引策略
Django 会自动为 ForeignKey、OneToOneField 以及带有 db_index=True 的字段创建索引。 对于其他索引,请使用 Meta.indexes:
from django.db import models
class Product(models.Model):
name = models.CharField(max_length=100, db_index=True)
category = models.CharField(max_length=50)
price = models.DecimalField(max_digits=10, decimal_places=2)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
indexes = [
models.Index(fields=["category", "price"]),
models.Index(fields=["-created_at"]),
]
对于 SQL Server 特有的索引(例如带有 INCLUDE 列的索引),请在迁移中使用原生 SQL:
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [("myapp", "0001_initial")]
operations = [
migrations.RunSQL(
sql="CREATE INDEX IX_product_category ON myapp_product (category) INCLUDE (name, price);",
reverse_sql="DROP INDEX IX_product_category ON myapp_product;",
),
]
后端 mssql-django 支持覆盖索引(supports_covering_indexes = True in mssql/features.py)。 在 (3.2 及更高版本)支持 mssql-django 的所有 Django 版本上,可以使用 include 参数而不是 models.Index 原始 SQL:
class Product(models.Model):
name = models.CharField(max_length=100)
category = models.CharField(max_length=50)
price = models.DecimalField(max_digits=10, decimal_places=2)
class Meta:
indexes = [
models.Index(fields=["category"], include=["name", "price"], name="ix_product_cat_cover"),
]
文件组位置
后端 mssql-django 将 Django 的 db_tablespace 映射到 SQL Server 的 ON filegroup 子句。 使用此方法可将表或索引放置在特定文件组上:
class LargeAuditLog(models.Model):
timestamp = models.DateTimeField(auto_now_add=True)
message = models.TextField()
class Meta:
db_tablespace = "ARCHIVE_FG"
这将生成: CREATE TABLE ... ON [ARCHIVE_FG].
Important
在运行 migrate 之前,文件组必须已存在于 SQL Server 数据库中。 使用 ALTER DATABASE [<your-database>] ADD FILEGROUP [ARCHIVE_FG] 创建它,并向其中添加至少一个文件。
窗口函数
后端支持SQL Server的窗口函数(supports_over_clause = True)。 使用 Django 的 Window 表达式进行排名、运行总计和分区计算:
from django.db.models import F, Window
from django.db.models.functions import Rank, RowNumber
# Rank products by price within each category
products = Product.objects.annotate(
price_rank=Window(
expression=Rank(),
partition_by=F("category"),
order_by=F("price").desc(),
)
)
# Row numbers across the full result set
products = Product.objects.annotate(
row_num=Window(
expression=RowNumber(),
order_by=F("created_at").asc(),
)
)
Note
SQL Server不支持 NTH_VALUE()。 请改用 FIRST_VALUE、LAST_VALUE 或子查询变通方案。 请参阅 mssql-django 中的限制和不支持的功能。
监视查询性能
使用 Django 的内置查询日志记录功能,在开发过程中识别慢查询:
LOGGING = {
"version": 1,
"handlers": {
"console": {
"class": "logging.StreamHandler",
},
},
"loggers": {
"django.db.backends": {
"level": "DEBUG",
"handlers": ["console"],
},
},
}
对于过渡工作负荷和生产工作负荷,请使用SQL Server性能工具分析 Django 生成的 SQL:
在直接查询 DMV 之前,先从内置性能报告开始。
- 对于SQL Server和Azure SQL 托管实例,请使用SQL Server Management Studio中的性能仪表板。
- 对于Azure SQL 数据库,请使用 query Performance Insight for Azure SQL 数据库。
这些报告通常是查找成本高昂的查询、等待、阻塞和资源压力的最快方法,与临时 DMV 查询相比,错误空间更少。
使用查询存储识别最耗费资源的查询以及最近出现回归的查询。
使用SQL Server Management Studio中的排名靠前的资源消耗查询、回归查询和查询等待统计信息视图来确定瓶颈是 CPU、I/O、内存还是等待。 有关指南,请参阅使用 查询存储 监视工作负荷的最佳做法。
打开该慢速语句的实际执行计划,查找扫描、昂贵的键查找、不准确的行估计和缺少的索引。
如果在部署或架构更改后查询变慢,请在更改应用程序代码之前,在查询存储中比较其计划。 在你修复底层索引、统计信息或查询结构问题期间,DBA 可以暂时强制使用一个已知有效的执行计划。
如果 查询存储 显示的是等待而不是高 CPU 时间,请使用识别瓶颈来区分 CPU、内存、磁盘 I/O、连接压力和阻塞问题。