Transformaciones de ejemplo en Azure Monitor

Las transformaciones en Azure Monitor le permiten filtrar o modificar los datos entrantes antes de que se envíen a un área de trabajo de Log Analytics. En este artículo se proporcionan consultas de ejemplo para escenarios comunes que puede usar para empezar a crear sus propias transformaciones. Consulte Crear una transformación en Azure Monitor para obtener más información sobre cómo probar estas transformaciones y agregarlas a una regla de recopilación de datos (DCR).

Reducción de los costos de datos

Dado que se le cobra por la ingesta de los datos enviados a un área de trabajo de Log Analytics, desea filtrar los datos que no necesita para reducir sus costos.

Filtrar filas de datos

Use una instrucción where para filtrar los datos entrantes que coincidan con determinados requisitos. Si el registro entrante no coincide con la instrucción, el registro no se envía al destino. En el siguiente ejemplo, solo se recopilan los registros con una gravedad de Critical.

source | where severity == "Critical"

Filtrar columnas de datos

Quite las columnas del origen de datos que no son necesarias para ahorrar en los costos de ingesta de datos. Use una instrucción project para especificar las columnas de la salida o use project-away para especificar solo las columnas que se van a quitar. En el ejemplo siguiente, la columna RawData se quita de la salida.

source | project-away RawData

Análisis de datos importantes de una columna

Es posible que tenga una columna con datos importantes enterrados en texto excesivo. Mantenga solo los datos valiosos y quite el texto que no es necesario. Use funciones de cadena como substring y extract para analizar los datos que desee. También puede analizar los datos mediante parse o split para dividir una sola columna en varios valores y seleccionar la que desee. A continuación, use extend para crear una nueva columna con los datos analizados y project-away para quitar la columna original.

Advertencia

Consulte Dividir comandos de análisis grandes para obtener sugerencias sobre el uso de comandos de análisis complejos.

En el ejemplo siguiente, la columna RequestContext contiene JSON con el ResourceId del área de trabajo. Las funciones parse_json y split se usan para extraer el nombre simple del área de trabajo. Se crea una nueva columna para este valor y se quitan las demás columnas.

source
| extend Context = parse_json(RequestContext)
| extend Workspace_CF = tostring(Context['workspaces'][0])
| extend WorkspaceName_CF = split(Workspace_CF,"/")[8]
| project-away RequestContext, Context, Workspace_CF

Envío de filas a registros básicos

Envíe filas de los datos que requieran funcionalidades de consulta básicas a tablas de registros básicas para un costo de ingesta menor. Consulte Envío de datos a varias tablas para obtener más información sobre cómo enviar datos a varias tablas.

Eliminación de datos confidenciales

Es posible que tenga un origen de datos que envíe información que no quiera almacenar por motivos de privacidad o cumplimiento.

Filtre información confidencial

Use las mismas estrategias descritas en Reducir los costos de datos para filtrar filas completas o columnas concretas que contienen información confidencial. En el ejemplo siguiente, la columna ClientIP se quita de la salida.

source | project-away ClientIP

Ofusque información confidencial

Use funciones de cadena para reemplazar información como dígitos en una dirección IP o un número de teléfono por un carácter común. En el ejemplo siguiente se reemplaza el nombre de usuario en una dirección de correo electrónico por "*****".

source | extend Email = replace_string(Email,substring(Email,0,indexof(Email,"@")),"*****")

Enviar a una tabla alternativa

Envíe registros confidenciales a una tabla alternativa con una configuración de control de acceso basado en rol diferente. Consulte Envío de datos a varias tablas para obtener más información sobre cómo enviar datos a varias tablas.

Enriquecer datos

Use una transformación para agregar información a los datos que proporcionan contexto empresarial o simplifican la consulta de los datos más adelante. Use funciones de cadena para extraer información crítica de una columna y luego use la instrucción extend para agregar una nueva columna al origen de datos. En el siguiente ejemplo se agrega una columna que identifica si una dirección IP de otra columna es interna o externa.

source | extend IpLocation = iff(split(ClientIp,".")[0] in ("10","192"), "Internal", "External")

Normalización de datos

Normalice los datos en un esquema común para simplificar la consulta y los informes, como el modelo de información de seguridad avanzada (ASIM) usado por Microsoft Sentinel. Use una transformación para normalizar los datos en tiempo de ingesta, tal como se describe en Normalización de tiempo de ingesta.

En el ejemplo siguiente, los datos entrantes se transforman en el esquema normalizado de la tabla ASimAuditEventLogs .

source
| project TimeGenerated = timestamp, EventOwner=owner, EventMessage=message, EventResult=result, EventSeverity=severity

Dar formato a los datos de destino

Es posible que tenga un origen de datos que envíe datos en un formato que no coincida con la estructura de la tabla de destino. Use una transformación para volver a formatear los datos en el esquema necesario.

Modificar esquema

Use comandos como extend y project para modificar el esquema de los datos entrantes para que coincidan con la tabla de destino. En el ejemplo siguiente, se agrega una nueva columna denominada TimeGenerated a los datos salientes mediante una función KQL para devolver la hora actual.

source | extend TimeGenerated = now()

Análisis de datos

Use el operador split o parse para analizar los datos en varias columnas de la tabla de destino. En el ejemplo siguiente, los datos entrantes tienen una columna delimitada por comas denominada RawData que se divide en columnas individuales para la tabla de destino. Divida comandos de análisis grandes para reducir el tiempo de procesamiento.

source 
| project d = split(RawData,",") 
| project TimeGenerated=todatetime(d[0]), Code=toint(d[1]), Severity=tostring(d[2]), Module=tostring(d[3]), Message=tostring(d[4])

Ejemplos de transformación de varias fases (versión preliminar)

Importante

Las transformaciones de varias fases se encuentran actualmente en versión preliminar pública. Consulte Términos de uso complementarios para las versiones preliminares de Microsoft Azure para conocer los términos legales que se aplican a las características de Azure que se encuentran en la versión beta, en versión preliminar o que todavía no se han publicado para que estén disponibles con carácter general.

En los ejemplos siguientes se muestran las transformaciones de varias fases basadas en procesadores. Cada ejemplo muestra la transformations sección de una definición de DCR. Para obtener la estructura de DCR completa y cómo hacer referencia a estas transformaciones desde orígenes de datos y flujos de datos, consulte Creación de una transformación de varias fases.

Filtrar syslog por facility en el cliente

Filtre los registros de syslog en el cliente para conservar únicamente los eventos auth y authpriv antes de enviarlos por la red.

{
    "name": "client_filter_auth",
    "headerProcessor": {
        "processor": "header.Syslog",
        "configuration": {}
    },
    "processors": [
        {
            "processor": "filter.Basic",
            "configuration": {
                "any": [
                    {
                        "all": [
                            {
                                "columnName": "Facility",
                                "operator": "==",
                                "value": "auth"
                            }
                        ]
                    },
                    {
                        "all": [
                            {
                                "columnName": "Facility",
                                "operator": "==",
                                "value": "authpriv"
                            }
                        ]
                    }
                ]
            }
        }
    ]
}

Análisis de campos JSON de eventos de Windows

Extraiga campos estructurados de una carga útil JSON en la columna EventData de Eventos de Windows.

{
    "name": "client_parse_windows_events",
    "headerProcessor": {
        "processor": "header.WindowsEvents",
        "configuration": {}
    },
    "processors": [
        {
            "processor": "parse.XmlPath",
            "configuration": {
                "columnName": "RawXml",
                "all": [
                    {
                        "path": "/Event/System/EventID",
                        "nameAs": "EventID",
                        "typeAs": "int"
                    },
                    {
                        "path": "/Event/EventData/Data[@Name='SubjectUserName']",
                        "nameAs": "SubjectUserName",
                        "typeAs": "string"
                    }
                ]
            }
        },
        {
            "processor": "map.Drop",
            "configuration": {
                "columnNames": ["RawXml", "RenderingInfo"]
            }
        }
    ]
}

Contadores de rendimiento agregados

Agregue los datos del contador de rendimiento en el lado cliente para reducir el volumen de datos mediante el resumen de valores en un período de 5 minutos.

{
    "name": "client_aggregate_perf",
    "headerProcessor": {
        "processor": "header.WindowsPerformanceCounters",
        "configuration": {}
    },
    "processors": [
        {
            "processor": "aggregate.Basic",
            "configuration": {
                "batchingSettings": {
                    "timeWindow": "5m",
                    "maxBatchRows": 1000
                },
                "aggregates": [
                    {
                        "columnName": "CounterValue",
                        "operator": "avg",
                        "nameAs": "AvgValue"
                    },
                    {
                        "columnName": "CounterValue",
                        "operator": "max",
                        "nameAs": "MaxValue"
                    },
                    {
                        "operator": "count",
                        "nameAs": "SampleCount"
                    }
                ],
                "dimensionColumns": ["CounterName", "Instance"]
            }
        }
    ]
}

Importante

La agregación cambia completamente el esquema de salida. Enviar los datos agregados a una tabla personalizada independiente.

Extracción de atributos CEF de syslog

Extraer atributos CEF (Common Event Format) de mensajes syslog, utilizados frecuentemente para datos de dispositivos de seguridad.

{
    "name": "client_parse_cef",
    "headerProcessor": {
        "processor": "header.Syslog",
        "configuration": {}
    },
    "processors": [
        {
            "processor": "parse.CEFAttribute",
            "configuration": {
                "columnName": "Message",
                "all": [
                    {
                        "path": "deviceAction",
                        "nameAs": "DeviceAction",
                        "typeAs": "string"
                    },
                    {
                        "path": "sourceAddress",
                        "nameAs": "SourceIP",
                        "typeAs": "string"
                    },
                    {
                        "path": "destinationAddress",
                        "nameAs": "DestinationIP",
                        "typeAs": "string"
                    }
                ]
            }
        },
        {
            "processor": "enrich.DNSLookup",
            "configuration": {
                "columnName": "SourceIP",
                "nameAs": "SourceDNSName"
            }
        }
    ]
}

Transformación de KQL en tiempo de ingesta

Aplique una expresión KQL a un flujo estándar durante la ingesta. Esto proporciona una ruta de migración desde la propiedad heredada transformKql .

{
    "name": "ingestion_kql_syslog",
    "headerProcessor": {
        "processor": "header.StandardStream",
        "configuration": {
            "streamId": "Microsoft-Syslog"
        }
    },
    "processors": [
        {
            "processor": "transform.KQL",
            "configuration": {
                "expression": "source | where SeverityLevel != 'info' | extend EnrichedMsg = strcat(HostName, ': ', SyslogMessage)"
            }
        }
    ]
}

Enfoque de dos etapas de Syslog a CommonSecurityLog

Transformar los datos de syslog e ingerirlos como CommonSecurityLog mediante un enfoque de dos fases. La transformación del lado cliente analiza los atributos CEF y la transformación del lado de ingesta asigna los datos al esquema de tabla CommonSecurityLog.

Transformación del lado cliente:

{
    "name": "client_cef_extract",
    "headerProcessor": {
        "processor": "header.Syslog",
        "configuration": {}
    },
    "processors": [
        {
            "processor": "parse.CEFAttribute",
            "configuration": {
                "columnName": "Message",
                "all": [
                    {
                        "path": "deviceAction",
                        "nameAs": "DeviceAction",
                        "typeAs": "string"
                    }
                ]
            }
        }
    ]
}

Transformación en tiempo de ingesta:

{
    "name": "ingestion_map_to_csl",
    "headerProcessor": {
        "processor": "header.StandardStream",
        "configuration": {
            "streamId": "Microsoft-CommonSecurityLog"
        }
    },
    "processors": [
        {
            "processor": "transform.KQL",
            "configuration": {
                "expression": "source | extend DeviceAction = DeviceAction"
            }
        }
    ]
}