mssql-django 的性能优化

本文提供了在使用 mssql-django 后端和 SQL Server 时优化 Django 应用程序性能的指导。

连接优化

通过调整连接池、持久连接和超时设置,减少连接开销。

启用连接池

默认情况下,连接池处于启用状态。 确认它未在您的 settings.py 中被禁用:

# Keep this True (or omit it entirely) for best connection performance
DATABASE_CONNECTION_POOLING = True

使用CONN_MAX_AGE

设置为 CONN_MAX_AGE 使数据库连接跨请求保持打开状态,避免为每个请求建立新连接开销:

DATABASES = {
    "default": {
        "ENGINE": "mssql",
        "NAME": "<your-database>",
        "USER": "<your-username>",
        "PASSWORD": "<your-password>",
        "HOST": "<your-server>",
        "PORT": "1433",
        "CONN_MAX_AGE": 600,  # Keep connections open for 10 minutes
        "OPTIONS": {
            "driver": "ODBC Driver 18 for SQL Server",
        },
    },
}

设置查询超时时间

防止长时间运行的查询无限期地消耗资源:

DATABASES = {
    "default": {
        "ENGINE": "mssql",
        "NAME": "<your-database>",
        "USER": "<your-username>",
        "PASSWORD": "<your-password>",
        "HOST": "<your-server>",
        "PORT": "1433",
        "OPTIONS": {
            "driver": "ODBC Driver 18 for SQL Server",
            "query_timeout": 30,
        },
    },
}

查询优化

使用这些 ORM 技术减少数据库往返次数和查询计数。

避免 N+1 查询模式

对外键关系使用 select_related(单个 JOIN 查询),对多对多或反向关系使用 prefetch_related(使用 IN 子句的单独查询):

# Bad: N+1 queries
orders = Order.objects.all()
for order in orders:
    print(order.customer.name)  # Each access triggers a query

# Good: Single JOIN query
orders = Order.objects.select_related("customer").all()
for order in orders:
    print(order.customer.name)  # No additional queries

# Good: Two queries instead of N+1
orders = Order.objects.prefetch_related("items").all()
for order in orders:
    for item in order.items.all():  # Uses prefetched data
        print(item.name)

仅使用()和延迟()

在不需要所有字段时限制检索的列:

# Retrieve only specific fields
products = Product.objects.only("name", "price").all()

# Defer loading of large fields
products = Product.objects.defer("description", "metadata").all()

使用 values() 和 values_list()

如果不需要模型实例,请使用 values()values_list() 用于较轻的查询:

# Returns dictionaries instead of model instances
prices = Product.objects.values("name", "price")

# Returns tuples
names = Product.objects.values_list("name", flat=True)

在 2,100 个参数限制内工作

SQL Server将每个查询限制为 2,100 个参数。 Django 生成参数化查询,因此生成大型 IN 子句或批量值列表的操作可以达到此限制。

大型 IN 子句的自动优化

filter(field__in=list)当调用具有超过 2,048 个值时,mssql-django后端会自动将值插入临时表中(批为 1,000 个),并将查询重写为WHERE field IN (SELECT params FROM #Temp_params)。 此优化可避免参数限制,而无需更改任何代码。 它适用于所有 __in 查找,包括由 prefetch_related() 生成的查找。 后端将设置 max_in_list_size() 2,048 阈值,以安全地保持在SQL Server的 2,100 个参数限制之下。

此重写具有成本:创建和填充 #Temp_params 增加了额外的往返和 tempdb 活动。 对于接近阈值的列表,请对工作负荷中的这两种方法进行基准测试。

当仍需要手动干预时

自动临时表优化可以处理 __in 查找,但这些操作仍可能触及 2,100 个参数的上限,因为每个字段值都会作为单独的参数:

  • bulk_create()bulk_update() 具有多个对象和多个字段
  • 具有许多链接条件的复杂 Q() 表达式
  • 想要避免为填充 #Temp_params 而进行往返请求的情况(例如,当使用较小的列表和普通的 IN (...) 会更快时)

解决方法

  1. 在批量操作中使用 batch_size,将每批控制在限制以内:

    # Backend cap with 10 fields: min(1000, 2050 // 10 // 2) = 102 rows per batch
    # The backend applies the conservative // 2 divisor for both bulk_create and bulk_update.
    Product.objects.bulk_create(products, batch_size=100)
    
  2. 如果您想绕过自动临时表机制,请对大型IN查询进行分块处理

    from itertools import islice
    
    def chunked_filter(queryset, field, values, chunk_size=2000):
        """Filter a queryset in chunks to stay within the 2,100 parameter limit."""
        results = []
        it = iter(values)
        while chunk := list(islice(it, chunk_size)):
            results.extend(queryset.filter(**{f"{field}__in": chunk}))
        return results
    
    # Returns a list of model instances, not a QuerySet
    products = chunked_filter(Product.objects, "pk", large_id_list)
    
  3. 使用子查询 而不是具体化 ID 列表:

    # Instead of: Order.objects.filter(product_id__in=list(Product.objects.values_list("id", flat=True)))
    # Use a subquery (Django generates a single SQL statement with no parameter explosion)
    Order.objects.filter(product__in=Product.objects.filter(active=True))
    
  4. Prefetch 与筛选后的查询集一起使用,以限制传递给 prefetch_related() 的 ID 数量:

    from django.db.models import Prefetch
    
    orders = Order.objects.prefetch_related(
        Prefetch("items", queryset=OrderItem.objects.select_related("product"))
    )[:500]  # Limit parent queryset size
    

批量操作

使用批量操作减少数据库往返次数:

from decimal import Decimal

from myapp.models import Product

# Bulk create
new_products = [Product(name=f"Item {i}", price=Decimal("1.99") * i) for i in range(1000)]
Product.objects.bulk_create(new_products, batch_size=500)

# Bulk update: refetch so each instance has a primary key
products = list(Product.objects.filter(name__startswith="Item "))
for product in products:
    product.price *= Decimal("1.10")
Product.objects.bulk_update(products, ["price"], batch_size=500)

Important

使用 bulk_createbulk_update 时,请根据每个对象的字段数量设置 batch_size。 后端的 bulk_batch_size() 将每个批次限制为 1,000 行,并对 两者bulk_createbulk_update 采用较为保守的 2050 / (fields * 2) 参数限制。 额外的 / 2 是为 bulk_update 每个字段使用的两个参数预留的(一个用于 CASE 匹配,另一个用于值),并且对 bulk_create 也应用了相同的除数,因此同一代码路径对这两种操作都是安全的。

如果省略 batch_size,后端会自动计算安全值。 还可以指定 batch_size,后端会进一步将其限制在安全上限内。

有关 return_rows_bulk_insertdefault 参数的更多信息,请参阅 mssql-django 中的批量操作

索引策略

Django 会自动为 ForeignKeyOneToOneField 以及带有 db_index=True 的字段创建索引。 对于其他索引,请使用 Meta.indexes

from django.db import models

class Product(models.Model):
    name = models.CharField(max_length=100, db_index=True)
    category = models.CharField(max_length=50)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [
            models.Index(fields=["category", "price"]),
            models.Index(fields=["-created_at"]),
        ]

对于 SQL Server 特有的索引(例如带有 INCLUDE 列的索引),请在迁移中使用原生 SQL:

from django.db import migrations

class Migration(migrations.Migration):
    dependencies = [("myapp", "0001_initial")]
    operations = [
        migrations.RunSQL(
            sql="CREATE INDEX IX_product_category ON myapp_product (category) INCLUDE (name, price);",
            reverse_sql="DROP INDEX IX_product_category ON myapp_product;",
        ),
    ]

后端 mssql-django 支持覆盖索引(supports_covering_indexes = True in mssql/features.py)。 在 (3.2 及更高版本)支持 mssql-django 的所有 Django 版本上,可以使用 include 参数而不是 models.Index 原始 SQL:

class Product(models.Model):
    name = models.CharField(max_length=100)
    category = models.CharField(max_length=50)
    price = models.DecimalField(max_digits=10, decimal_places=2)

    class Meta:
        indexes = [
            models.Index(fields=["category"], include=["name", "price"], name="ix_product_cat_cover"),
        ]

文件组位置

后端 mssql-django 将 Django 的 db_tablespace 映射到 SQL Server 的 ON filegroup 子句。 使用此方法可将表或索引放置在特定文件组上:

class LargeAuditLog(models.Model):
    timestamp = models.DateTimeField(auto_now_add=True)
    message = models.TextField()

    class Meta:
        db_tablespace = "ARCHIVE_FG"

这将生成: CREATE TABLE ... ON [ARCHIVE_FG].

Important

在运行 migrate 之前,文件组必须已存在于 SQL Server 数据库中。 使用 ALTER DATABASE [<your-database>] ADD FILEGROUP [ARCHIVE_FG] 创建它,并向其中添加至少一个文件。

窗口函数

后端支持SQL Server的窗口函数(supports_over_clause = True)。 使用 Django 的 Window 表达式进行排名、运行总计和分区计算:

from django.db.models import F, Window
from django.db.models.functions import Rank, RowNumber

# Rank products by price within each category
products = Product.objects.annotate(
    price_rank=Window(
        expression=Rank(),
        partition_by=F("category"),
        order_by=F("price").desc(),
    )
)

# Row numbers across the full result set
products = Product.objects.annotate(
    row_num=Window(
        expression=RowNumber(),
        order_by=F("created_at").asc(),
    )
)

Note

SQL Server不支持 NTH_VALUE()。 请改用 FIRST_VALUELAST_VALUE 或子查询变通方案。 请参阅 mssql-django 中的限制和不支持的功能

监视查询性能

使用 Django 的内置查询日志记录功能,在开发过程中识别慢查询:

LOGGING = {
    "version": 1,
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
        },
    },
    "loggers": {
        "django.db.backends": {
            "level": "DEBUG",
            "handlers": ["console"],
        },
    },
}

对于过渡工作负荷和生产工作负荷,请使用SQL Server性能工具分析 Django 生成的 SQL:

  1. 在直接查询 DMV 之前,先从内置性能报告开始。

    这些报告通常是查找成本高昂的查询、等待、阻塞和资源压力的最快方法,与临时 DMV 查询相比,错误空间更少。

  2. 使用查询存储识别最耗费资源的查询以及最近出现回归的查询。

  3. 使用SQL Server Management Studio中的排名靠前的资源消耗查询回归查询查询等待统计信息视图来确定瓶颈是 CPU、I/O、内存还是等待。 有关指南,请参阅使用 查询存储 监视工作负荷的最佳做法

  4. 打开该慢速语句的实际执行计划,查找扫描、昂贵的键查找、不准确的行估计和缺少的索引。

  5. 如果在部署或架构更改后查询变慢,请在更改应用程序代码之前,在查询存储中比较其计划。 在你修复底层索引、统计信息或查询结构问题期间,DBA 可以暂时强制使用一个已知有效的执行计划。

如果 查询存储 显示的是等待而不是高 CPU 时间,请使用识别瓶颈来区分 CPU、内存、磁盘 I/O、连接压力和阻塞问题。