Este segundo post lo dedicaremos a Logstash, la herramienta del Elastic Stack pensada para gestionar procesos de ETL (Extract, Transform and Load).
El objetivo de este post es conocer los aspectos principales de la herramienta y entender mínimamente su uso. Para ello implementaremos un ejemplo práctico de extracción de información, transformación/enriquecimiento de ésta y su posterior ingesta en Elasticsearch.
¿Qué es Logstash?
Cómo acabamos de introducir, Logstash es parte del ElasticStack. Es una herramienta Open Source que nos permite centralizar la recogida de información, normalizarla y redistribuirla.

Aunque es una herramienta que puede funcionar de forma independiente, como parte del Stack vemos que Logstash tiene total integración con éste. Su integración con Beats, Elasticsearch y Kibana es nativa, ofreciendo así gran valor para la transformación avanzada de nuestra información.
Originalmente Logstash fue pensado para la recolección de logs, aunque actualmente su capacidad va más allá de este caso de uso. Mediante la definición de pipelines de procesos, Logstash nos permitirá recoger eventos desde multitud de orígenes de datos (inputs), su posterior transformación, enriquecimiento y normalización (filters) y su distribución a distintos destinos (outputs).
Llegados a este punto, podríamos preguntarnos ¿por qué no utilizar las pipelines de ingesta de Elasticsearch para este propósito y evitar así el uso de Logstash? La respuesta dependerá del caso concreto en que nos encontremos (origen de datos, características del clúster de Elasticsearch, etc). Recordemos que el uso de pipelines de ingesta supone una carga de procesamiento en los nodos de ingesta de nuestro clúster de Elasticsearch. Igualmente, dependiendo de las necesidades de extracción de nuestros orígenes de datos, sólo podremos realizarlo con las capacidades avanzadas que nos proporciona Logstash, no quedando cubierto con pipelines de ingesta.
Es importante indicar que, en entornos de producción con necesidades de transformación de grandes volúmenes de información, podemos escalar horizontalmente la solución según nuestras necesidades, habilitando para ello algún sistema de balanceo previo.
Pipelines
Como hemos indicado anteriormente, el proceso de todo pipeline estará formado por estos tres pasos:

Inputs
Logstash dispone de multitud de plugins para permitir la captura de nuestros eventos desde distintos orígenes datos.
Dentro de las diversas posibilidades que ofrece, veremos que es posible configurar Logstash para recibir eventos (tcp, udp, etc.), así como configurar Logstash para que acceda al dato de forma activa a partir de una programación temporal (http_poller, jdbc, etc.).
Filters
En este punto es donde realizaremos la transformación y el enriquecimiento de nuestros datos. Para ello Logstash dispone de una serie de plugins que podremos aplicar según nuestras necesidades.
Destacar que, como en el resto de pasos del pipeline, en los filtros podremos trabajar con condicionales para trabajar sobre eventos. Esto adquiere especial importancia en este punto, ya que nos permitirá aplicar distintas operativas según el cumplimiento de ciertos condicionales (basados en etiquetas, valores, existencia de campos, etc.)
Outputs
Una vez que tenemos el dato preparado, mediante el uso de multitud de plugins existentes, podremos seleccionar dónde y cómo queremos enviar nuestros eventos.
Otro punto importante en todo proceso de ETL no comentado hasta el momento, es la necesidad de conversión de formatos. Es por ello que Logstash ofrece multitud de códecs para este propósito.
¿Y si no encuentro el plugin que necesito?
Aunque es cierto que podemos encontrar multitud de plugins en la comunidad, en caso de necesitar del desarrollo de algún plugin específico no existente para poder cubrir nuestras propias necesidades, Elastic proporciona documentación oficial para este propósito en la siguiente url.
Colas
Logstash gestiona en memoria todas las etapas del procesamiento, por lo que un posible error en el equipo que tenga afectación al servicio puede afectar al correcto procesamiento de los eventos en proceso.
Para evitar que este tipo de casuística nos pueda comportar pérdida de información, es conveniente activar la característica de cola persistente (PersistentQueue). Igualmente es importante disponer de esta característica activada si queremos que Logstash pueda asumir picos puntuales en el número de eventos recibidos, que de otra manera podría suponer una pérdida de información según la arquitectura que esté establecida.
Respecto al uso de colas, comentar que Logstash permite activar las llamadas DeadLetterQueues. Esta característica permite tener el control sobre los eventos enviados a un destino del cual se recibe un código de error, permitiendo almacenar los eventos en disco para una revisión posterior y evitar así su pérdida.
Instalación
Llegados a este punto, ya estamos en disposición de instalar Logstash y ver un ejemplo práctico de su funcionamiento.
Como ya hicimos en el post anterior de Elasticsearch, no vamos a centrarnos en este post en la instalación de Logstash, ya que Elastic proporciona información detallada en este sentido en su documentación oficial. Podemos acceder a la documentación here.
Logstash requiere de JVM para su ejecución, por lo que requeriremos disponer de Java donde lo instalemos.

Pipeline de ejemplo
Una vez instalado, la mejor manera de ver en funcionamiento la herramienta es con un ejemplo práctico. En este vamos a abordar los siguientes puntos:
- Extracción de fechas, IPs, ids de usuario y comentarios desde registros de una base de datos.
- Obtención de campos en los comentarios recibidos.
- Enriquecimiento de ciertos campos.
- Ingesta en Elasticsearch.
Supongamos la necesidad de ingestar en nuestro clúster de Elasticsearch, todos los eventos que se generan en una tabla de una base de datos MariaDBque tiene 3 campos: un campo ID incremental, un campo HORA con el timestamp del registro y un campo TEXTO con la siguiente estructura fija: “Usuario: <USERID>, ip: <IP>, traza: <MSG>”.
De cada registro queremos guardar en Elasticsearch la siguiente información:
- Campo HORA del registro, como fecha de creación del documento en Elasticsearch.
- Campo USERID extraído del campo TEXTO del registro.
- Campo IP extraído del campo TEXTO del registro.
- Campo MSG extraído del campo TEXTO del registro.
Adicionalmente disponemos de un índice en Elasticsearch que contiene una relación de ids de usuario con su nombre completo. Queremos enriquecer el evento añadiendo en el documento creado en Elasticsearch un campo que contenga el nombre completo del id de usuario del registro.
También queremos enriquecer el campo IP de cada evento con el valor de la geo posición correspondiente. Igualmente necesitamos añadir el dominio correspondiente para poder disponer de la información en el documento de Elasticsearch resultante.
Empecemos con la implementación de nuestro pipeline, pero primero veamos cómo se presenta la información en origen que queremos recoger:
Un ejemplo de registro de nuestra tabla es el siguiente, donde la tabla se llama ‘usuarios’ y los campos id, hora y texto son los anteriormente comentados:

Respecto al índice existente en Elasticsearch, que dispone de una relación de id de usuario con su nombre completo, su nombre es “listado_usuarios” y la estructura que nos interesa es la siguiente:
"_source" : {
"user" : {
"full_name" : "Luis Fernandez Garcia",
"id" : "lfernan"
},
...
Lo primero que hacemos es definir en el fichero de definición de pipelines a procesar por nuestra instancia de Logstash (pipelines.yml), el pipeline que vamos a crear. Añadimos la siguiente entrada:
- pipeline.id: ejemplo
path.config: "/etc/logstash/conf.d/mariadb2elastic.conf"
A continuación, veamos el pipeline:
input {
jdbc {
id => "ejemplo"
JDBC MariaDB
jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mariadb-java-client-2.6.0.jar"
jdbc_driver_class => "Java::org.mariadb.jdbc.Driver"
jdbc_connection_string => "jdbc:mariadb://servidor_mariadb:3306/eventos"
jdbc_user => "user"
jdbc_password => "password"
#Query
statement => "SELECT id,hora,texto FROM usuarios WHERE id > :sql_last_value"
#Tracking
use_column_value => true
tracking_column => "id"
#pathsql_last_value
last_run_metadata_path => "/etc/logstash/jdbc/logstash_jdbc"
#programacion cada minuto
schedule => "* * * * *"
}
}
filter {
#Parseo texto
grok{ match => { "texto" => "^Usuario: %{NOTSPACE:[user][id]}, ip: %{IP:[source][ip]}, traza: %{GREEDYDATA:message}" } }
#Formato campo
mutate{ convert => [ "hora", "string" ] }
#Asignacion hora @timestamp
date {
match => ["hora","ISO8601"]
remove_field => [ "hora" ]
}
#Enriquecimiento dato desde indice Elasticsearch
elasticsearch {
user => "user"
password => "password"
index => "listado_usuarios"
hosts => [ "https://nodo_elasticsearch:9200"]
query => 'user.id:%{[user][id]}'
enable_sort => false
fields =>{ "[user][full_name]" => "[user][full_name]" }
}
#Eliminacion y creacion de campos
mutate {
remove_field => ["texto","id"]
add_field =>{ "[source][domain]" => "%{[source][ip]}" }
}
#Enriquecimiento dns
dns {
reverse => [ "[source][domain]" ]
action => "replace"
}
#Enriquecimiento geoip
geoip {
source => "[source][ip]"
target => "[source][geo]"
}
}
output {
#Ingesta en indice Elasticsearch
elasticsearch {
user =>user
password =>password
index => "eventos-alias"
ssl => true
truststore => "/etc/logstash/elastic-certificates.p12"
truststore_password => "password"
hosts => ["https://nodo_elasticsearch:9200"]
}
}
Como vemos, en nuestro input invocamos al plugin jdbc, asignando un id y realizando la configuración necesaria para el conector (jdbc_*). Posteriormente definimos la query que nos devolverá los campos requeridos (statement) e indicamos a Logstash el campo utilizado para realizar el seguimiento de los eventos ya consultados (tracking_column), el cual será persistente en fichero (last_run_metadata_path).
Por último, indicamos una planificación de consulta a la base de datos recurrente cada minuto (schedule).
Posteriormente en filter, para cada evento se parsea el contenido del campo ‘texto’ buscando por el patrón definido anteriormente (grok), mediante el cual se obtienen los campos user.id, source.ip y message, con los datos del id de usuario, su ip y el mensaje proporcionado respectivamente.
- Utilizaremos date para fijar la hora del evento como la hora del documento (@timestamp) correspondiente en Elasticsearch.
- Para obtener el nombre completo (user.full_name) que corresponde al id del usuario (user.id), como hemos comentado anteriormente consultaremos la información en un índice de Elasticsearch.
- Para eliminar los campos que no necesitamos o añadir nuevos, utilizamos mutate.
- Para realizar el enriquecimiento dns utilizamos el plugin dns, guardando el resultado en source.domain.
- Para realizar el enriquecimiento de geo posición utilizamos el plugin geoip. Según la casuística de ips, puede ser conveniente utilizarlo conjuntamente con el plugin cidr para poder filtrar su uso a ciertos patrones.
Una vez el evento está preparado para su ingesta en Elasticsearch, configuraremos dicho destino en output para realizar la ingesta.
Tras ejecutarse nuestro pipeline, para cada evento procesado veremos en el índice de Elasticsearch la estructura esperada. Para el caso del evento anteriormente mostrado:
"_source": {
"@timestamp": "2020-05-05T17:18:13.000Z",
"user": {
"full_name": "Luis Fernandez Garcia",
"id": "lfernan"
},
"message": "Error in module frame1",
"@version": "1",
"source": {
"geo": {
"longitude": -3.684,
"country_code2": "ES",
"country_code3": "ES",
"timezone": "Europe/Madrid",
"ip": "88.87.150.250",
"continent_code": "EU",
"latitude": 40.4172,
"location": {
"lon": -3.684,
"lat": 40.4172
},
"country_name": "Spain"
},
"domain": "250-150-87-88.zertia.es",
"ip": "88.87.150.250"
}
},
Por simplificar el ejemplo, no hemos utilizado filtrado de eventos en base a errores (de parseo, de consultas, etc), pero es importante tenerlo presente para realizar un control más preciso de la información que ingestaremos en Elasticsearch.
Respecto al uso de Grok, para quien no esté familiarizado con él, es recomendable consultar su documentación así como el uso de algún debugger para depurar nuestros parseos. Existen varios online que podemos utilizar, aunque Elastic ya incluye un debugger en Kibana.
Una recomendación importante a la hora de implementar pipelines de diversos orígenes de datos para su posterior ingesta en Elasticsearch, es la necesidad de seguir una estandarización en la nomenclatura de los campos utilizados. Para este propósito Elastic ha definido ECS (Elastic Common Schema).
Monitorización pipeline
Sin entrar en el detalle de cómo poder configurar en nuestro Stack la monitorización de nuestras instancias de Logstash y las pipelines utilizadas, sí vemos conveniente comentar que su uso es muy recomendable. Así podremos visualizar de forma centralizada desde el módulo de monitoring de Kibana toda la información relativa a Logstash, siendo de especial importancia para detectar posibles problemas de rendimiento.

Otro aspecto importante a comentar, aunque sólo está disponible es versiones con licencia, es la posibilidad de centralizar la gestión de pipelines desde Kibana. Podemos encontrar la información correspondiente en este enlace.
Conclusión
Como ha quedado reflejado a lo largo del artículo, Logstash es un componente del Stack que deberemos tener muy en cuenta cuando nuestro proceso de transformación de datos sea complejo.
Esperamos que el artículo os haya sido de utilidad para dar los primeros pasos con Logstash en vuestra organización.