Tutorial: Coordinar transacciones entre tablas

Importante

Las transacciones que escriben en tablas delta administradas por el catálogo de Unity se encuentran en versión preliminar pública.

Las transacciones que escriben en las tablas Iceberg gestionadas por Unity Catalog se encuentran en versión preliminar privada. Para unirse a esta versión preliminar, envíe el formulario de inscripción de las tablas Iceberg administradas.

En este tutorial, usará ambos modos de transacción para coordinar las actualizaciones entre varias instrucciones y tablas en Azure Databricks: no interactiva (BEGIN ATOMIC), que se confirma automáticamente y interactiva (BEGIN TRANSACTION), lo que proporciona control explícito. En el tutorial también se muestra cómo usar transacciones con procedimientos almacenados y scripting de SQL.

Requisitos

  • Environment: Acceso a un área de trabajo de Azure Databricks.
  • Proceso: los tipos de proceso admitidos varían según el modo de transacción:
    • Un almacenamiento de SQL clásico o sin servidor admite ambos modos de transacción.
    • El proceso sin servidor solo admite transacciones no interactivas.
    • Los clústeres clásicos que ejecutan Databricks Runtime 18.0 o superior solo admiten transacciones no interactivas.
  • Privilegios: CREATE TABLE en un esquema de catálogo de Unity .

Configuración de tablas de ejemplo

Todas las tablas escritas en en una transacción de varias instrucciones y varias tablas deben:

Cree dos tablas de ejemplo en el Editor de SQL o en un cuaderno:

-- Account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

Nota:

Para habilitar transacciones en una tabla existente, ejecute:

ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

Inserte datos de ejemplo en ambas tablas:

INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Compruebe la configuración:

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

Salida:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Transacciones no interactivas

Las transacciones no interactivas usan BEGIN ATOMIC ... END; sintaxis. Todas las instrucciones se ejecutan como una sola unidad atómica. Si cada instrucción se realiza correctamente, Azure Databricks confirma automáticamente. Si se produce un error en alguna instrucción, Azure Databricks revierte todos los cambios automáticamente. Para obtener información detallada sobre la sintaxis y los patrones de uso, consulte transacciones no interactivas.

Realizar una transacción exitosa

Actualice ambas tablas de forma atómica:

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

Compruebe que el saldo de Alice ahora es 1100.00:

SELECT * FROM sample_accounts WHERE id = 1;

Compruebe que ahora existen dos registros de transacciones:

SELECT * FROM sample_transactions;

Tanto la actualización del saldo como el registro de transacciones se crearon juntos. Si se hubiera producido un error en cualquiera de las instrucciones, ningún cambio habría sido confirmado y Databricks habría terminado la transacción sin consecuencias.

Uso de SIGNAL para generar un error en una transacción bajo una condición

Puede usar SIGNAL dentro de un BEGIN ATOMIC ... END; bloque para producir un error en la transacción cuando no se cumple una condición definida por el usuario. En este ejemplo se inserta una cuenta con un saldo negativo y, a continuación, se usa SIGNAL para producir un error en la transacción si se produce un error en la comprobación del saldo:

BEGIN ATOMIC
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

SIGNAL genera un error, que hace que toda la transacción se revierte automáticamente. Esto devuelve cero filas porque la inserción se revirtió:

SELECT * FROM sample_accounts WHERE id = 3;

Consulte reversión automática en caso de error.

Ejecute una transacción donde la primera instrucción sea válida, pero la segunda hace referencia a una tabla que no existe:

BEGIN ATOMIC
  -- Valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);
  -- Invalid
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

Se produce un error en la transacción. Esto devuelve 0 filas porque se revirtió toda la transacción:

SELECT * FROM sample_accounts WHERE id = 4;

Aunque la primera INSERT instrucción era válida, se reviertó porque se produjo un error en la segunda instrucción. Esto muestra la garantía todo o nada de las transacciones.

Transacciones interactivas

Las transacciones interactivas proporcionan control explícito sobre cuándo confirmar o revertir. Use BEGIN TRANSACTION para iniciar y, a continuación, COMMIT para guardar los cambios o ROLLBACK para descartarlos.

Confirmar cambios

Inicie una transacción:

BEGIN TRANSACTION;

Realizar cambios (aún no confirmados):

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

Comprometerse a hacer permanentes los cambios:

COMMIT;

Compruebe que la cuenta de Eve ya está visible:

SELECT * FROM sample_accounts WHERE id = 5;

Compruebe que el saldo de Bob ahora es 550.00:

SELECT * FROM sample_accounts WHERE id = 2;

Revertir los cambios

Inicie una nueva transacción:

BEGIN TRANSACTION;

Realice un cambio:

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Compruebe que el cambio está visible en la sesión (la fila no es visible para otras sesiones hasta que se confirme):

SELECT * FROM sample_accounts WHERE id = 6;

Revierte para descartar el cambio:

ROLLBACK;

Esto devuelve cero filas porque la inserción se revirtió:

SELECT * FROM sample_accounts WHERE id = 6;

Uso con procedimientos almacenados y scripts de SQL

Puede combinar transacciones con procedimientos almacenados para crear lógica de transacción reutilizable. Este patrón es útil para operaciones complejas que se ejecutan con frecuencia.

  1. Creación de tablas con confirmaciones de catálogo habilitadas

    CREATE SCHEMA IF NOT EXISTS main.retail;
    
    CREATE TABLE IF NOT EXISTS main.retail.orders (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.orders_staging (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2),
      batch_id STRING
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.total_sales (
      customer_id STRING,
      total_amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
  2. Definición del procedimiento almacenado

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. Definición de la transacción

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

Si se produce un error en alguna parte de la transacción, Databricks revierte todos los cambios automáticamente.

Limpiar

Quite las tablas de ejemplo:

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;

DROP TABLE IF EXISTS main.retail.orders;
DROP TABLE IF EXISTS main.retail.orders_staging;
DROP TABLE IF EXISTS main.retail.total_sales;

Recursos adicionales