Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Los flujos de las canalizaciones declarativas de Spark en Lakeflow envían datos a una tabla de transmisión o a una vista materializada. En los siguientes ejemplos se muestra cómo definir flujos por defecto, definir un flujo por separado de su destino, escribir en una tabla de streaming desde varios tópicos de Kafka, ejecutar un relleno retrospectivo puntual y reemplazar las consultas UNION por el procesamiento de flujo con anexión.
Para obtener una visión general de los flujos, consulte Carga y procesamiento incrementales de datos con flujos de Lakeflow Spark Declarative Pipelines.
Ejemplo: Creación de un flujo predeterminado
Al crear una canalización, normalmente se define una tabla o vista junto con la consulta que lo admite. Por ejemplo, esta consulta crea una tabla de streaming denominada customers_silver leyendo desde customers_bronze. La tabla de streaming y su flujo predeterminado se crean juntos en un solo paso.
SQL
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Pitón
from pyspark import pipelines as dp
@dp.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
El flujo predeterminado de una tabla de streaming es un flujo de anexión que agrega nuevas filas con cada actualización y tiene el mismo nombre que el destino. Esta es la manera más común de usar canalizaciones( crear un flujo y su destino en un solo paso) y puede usarlo para ingerir o transformar datos. Para obtener más información sobre los conceptos relacionados con los flujos, consulte Cargue y procese datos de forma incremental con flujos de Lakeflow Spark Declarative Pipelines.
Ejemplo: Definir un flujo por separado de su destino
También puede crear un flujo para una tabla que haya definido por separado. El resultado es idéntico a la creación de un flujo predeterminado, incluido el mismo nombre para la tabla de streaming y el flujo:
Pitón
from pyspark import pipelines as dp
# create streaming table
dp.create_streaming_table("customers_silver")
# add a flow
@dp.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
La definición de un flujo por separado de su destino permite crear varios flujos que anexan datos al mismo destino. Use el @dp.append_flow decorador en la interfaz Python o la CREATE FLOW...INSERT INTO cláusula de la interfaz SQL para agregar flujos para tareas como las siguientes:
- Agregue orígenes de streaming que anexe datos a una tabla de streaming existente sin necesidad de una actualización completa. Por ejemplo, puede tener una tabla que combine datos regionales de cada región en la que opera. A medida que se implementan nuevas regiones, puede agregar los nuevos datos de región a la tabla sin realizar una actualización completa. Vea Ejemplo: Escribir en una tabla de streaming desde varios temas de Kafka.
- Actualice una tabla de streaming anexando datos históricos que faltan (reposición). Puede usar la
INSERT INTO ONCEsintaxis para crear un rellenado histórico que se ejecute una vez. Consulte Ejemplo: realizar una reposición puntual de datos y Reposición de datos históricos con pipelines. - Combine datos de varios orígenes y escriba en una sola tabla de transmisión en lugar de usar la cláusula
UNIONen una consulta. El uso del proceso de adición en lugar deUNIONle permite actualizar la tabla de destino de forma incremental sin ejecutar una actualización completa. Vea Ejemplo: Uso del procesamiento de flujo de anexión en lugar deUNION.
Para las consultas de Python, use la función create_streaming_table() para crear una tabla de destino.
Important
- Si necesita definir restricciones de calidad de datos con expectativas, defina las expectativas en la tabla de destino como parte de la
create_streaming_table()función o en una definición de tabla existente. No se pueden definir expectativas en la@append_flowdefinición. - Los flujos se identifican mediante un nombre de flujo y este nombre se usa para identificar los puntos de control de streaming. El uso del nombre del flujo para identificar el punto de control significa lo siguiente:
- Si se cambia el nombre de un flujo existente en una canalización, el punto de control no se transfiere y el flujo renombrado se considera efectivamente un flujo completamente nuevo.
- No se puede reutilizar un nombre de flujo en una canalización, ya que el punto de control existente no coincidirá con la nueva definición de flujo.
Ejemplo: Escribir en una tabla de flujo continuo desde varios tópicos de Kafka
En los ejemplos siguientes se crea una tabla de streaming denominada kafka_target y se escribe en esa tabla de streaming desde dos tópicos de Kafka.
Pitón
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Para obtener más información sobre la read_kafka() función con valores de tabla usada en las consultas SQL, consulte read_kafka en la referencia del lenguaje SQL.
En Python, puede crear mediante programación varios flujos que tienen como destino una sola tabla. En el ejemplo siguiente se muestra este patrón para obtener una lista de temas de Kafka.
Nota:
Este patrón tiene los mismos requisitos que el uso de un for bucle para crear tablas. Debe pasar explícitamente un valor de Python a la función que define el flujo. Consulte Creación de tablas en un for bucle.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Ejemplo: Ejecutar un relleno de datos único
Si desea ejecutar una consulta para anexar datos a una tabla de streaming existente, use append_flow.
Después de anexar un conjunto de datos existentes, tiene varias opciones:
- Si desea que la consulta anexe nuevos datos si llegan al directorio de relleno, mantenga la consulta en su lugar.
- Si quiere que sea una reposición única y nunca vuelva a ejecutarse, elimine la consulta después de ejecutar la canalización una vez.
- Si desea que la consulta se ejecute una vez y solo se ejecute de nuevo en los casos en los que los datos se actualizan por completo, establezca el
onceparámetroTrueen el flujo de anexar. En SQL, useINSERT INTO ONCE.
En los ejemplos siguientes se ejecuta una consulta para anexar datos históricos a una tabla de streaming:
Pitón
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.read
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Para obtener un ejemplo más detallado, consulte Reposición de datos históricos con canalizaciones.
Ejemplo: Uso del procesamiento de flujo de anexión en lugar de UNION
En lugar de usar una consulta con una UNION cláusula, puede utilizar consultas de flujo de apéndice para combinar varios orígenes y escribir en una sola tabla de transmisión. El uso de consultas de flujo de anexión en lugar de UNION permite anexar a una tabla de streaming desde varios orígenes sin ejecutar una actualización completa.
En el ejemplo de Python siguiente se incluye una consulta que combina varios orígenes de datos con una UNION cláusula :
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
En los siguientes ejemplos, se reemplaza la consulta UNION por consultas de flujo de datos agregadas.
Pitón
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);
Ejemplo: Uso transformWithState para supervisar los latidos del sensor
En el ejemplo siguiente se muestra un procesador con estado que lee de Kafka y comprueba que los sensores emiten latidos periódicamente. Si no se recibe un latido en un plazo de 5 minutos, el procesador emite una entrada a la tabla Delta de destino para su análisis.
Para obtener más información sobre cómo crear aplicaciones con estado personalizadas, consulte Compilación de una aplicación con estado personalizado.
Nota:
RocksDB es el proveedor de estado predeterminado a partir de Databricks Runtime 17.2. Si se produce un error en la consulta debido a una excepción de proveedor no compatible, agregue las siguientes configuraciones de canalización, realice una actualización completa o un restablecimiento de punto de control y vuelva a ejecutar la canalización:
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator
import pandas as pd
from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
KAFKA_TOPIC = "<your-kafka-topic>"
output_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
class SensorHeartbeatProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define state schema to store sensor information (sensor_id is the grouping key)
state_schema = StructType([
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
self.sensor_state = handle.getValueState("sensorState", state_schema)
# State variable to track the previously registered timer
timer_schema = StructType([StructField("timer_ts", LongType(), False)])
self.timer_state = handle.getValueState("timerState", timer_schema)
self.handle = handle
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Process one row from input and update state
pdf = next(rows)
row = pdf.iloc[0]
# Store or update the sensor information in state using current timestamp
current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
self.sensor_state.update((
row["sensor_type"],
current_time
))
# Delete old timer if already registered
if self.timer_state.exists():
old_timer = self.timer_state.get()[0]
self.handle.deleteTimer(old_timer)
# Register a timer for 5 minutes from current processing time
expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
self.handle.registerTimer(expiry_time)
# Store the new timer timestamp in state
self.timer_state.update((expiry_time,))
# No output on input processing, output only on timer expiry
return iter([])
def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
# Emit output row based on state store
if self.sensor_state.exists():
state = self.sensor_state.get()
output = pd.DataFrame({
"sensor_id": [key[0]], # Use grouping key as sensor_id
"sensor_type": [state[0]],
"last_heartbeat_time": [state[1]]
})
# Remove the entry for the sensor from the state store
self.sensor_state.clear()
# Remove the timer state entry
self.timer_state.clear()
yield output
def close(self) -> None:
pass
dp.create_streaming_table("sensorAlerts")
# Define the schema for the Kafka message value
sensor_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("sensor_value", LongType(), False)])
@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
return (
spark.readStream
.format("kafka")
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
.select("data.*", "timestamp")
.withWatermark('timestamp', '1 hour')
.groupBy(col("sensor_id"))
.transformWithStateInPandas(
statefulProcessor = SensorHeartbeatProcessor(),
outputStructType = output_schema,
outputMode = 'update',
timeMode = 'ProcessingTime'))