Uso de flujos en canalizaciones declarativas de Spark de Lakeflow

Los flujos de las canalizaciones declarativas de Spark en Lakeflow envían datos a una tabla de transmisión o a una vista materializada. En los siguientes ejemplos se muestra cómo definir flujos por defecto, definir un flujo por separado de su destino, escribir en una tabla de streaming desde varios tópicos de Kafka, ejecutar un relleno retrospectivo puntual y reemplazar las consultas UNION por el procesamiento de flujo con anexión.

Para obtener una visión general de los flujos, consulte Carga y procesamiento incrementales de datos con flujos de Lakeflow Spark Declarative Pipelines.

Ejemplo: Creación de un flujo predeterminado

Al crear una canalización, normalmente se define una tabla o vista junto con la consulta que lo admite. Por ejemplo, esta consulta crea una tabla de streaming denominada customers_silver leyendo desde customers_bronze. La tabla de streaming y su flujo predeterminado se crean juntos en un solo paso.

SQL

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Pitón

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

El flujo predeterminado de una tabla de streaming es un flujo de anexión que agrega nuevas filas con cada actualización y tiene el mismo nombre que el destino. Esta es la manera más común de usar canalizaciones( crear un flujo y su destino en un solo paso) y puede usarlo para ingerir o transformar datos. Para obtener más información sobre los conceptos relacionados con los flujos, consulte Cargue y procese datos de forma incremental con flujos de Lakeflow Spark Declarative Pipelines.

Ejemplo: Definir un flujo por separado de su destino

También puede crear un flujo para una tabla que haya definido por separado. El resultado es idéntico a la creación de un flujo predeterminado, incluido el mismo nombre para la tabla de streaming y el flujo:

Pitón

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

La definición de un flujo por separado de su destino permite crear varios flujos que anexan datos al mismo destino. Use el @dp.append_flow decorador en la interfaz Python o la CREATE FLOW...INSERT INTO cláusula de la interfaz SQL para agregar flujos para tareas como las siguientes:

Para las consultas de Python, use la función create_streaming_table() para crear una tabla de destino.

Important

  • Si necesita definir restricciones de calidad de datos con expectativas, defina las expectativas en la tabla de destino como parte de la create_streaming_table() función o en una definición de tabla existente. No se pueden definir expectativas en la @append_flow definición.
  • Los flujos se identifican mediante un nombre de flujo y este nombre se usa para identificar los puntos de control de streaming. El uso del nombre del flujo para identificar el punto de control significa lo siguiente:
    • Si se cambia el nombre de un flujo existente en una canalización, el punto de control no se transfiere y el flujo renombrado se considera efectivamente un flujo completamente nuevo.
    • No se puede reutilizar un nombre de flujo en una canalización, ya que el punto de control existente no coincidirá con la nueva definición de flujo.

Ejemplo: Escribir en una tabla de flujo continuo desde varios tópicos de Kafka

En los ejemplos siguientes se crea una tabla de streaming denominada kafka_target y se escribe en esa tabla de streaming desde dos tópicos de Kafka.

Pitón

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Para obtener más información sobre la read_kafka() función con valores de tabla usada en las consultas SQL, consulte read_kafka en la referencia del lenguaje SQL.

En Python, puede crear mediante programación varios flujos que tienen como destino una sola tabla. En el ejemplo siguiente se muestra este patrón para obtener una lista de temas de Kafka.

Nota:

Este patrón tiene los mismos requisitos que el uso de un for bucle para crear tablas. Debe pasar explícitamente un valor de Python a la función que define el flujo. Consulte Creación de tablas en un for bucle.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Ejemplo: Ejecutar un relleno de datos único

Si desea ejecutar una consulta para anexar datos a una tabla de streaming existente, use append_flow.

Después de anexar un conjunto de datos existentes, tiene varias opciones:

  • Si desea que la consulta anexe nuevos datos si llegan al directorio de relleno, mantenga la consulta en su lugar.
  • Si quiere que sea una reposición única y nunca vuelva a ejecutarse, elimine la consulta después de ejecutar la canalización una vez.
  • Si desea que la consulta se ejecute una vez y solo se ejecute de nuevo en los casos en los que los datos se actualizan por completo, establezca el once parámetro True en el flujo de anexar. En SQL, use INSERT INTO ONCE.

En los ejemplos siguientes se ejecuta una consulta para anexar datos históricos a una tabla de streaming:

Pitón

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.read
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Para obtener un ejemplo más detallado, consulte Reposición de datos históricos con canalizaciones.

Ejemplo: Uso del procesamiento de flujo de anexión en lugar de UNION

En lugar de usar una consulta con una UNION cláusula, puede utilizar consultas de flujo de apéndice para combinar varios orígenes y escribir en una sola tabla de transmisión. El uso de consultas de flujo de anexión en lugar de UNION permite anexar a una tabla de streaming desde varios orígenes sin ejecutar una actualización completa.

En el ejemplo de Python siguiente se incluye una consulta que combina varios orígenes de datos con una UNION cláusula :

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

En los siguientes ejemplos, se reemplaza la consulta UNION por consultas de flujo de datos agregadas.

Pitón

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );

Ejemplo: Uso transformWithState para supervisar los latidos del sensor

En el ejemplo siguiente se muestra un procesador con estado que lee de Kafka y comprueba que los sensores emiten latidos periódicamente. Si no se recibe un latido en un plazo de 5 minutos, el procesador emite una entrada a la tabla Delta de destino para su análisis.

Para obtener más información sobre cómo crear aplicaciones con estado personalizadas, consulte Compilación de una aplicación con estado personalizado.

Nota:

RocksDB es el proveedor de estado predeterminado a partir de Databricks Runtime 17.2. Si se produce un error en la consulta debido a una excepción de proveedor no compatible, agregue las siguientes configuraciones de canalización, realice una actualización completa o un restablecimiento de punto de control y vuelva a ejecutar la canalización:

"configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator

import pandas as pd

from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

KAFKA_TOPIC = "<your-kafka-topic>"

output_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("last_heartbeat_time", TimestampType(), False)])

class SensorHeartbeatProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define state schema to store sensor information (sensor_id is the grouping key)
        state_schema = StructType([
            StructField("sensor_type", StringType(), False),
            StructField("last_heartbeat_time", TimestampType(), False)])
        self.sensor_state = handle.getValueState("sensorState", state_schema)
        # State variable to track the previously registered timer
        timer_schema = StructType([StructField("timer_ts", LongType(), False)])
        self.timer_state = handle.getValueState("timerState", timer_schema)
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Process one row from input and update state
        pdf = next(rows)
        row = pdf.iloc[0]
        # Store or update the sensor information in state using current timestamp
        current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
        self.sensor_state.update((
            row["sensor_type"],
            current_time
        ))

        # Delete old timer if already registered
        if self.timer_state.exists():
            old_timer = self.timer_state.get()[0]
            self.handle.deleteTimer(old_timer)

        # Register a timer for 5 minutes from current processing time
        expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
        self.handle.registerTimer(expiry_time)
        # Store the new timer timestamp in state
        self.timer_state.update((expiry_time,))

        # No output on input processing, output only on timer expiry
        return iter([])

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        # Emit output row based on state store
        if self.sensor_state.exists():
            state = self.sensor_state.get()
            output = pd.DataFrame({
                "sensor_id": [key[0]],  # Use grouping key as sensor_id
                "sensor_type": [state[0]],
                "last_heartbeat_time": [state[1]]
            })
            # Remove the entry for the sensor from the state store
            self.sensor_state.clear()
            # Remove the timer state entry
            self.timer_state.clear()
            yield output

    def close(self) -> None:
        pass

dp.create_streaming_table("sensorAlerts")

# Define the schema for the Kafka message value
sensor_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("sensor_value", LongType(), False)])

@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
    return (
      spark.readStream
        .format("kafka")
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "earliest")
        .load()
        .select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
        .select("data.*", "timestamp")
        .withWatermark('timestamp', '1 hour')
        .groupBy(col("sensor_id"))
        .transformWithStateInPandas(
          statefulProcessor = SensorHeartbeatProcessor(),
          outputStructType = output_schema,
          outputMode = 'update',
          timeMode = 'ProcessingTime'))