本文介绍如何使用 mssql-django 后端从 Django 应用程序创建和调用 SQL Server 存储过程。
在迁移中创建存储过程
使用 migrations.RunSQL 在 Django 迁移工作流中创建存储过程:
from django.db import migrations
# Forward SQL: defines the GetProductsByCategory stored procedure.
_CREATE_PROCEDURE_SQL = """
CREATE PROCEDURE GetProductsByCategory
@CategoryName NVARCHAR(100)
AS
BEGIN
SET NOCOUNT ON;
SELECT id, name, price
FROM myapp_product
WHERE category = @CategoryName;
END;
"""

# Reverse SQL: drops the procedure when the migration is unapplied.
_DROP_PROCEDURE_SQL = "DROP PROCEDURE IF EXISTS GetProductsByCategory;"


class Migration(migrations.Migration):
    """Create the GetProductsByCategory stored procedure via raw SQL."""

    dependencies = [("myapp", "0001_initial")]

    operations = [
        migrations.RunSQL(
            sql=_CREATE_PROCEDURE_SQL,
            reverse_sql=_DROP_PROCEDURE_SQL,
        ),
    ]
执行迁移:
python manage.py migrate
调用存储过程
使用 Django 的 connection.cursor() 通过原始 SQL 调用存储过程:
from django.db import connection
def get_products_by_category(category_name):
    """Execute GetProductsByCategory and return every row it yields.

    ``category_name`` is sent as a bound parameter for ``@CategoryName``;
    the result is whatever ``cursor.fetchall()`` returns.
    """
    statement = "EXECUTE GetProductsByCategory @CategoryName = %s;"
    with connection.cursor() as cur:
        cur.execute(statement, [category_name])
        return cur.fetchall()
调用具有多个参数的存储过程
将多个参数作为列表传递(此示例假定数据库中已存在接受 @CategoryName 和 @MinPrice 参数的 SearchProducts 存储过程):
from django.db import connection
def search_products(category_name, min_price):
    """Execute the SearchProducts stored procedure and return all rows.

    The two arguments are passed, in order, as bound parameters for
    ``@CategoryName`` and ``@MinPrice``.
    """
    call = "EXECUTE SearchProducts @CategoryName = %s, @MinPrice = %s;"
    params = [category_name, min_price]
    with connection.cursor() as cur:
        cur.execute(call, params)
        return cur.fetchall()
从结果中读取列名称
从 cursor.description 访问列名称:
from django.db import connection
def get_products_as_dicts(category_name):
    """Execute GetProductsByCategory and return its rows as dictionaries.

    Column names are taken from the first item of each
    ``cursor.description`` entry and paired with the values of every row.
    """
    with connection.cursor() as cur:
        cur.execute("EXECUTE GetProductsByCategory @CategoryName = %s;", [category_name])
        names = [meta[0] for meta in cur.description]
        rows = cur.fetchall()
    return [dict(zip(names, row)) for row in rows]
处理多个结果集
某些存储过程返回多个结果集。使用 cursor.nextset() 在它们之间导航:
from django.db import connection
def get_dashboard_data():
    """Execute GetDashboardData and return its first two result sets.

    Returns:
        A ``(products, categories)`` tuple. ``products`` holds the rows of
        the first result set and ``categories`` the rows of the second. If
        the procedure produces no second result set, ``categories`` is an
        empty list.
    """
    with connection.cursor() as cursor:
        cursor.execute("EXECUTE GetDashboardData;")
        # First result set
        products = cursor.fetchall()
        # Move to second result set. nextset() returns a falsy value when
        # no further result set exists; fetching in that state is an error,
        # so fall back to an empty list instead.
        categories = cursor.fetchall() if cursor.nextset() else []
    return products, categories
封装到 Django 管理器中
在自定义管理器中封装存储过程调用,使代码更简洁:
from django.db import connection, models
class ProductManager(models.Manager):
    """Manager that exposes the GetProductsByCategory stored procedure."""

    def by_category(self, category_name):
        """Return the procedure's rows as a list of column-name dicts.

        ``category_name`` is bound to ``@CategoryName``. Keys come from the
        first item of each ``cursor.description`` entry.
        """
        query = "EXECUTE GetProductsByCategory @CategoryName = %s;"
        with connection.cursor() as cur:
            cur.execute(query, [category_name])
            field_names = [meta[0] for meta in cur.description]
            records = cur.fetchall()
        return [dict(zip(field_names, record)) for record in records]
class Product(models.Model):
    """Product model whose default manager is a ``ProductManager``."""

    # Product name, up to 100 characters.
    name = models.CharField(max_length=100)
    # Price stored as a decimal: 10 digits in total, 2 after the point.
    price = models.DecimalField(max_digits=10, decimal_places=2)
    # Category name, up to 100 characters.
    category = models.CharField(max_length=100)
    # Custom manager installed as ``objects``.
    objects = ProductManager()
用法:
# by_category() returns a list of dicts (one per row), not a QuerySet.
products = Product.objects.by_category("Electronics")