¿Qué son los Beats de elastic? + Ejemplo de uso de Filebeat

En este tercer artículo introduciremos Beats, los agentes ligeros de Elastic que ya nombramos en nuestro primer post de introducción a Elasticsearch. Tras comentar los beats más relevantes actualmente, nos centraremos en uno de ellos, Filebeat.

El propósito es, tras haber ya introducido Elasticsearch y Logstash en nuestros dos primeros artículos, conocer en este cómo utilizar Filebeat para la ingesta de logs y ver cómo utilizar pipelines de ingesta en Elasticsearch. Para ello instalaremos una instancia de Filebeat que supervisará 3 tipologías de logs: errores de una base de datos MariaDB, un listado de eventos en formato json y errores de una aplicación propietaria.
Cada tipología de error tendrá sus propias características y explicaremos con un ejemplo práctico cómo poder abordarlas.

Tras estos tres posts iniciales, ya tendremos una visión global mínima del Stack que nos permita poder recoger, transformar, enriquecer e ingestar un dataset de ejemplo en nuestro clúster de Elasticsearch, para poder así introducir y trabajar con módulos como ML, SIEM, etc. Ese será el objetivo en siguientes artículos.

¿Qué son los Beats?

Como acabamos de introducir, Beats son agentes de datos ligeros, los cuales podremos utilizar para recopilar cierto tipo de información y que tras su transformación y enriquecimiento podremos por ejemplo ingestarla en Logstash o Elasticsearch.

Inicialmente, ante la necesidad de recopilar datos de red, Elastic publicó su primer beat: Packetbeat.

Con el tiempo no sólo se han añadido más beats, sino que la funcionalidad de estos ha sufrido una gran mejora, algo que sigue sucediendo con cada una de las actualizaciones que se liberan.

En la actualidad, Elastic distribuye varios tipos de beats, cada uno de ellos con un propósito específico. Entre todos ellos comentaremos los más utilizados, aunque podéis consultar el listado completo here.

Beats más usados

Filebeat

Filebeat está pensado principalmente para leer ficheros de logs, aunque soporta algunos otros inputs como TCP, etc. El caso de uso más común es la recogida de logs en equipos remotos para su posterior centralización en Logstash o Elasticsearch.

Además de ser capaz de gestionar el último evento enviado y continuar posteriormente desde ese punto en caso de un corte en la comunicación, dispone de un control de carga para Logstash que le permite reducir la ratio de envío en casos de saturación en Logstash.

A lo largo del tiempo, Elastic ha ido ampliando la funcionalidad del agente, dotando a este de una multitud de módulos especialmente pensados para facilitar el parseo de logs de soluciones ampliamente reconocidas (Apache, MySQL, Nginx, etc). De este modo no sólo se simplifica enormemente el proceso de identificación de path de logs y su posterior parseo, sino que también se facilita la ingesta optimizada del dato en Elasticsearch mediante templates y pipelines de ingesta predefinidos, y la creación de visualizaciones para posteriormente poder explotar la información desde dashboards en Kibana.

Para poder consultar el listado completo de módulos que ofrece el fabricante, podéis consultar esta url.

Más adelante veremos un caso práctico de cómo utilizar Filebeat para la recogida e ingesta en Elasticsearch de varios logs.

Para ver toda la información relativa a este beat, recomendamos consultar la url del fabricante.

Metricbeat

Si lo que queremos es recoger métricas de servicios o sistemas (memoria, cpu, i/o disco, etc), Metricbeat será una solución perfecta para integrar el dato en nuestro clúster de Elasticsearch.

Como con el resto de beats, se trata de un agente ligero, el cual no repercutirá negativamente en el rendimiento del equipo donde se ejecuta.

Tal cómo indicábamos anteriormente en el caso de Filebeat, Merticbeat también dispone de multitud de módulos preestablecidos (MongoDB, Apache, PostgreSQL, etc) para facilitar la recogida, ingesta y explotación de nuestras métricas en Elasticsearch mediante visualizaciones predefinidas.

Adicionalmente a las visualizaciones que podamos crear para abordar nuestras propias necesidades, Kibana dispone del módulo Metrics mediante el cual podremos visualizar de forma centralizada la información recogida por Metricbeat.

Para poder consultar el listado completo de módulos que ofrece el fabricante, podéis consultar esta url.

Para ver toda la información relativa a este beat, recomendamos consultar la url del fabricante.

Packetbeat

Con Packetbeat podremos analizar paquetes de red en tiempo real, pudiendo entender así qué sucede en nuestra red. Nos permitirá recoger e ingestar en nuestro clúster de Elasticsearch datos relativos a latencias, tiempos de respuesta, errores, tendencias, etc sin interferir en nuestra infraestructura.

Packetbeat es compatible con muchos protocolos de capa de aplicación, desde bases de datos hasta protocolos de bajo nivel. En esta url podremos consultar toda la información relativa al beat y la lista de protocolos soportados, así como disponer de toda la información completa aquí.

Winlogbeat

Para poder analizar los registros de eventos de log de nuestro parque de Windows, Elastic pone a nuestra disposición Winlogbeat. Con este beat podremos tanto recoger y enviar a Logstash todos nuestros eventos de Windows para realizar operaciones complejas sobre los datos, como ingestarlos directamente en nuestro clúster de Elasticsearch para posteriormente analizar y explotar los datos desde dashboards personalizados.

Para ver toda la información relativa a este beat, recomendamos consultar la url del fabricante.

Auditbeat

Auditbeat nos permite recoger datos de auditoría, monitorizando procesos y la actividad de usuario en tiempo real. En Linux se integra con el framework de audit, con la funcionalidad que esto representa.

Con Auditbeat también podremos monitorizar la integridad de ficheros, reportando información completa (metadata, hashes, …) sobre los cambios detectados en tiempo real.

Para ver toda la información relativa a este beat, recomendamos consultar la url del fabricante.

Heartbeat

Con Heartbeat podremos monitorizar el tiempo de actividad y de respuesta de nuestros endpoints, mediante la configuración centralizada de estos.

De esta manera, si lo que necesitamos es supervisar el ping de hosts o servicios, ya sea mediante ICMP, HTTP o TCP, Heartbeat nos facilitará la recogida de datos y posterior ingesta en Elasticsearch para poder así analizar y visualizar el dato.

Kibana dispone del módulo Uptime, mediante el cual podremos visualizar toda la información recogida por Heartbeat, adicionalmente a las visualizaciones que podamos crear para abordar nuestras propias necesidades.

Para ver toda la información relativa a este beat, recomendamos consultar la url del fabricante.

Beats de la comunidad

Adicionalmente a los beats oficiales de Elastic, existen multitud de beats mantenidos por la comunidad. El listado de muchos de ellos pueden ser consultados desde la propia página de Elastic en este enlace, aunque como es normal Elastic no proporciona ni soporte ni garantiza su funcionamiento.

Si nuestra intención es desarrollar un nuevo beat para ingestar datos en Elasticsearch, Elastic recomienda el uso de la extensa librería libbeat. Para ello proporciona una completa guía para su creación.

Es importante también comentar que todos los beats respetan el ECS (ElasticCommonSchema). Esto nos permitirá ingestar nuestros datos en Elasticsearch siguiendo el esquema común del fabricante, algo necesario si por ejemplo queremos posteriormente explotar correctamente módulos integrados en Kibana como SIEM.

Ejemplo de uso Filebeat

Una vez introducidos los beats más populares, tal como indicamos al inicio del post, nos centraremos en Filebeat para realizar una captura de logs de aplicación y posterior envío a Elasticsearch.

Es importante comentar que, tanto en caso de pérdida de comunicación de nuestro agente con Elasticsearch o Logstash, o de una interrupción del propio servicio, nuestra información no sufrirá pérdidas. Esto es gracias a que Filebeat gestiona en todo momento cual fue el punto en que se quedó previamente, continuando así desde ese punto una vez que el problema queda solventado.

Como hemos indicado en varias ocasiones, Elastic dispone de mucha información en su documentación oficial, entre la que se incluyen pequeños ejemplos de cómo realizar ciertas configuraciones. Es por ello que no tiene sentido repetir en este artículo un ejemplo tipo que ya esté recogido en su propia documentación.

Es por ello, que para poder mostrar la versatilidad que nos ofrece Filebeat, instalaremos una sola instancia para supervisar tres tipologías de logs distintos, los cuales queremos ingestar en Elasticsearch en índices distintos y aplicando en algunos casos un parseo previo mediante pipelines de ingesta.

La situación que queremos abordar es la siguiente:

1ª tipología logs:

  • Queremos recoger los logs estándar de error de una base de datos MariaDB.ç
  • No queremos parsear manualmente los datos, para ello queremos utilizar el módulo de MySQL del que ya dispone Filebeat.
  • Queremos ingestarlos en un patrón de índice ‘mysql-*’.

2ª tipología logs:

  • Queremos recoger logs en formato json que se ubican en un solo path con extensión .json
  • Queremos guardar en campos separados cada pareja de clave/valor del json, para posteriormente poder explotar los datos correctamente.
  • Queremos ingestarlos en un patrón de índice ‘app1-*’ que rote cada día, donde el nombre del índice contenga la fecha.

3ª tipología logs:

  • Queremos recoger los logs de una aplicación propietaria que se pueden ubicar en dos paths distintos.
  • No queremos guardar los eventos de debugging reportados (comienzan por ‘[DEBUG]’)
  • Queremos gestionar correctamente los eventos con multilínea.
  • Siguiendo nuestro formato de log normalizado, queremos parsear correctamente cada campo y enriquecer el campo ip con geo posición. También queremos que la fecha de cada documento sea la de creación del evento.
  • Queremos ingestarlos en un patrón de índice ‘app2-*’.

Instalación

Como ya hicimos en artículos anteriores, no vamos a centrarnos en este post en la instalación de Filebeat, ya que Elastic proporciona información detallada en este sentido en su documentación oficial. Podemos acceder a la documentación here.

Configuración

Como vamos a necesitar cargar en Elasticsearch configuración sobre el módulo de mysql que utilizaremos, lo primero que haremos en definir nuestro clúster de Elasticsearch en el fichero de configuración de Filebeat (filebeat.yml). Para ello nos ubicaremos en el apartado ‘output.elasticsearch’ y definiremos allí los campos necesarios.

Previamente al resto de configuración que debemos añadir en el fichero de configuración de Filebeat, realizamos las siguientes configuraciones necesarias dadas nuestras distintas tipologías de logs a tratar:

Para la 1ª tipología:

En nuestro caso no podemos basarnos en la configuración genérica de un módulo, por lo que realizamos estos pasos:

  • Activamos el módulo mysql:
filebeat modules enablemysql
  • En todo momento podremos visualizar los módulos activos en nuestra instancia:
filebeat modules list
  • Configuramos el fichero correspondiente al módulo, ‘…/filebeat/modules.d/mysql.yml’. Como no queremos recoger slowlogs, activamos sólo los de errores e indicamos su path:
- module: mysql
  error:
enabled: true
var.paths: ["/var/log/mysql/error.log*"]
slowlog:
enabled: false
  • Cargamos en Elasticsearch los pipelines correspondientes al módulo. Tras esta carga podremos por ejemplo clonarlo a un pipeline llamado ‘mysql-errors’ que es el que utilizaremos en nuestro ejemplo.
filebeatsetup --pipelines --modules mysql
  • Cargamos en Elasticsearch los componentes relacionados con la gestión del índice. De esta manera podremos por ejemplo posteriormente clonar y adaptar a nuestra casuística el template a uno llamado ‘mysql’, que es el que utilizaremos en nuestro ejemplo.Sin entrar en detalle, indicar que esto nos dará la posibilidad de crear la estructura optimizada que queramos, ya sea descartando ciertos campos, modificando configuración del índice (shards, refresco, etc), ILM, etc.
filebeatsetup --index-management --modules mysql

Para la 2ª tipología:

  • Creamos el template que consideremos necesario para cubrir nuestras necesidades de configuración y mapeo de campos.Filebeatya tratará el json en origen.

Para la 3ª tipología:

  • Creamos el template que consideremos necesario para cubrir nuestras necesidades de configuración y mapeo de campos.
  • Definimos un pipeline de ingesta para parsear correctamente nuestro log, definiendo la hora de creación del evento como hora del documento y enriqueciendo con geo posición la ip de origen. Para el nombre de los campos utilizaremos el ECS definido por el fabricante.

Dada la siguiente tipología de log en nuestra aplicación, crearemos el pipeline de ingesta siguiente:

Eventos con el siguiente formato:

[LEVEL] DATE IP USER MSG

Traza de ejemplo:

[ERROR] 23/Jun/2020:15:34:12 +0000 88.87.120.45 a0147 Fatal error: Outofmemory

Creamos el siguiente pipeline de ingesta:

PUT _ingest/pipeline/app2
{
  "description" : "Ingest pipeline para app2",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["\\[%{WORD:log.level}\\] %{HTTPDATE:@timestamp} %{IP:source.ip} %{USER:user.id} %{GREEDYDATA:error.message}"]
      }
    },
    {
      "date": {
        "field": "@timestamp",
        "formats": [ "dd/MMM/YYYY:HH:mm:ss Z" ]
      }
    },
    {
      "geoip": {
        "field": "source.ip",
        "target_field": "source.geo"
      }
    }
  ]
}

Para poder probar el correcto funcionamiento, podemos simular en Elasticsearch previamente su comportamiento utilizando el ejemplo de traza anterior. Para ello invocamos _simulate:

POST _ingest/pipeline/app2/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "[ERROR] 23/Jun/2020:15:34:12 +0000 88.87.120.45 a0147 Fatal error: Outofmemory"
      }
    }
  ]
}

Confirmamos que el comportamiento es el esperado:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2020-06-23T15:34:12.000Z",
          "log" : {
            "level" : "ERROR"
          },
          "source" : {
            "geo" : {
              "continent_name" : "Europe",
              "region_iso_code" : "IT-CN",
              "city_name" : "Busca",
              "region_name" : "Provincia di Cuneo",
              "location" : {
                "lon" : 7.4766,
                "lat" : 44.5162
              },
              "country_iso_code" : "IT"
            },
            "ip" : "88.87.120.45"
          },
          "message" : "[ERROR] 23/Jun/2020:15:34:12 +0000 88.87.120.45 a0147 Fatal error: Outofmemory",
          "error" : {
            "message" : "Fatal error: Outofmemory"
          },
          "user" : {
            "id" : "a0147"
          }
        },
        "_ingest" : {
          "timestamp" : "2020-06-24T12:17:17.775424Z"
        }
      }
    }
  ]
}

En este punto ya podemos configurar Filebeat (filebeat.yml) con las 3 casuísticas anteriormente descritas.

  • Definimos nuestros 3 inputs:
filebeat.inputs:
#Condicionantes a cumplir para la 2ª tipología de logs
- type: log
enabled: true
paths:
    - /path/logs/*.json
json.keys_under_root: true
json.add_error_key: true
  tags: ["app1"]

#Condicionantes a cumplir para la 3ª tipología de logs
- type: log 
enabled: true
paths:
    - "/path1/logs/*"
    - "/path2/logs/*"
exclude_lines: ['^\[DEBUG\]']
multiline.pattern: '^\['
multiline.negate: true
multiline.match: after
  tags: ["app2"]
#Habilitamos la gestión de módulos para la 1ª tipología de logs
filebeat.config.modules:
path: /etc/filebeat/modules.d/*.yml
  • Definimos nuestro output, en nuestro caso en la configuración correspondiente a nuestro clúster de Elasticsearch, con condicionales para la correcta selección de índice y pipeline de ingesta:
output.elasticsearch:
…
indices:
    - index: "mysql-alias"
when.equals:
event.module: "mysql"
    - index: "app1-%{+yyyy.MM.dd}"
when.contains:
        tags: "app1"
    - index: "app2-alias"
when.contains:
        tags: "app2"
  pipelines:
    - pipeline: "app2"
when.contains:
        tags: "app2"
    - pipeline: "mysql-errors"
when.equals:
event.module: "mysql"

Una vez guardados los cambios, reiniciaremos nuestro agente para aplicarlos.

Adicionalmente, si queremos aprovechar visualizaciones predefinidas en el módulo de mysql, podemos cargarlas en Kibana invocando la siguiente instrucción:

filebeatsetup --dashboards -E setup.dashboards.index=mysql-*

Es aconsejable indicar adicionalmente el path del json correspondiente a nuestro módulo, si no queremos cargar todo.

-E setup.dashboards.directory=PATH

Para nuestro ejemplo, podríamos reutilizar estas visualizaciones que vienen predefinidas. Vemos como refleja un error de ejecución que hemos forzado para este propósito en nuestra base de datos:

Conclusión

Como hemos podido ver a lo largo de este artículo, Elastic proporciona una multitud de agentes preparados para recoger, transformar e ingestar multitud de tipos de datos, lo cual nos ayudará a minimizar los tiempos de implantación en nuestra organización.

En próximos artículos nos adentraremos en Kibana, la interfaz que nos permitirá visualizar y gestionar nuestros datos en Elasticsearch. Veremos cómo explotar nuestros datos, tanto con visualizaciones personalizadas como con el uso de las múltiples soluciones integradas desarrolladas por Elastic.

Elastic Technology Consultant & Elastic Certified Engineer & Analyst

Share this post: