Convert Logstash pipelines to OpenTelemetry Collector Pipelines
Introduction
Elastic observability strategy is increasingly aligned with OpenTelemetry. With the recent launch of Elastic Distributions of OpenTelemetry we’re expanding our offering to make it easier to use OpenTelemetry, the Elastic Agent now offers an "otel" mode, enabling it to run a custom distribution of the OpenTelemetry Collector, seamlessly enhancing your observability onboarding and experience with Elastic.
This post is designed to assist users familiar with Logstash transitioning to OpenTelemetry by demonstrating how to convert some standard Logstash pipelines into corresponding OpenTelemetry Collector configurations.
What is OpenTelemetry Collector and why should I care?
OpenTelemetry is an open-source framework that ensures vendor-agnostic data collection, providing a standardized approach for the collection, processing, and ingestion of observability data. Elastic is fully committed to this principle, aiming to make observability truly vendor-agnostic and eliminating the need for users to re-instrument their observability when switching platforms.
By embracing OpenTelemetry, you have access to these benefits:
- Unified Observability: By using the OpenTelemetry Collector, you can collect and manage logs, metrics, and traces from a single tool, providing holistic observability into your system's performance and behavior. This simplifies monitoring and debugging in complex, distributed environments like microservices.
- Flexibility and Scalability: Whether you're running a small service or a large distributed system, the OpenTelemetry Collector can be scaled to handle the amount of data generated, offering the flexibility to deploy as an agent (running alongside applications) or as a gateway (a centralized hub).
- Open Standards: Since OpenTelemetry is an open-source project under the Cloud Native Computing Foundation (CNCF), it ensures that you're working with widely accepted standards, contributing to the long-term sustainability and compatibility of your observability stack.
- Simplified Telemetry Pipelines: The ability to build pipelines using receivers, processors, and exporters simplifies telemetry management by centralizing data flows and minimizing the need for multiple agents.
In the next sections, we will explain how OTEL Collector and Logstash pipelines are structured, and we will clarify how the steps for each option are used.
OTEL Collector Configuration
An OpenTelemetry Collector Configuration has different sections:
- Receivers: Collect data from different sources.
- Processors: Transform the data collected by receivers
- Exporters: Send data to different collectors
- Connectors: Link two pipelines together
- Service: defines which components are active
- Pipelines: Combine the defined receivers, processors, exporters, and connectors to process the data
- Extensions are optional components that expand the capabilities of the Collector to accomplish tasks not directly involved with processing telemetry data (e.g., health monitoring)
- Telemetry where you can set observability for the collector itself (e.g., logging and monitoring)
We can visualize it schematically as follows:
We refer to the official documentation Configuration | OpenTelemetry for an in-depth introduction in the components.
Logstash pipeline definition
A Logstash pipeline is composed of three main components:
- Input Plugins: Allow us to read data from different sources
- Filters Plugins: Allow us to transform and filter the data
- Output Plugins: Allow us to send the data
Logstash also has a special input and a special output that allow the pipeline-to-pipeline communication, we can consider this as a similar concept to an OpenTelemetry connector.
Logstash pipeline compared to Otel Collector components
We can schematize how Logstash Pipeline and OTEL Collector pipeline components can relate to each other as follows:
Enough theory! Let us dive into some examples.
Convert a Logstash Pipeline into OpenTelemetry Collector Pipeline
Example 1: Parse and transform log line
Let's consider the below line:
2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404
We will apply the following steps:
- Read the line from the file /tmp/demo-line.log.
- Define the output to be an Elasticsearch datastream logs-access-default.
- Extract the @timestamp,user.name,client.ip,client.port,url.pathandhttp.status.code.
- Drop log messages related to the SYSTEMuser.
- Parse the date timestamp with the relevant date format and store it in @timestamp.
- Add a code http.status.code_descriptionbased on known codes' descriptions.
- Send data to Elasticsearch.
Logstash pipeline
input {
file {
path => "/tmp/demo-line.log" #[1]
start_position => "beginning"
add_field => { #[2]
"[data_stream][type]" => "logs"
"[data_stream][dataset]" => "access_log"
"[data_stream][namespace]" => "default"
}
}
}
filter {
grok { #[3]
match => {
"message" => "%{TIMESTAMP_ISO8601:[date]}: user %{WORD:[user][name]} accessed from %{IP:[client][ip]}:%{NUMBER:[client][port]:int} path %{URIPATH:[url][path]} with error %{NUMBER:[http][status][code]}"
}
}
if "_grokparsefailure" not in [tags] {
if [user][name] == "SYSTEM" { #[4]
drop {}
}
date { #[5]
match => ["[date]", "ISO8601"]
target => "[@timestamp]"
timezone => "UTC"
remove_field => [ "date" ]
}
translate { #[6]
source => "[http][status][code]"
target => "[http][status][code_description]"
dictionary => {
"200" => "OK"
"403" => "Permission denied"
"404" => "Not Found"
"500" => "Server Error"
}
fallback => "Unknown error"
}
}
}
output {
elasticsearch { #[7]
hosts => "elasticsearch-enpoint:443"
api_key => "${ES_API_KEY}"
}
}
OpenTelemtry Collector configuration
receivers:
filelog: #[1]
start_at: beginning
include:
- /tmp/demo-line.log
include_file_name: false
include_file_path: true
storage: file_storage
operators:
# Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
- type: copy
from: body
to: attributes['event.original']
- type: add #[2]
field: attributes["data_stream.type"]
value: "logs"
- type: add #[2]
field: attributes["data_stream.dataset"]
value: "access_log_otel"
- type: add #[2]
field: attributes["data_stream.namespace"]
value: "default"
extensions:
file_storage:
directory: /var/lib/otelcol/file_storage
processors:
# Adding host.name (this is done OOTB by Logstash)
resourcedetection/system:
detectors: ["system"]
system:
hostname_sources: ["os"]
resource_attributes:
os.type:
enabled: false
transform/grok: #[3]
log_statements:
- context: log
statements:
- 'merge_maps(attributes, ExtractgrokPatterns(attributes["event.original"], "%{TIMESTAMP_ISO8601:date}: user %{WORD:user.name} accessed from %{IP:client.ip}:%{NUMBER:client.port:int} path %{URIPATH:url.path} with error %{NUMBER:http.status.code}", true), "insert")'
filter/exclude_system_user: #[4]
error_mode: ignore
logs:
log_record:
- attributes["user.name"] == "SYSTEM"
transform/parse_date: #[5]
log_statements:
- context: log
statements:
- set(time, Time(attributes["date"], "%Y-%m-%dT%H:%M:%S"))
- delete_key(attributes, "date")
conditions:
- attributes["date"] != nil
transform/translate_status_code: #[6]
log_statements:
- context: log
conditions:
- attributes["http.status.code"] != nil
statements:
- set(attributes["http.status.code_description"], "OK") where attributes["http.status.code"] == "200"
- set(attributes["http.status.code_description"], "Permission Denied") where attributes["http.status.code"] == "403"
- set(attributes["http.status.code_description"], "Not Found") where attributes["http.status.code"] == "404"
- set(attributes["http.status.code_description"], "Server Error") where attributes["http.status.code"] == "500"
- set(attributes["http.status.code_description"], "Unknown Error") where attributes["http.status.code_description"] == nil
exporters:
elasticsearch: #[7]
endpoints: ["elasticsearch-enpoint:443"]
api_key: ${env:ES_API_KEY}
tls:
logs_dynamic_index:
enabled: true
mapping:
mode: ecs
service:
extensions: [file_storage]
pipelines:
logs:
receivers:
- filelog
processors:
- resourcedetection/system
- transform/grok
- filter/exclude_system_user
- transform/parse_date
- transform/translate_status_code
exporters:
- elasticsearch
These will generate the following document in Elasticsearch
{
"@timestamp": "2024-09-20T08:33:27.000Z",
"client": {
"ip": "89.66.167.22",
"port": 10592
},
"data_stream": {
"dataset": "access_log",
"namespace": "default",
"type": "logs"
},
"event": {
"original": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404"
},
"host": {
"hostname": "my-laptop",
"name": "my-laptop",
},
"http": {
"status": {
"code": "404",
"code_description": "Not Found"
}
},
"log": {
"file": {
"path": "/tmp/demo-line.log"
}
},
"message": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404",
"url": {
"path": "/blog"
},
"user": {
"name": "frank"
}
}
Example 2: Parse and transform a NDJSON-formatted log file
Let's consider the below json line:
{"log_level":"INFO","message":"User login successful","service":"auth-service","timestamp":"2024-10-11 12:34:56.123 +0100","user":{"id":"A1230","name":"john_doe"}}
We will apply the following steps:
- Read a line from the file /tmp/demo.ndjson.
- Define the output to be an Elasticsearch datastream logs-json-default
- Parse the JSON and assign relevant keys and values.
- Parse the date.
- Override the message field.
- Rename fields to follow ECS conventions.
- Send data to Elasticsearch.
Logstash pipeline
input {
file {
path => "/tmp/demo.ndjson" #[1]
start_position => "beginning"
add_field => { #[2]
"[data_stream][type]" => "logs"
"[data_stream][dataset]" => "json"
"[data_stream][namespace]" => "default"
}
}
}
filter {
if [message] =~ /^\{.*/ {
json { #[3] & #[5]
source => "message"
}
}
date { #[4]
match => ["[timestamp]", "yyyy-MM-dd HH:mm:ss.SSS Z"]
remove_field => "[timestamp]"
}
mutate {
rename => { #[6]
"service" => "[service][name]"
"log_level" => "[log][level]"
}
}
}
output {
elasticsearch { # [7]
hosts => "elasticsearch-enpoint:443"
api_key => "${ES_API_KEY}"
}
}
OpenTelemtry Collector configuration
receivers:
filelog/json: # [1]
include:
- /tmp/demo.ndjson
retry_on_failure:
enabled: true
start_at: beginning
storage: file_storage
operators:
# Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
- type: copy
from: body
to: attributes['event.original']
- type: add #[2]
field: attributes["data_stream.type"]
value: "logs"
- type: add #[2]
field: attributes["data_stream.dataset"]
value: "otel" #[2]
- type: add
field: attributes["data_stream.namespace"]
value: "default"
extensions:
file_storage:
directory: /var/lib/otelcol/file_storage
processors:
# Adding host.name (this is done OOTB by Logstash)
resourcedetection/system:
detectors: ["system"]
system:
hostname_sources: ["os"]
resource_attributes:
os.type:
enabled: false
transform/json_parse: #[3]
error_mode: ignore
log_statements:
- context: log
statements:
- merge_maps(attributes, ParseJSON(body), "upsert")
conditions:
- IsMatch(body, "^\\{")
transform/parse_date: #[4]
error_mode: ignore
log_statements:
- context: log
statements:
- set(time, Time(attributes["timestamp"], "%Y-%m-%d %H:%M:%S.%L %z"))
- delete_key(attributes, "timestamp")
conditions:
- attributes["timestamp"] != nil
transform/override_message_field: [5]
error_mode: ignore
log_statements:
- context: log
statements:
- set(body, attributes["message"])
- delete_key(attributes, "message")
transform/set_log_severity: # [6]
error_mode: ignore
log_statements:
- context: log
statements:
- set(severity_text, attributes["log_level"])
attributes/rename_attributes: #[6]
actions:
- key: service.name
from_attribute: service
action: insert
- key: service
action: delete
- key: log_level
action: delete
exporters:
elasticsearch: #[7]
endpoints: ["elasticsearch-enpoint:443"]
api_key: ${env:ES_API_KEY}
tls:
logs_dynamic_index:
enabled: true
mapping:
mode: ecs
service:
extensions: [file_storage]
pipelines:
logs/json:
receivers:
- filelog/json
processors:
- resourcedetection/system
- transform/json_parse
- transform/parse_date
- transform/override_message_field
- transform/set_log_severity
- attributes/rename_attributes
exporters:
- elasticsearch
These will generate the following document in Elasticsearch
{
"@timestamp": "2024-10-11T12:34:56.123000000Z",
"data_stream": {
"dataset": "otel",
"namespace": "default",
"type": "logs"
},
"event": {
"original": "{\"log_level\":\"WARNINg\",\"message\":\"User login successful\",\"service\":\"auth-service\",\"timestamp\":\"2024-10-11 12:34:56.123 +0100\",\"user\":{\"id\":\"A1230\",\"name\":\"john_doe\"}}"
},
"host": {
"hostname": "my-laptop",
"name": "my-laptop",
},
"log": {
"file": {
"name": "json.log"
},
"level": "WARNINg"
},
"message": "User login successful",
"service": {
"name": "auth-service"
},
"user": {
"id": "A1230",
"name": "john_doe"
}
}
Conclusion
In this post, we showed examples of how to convert a typical Logstash pipeline into an OpenTelemetry Collector pipeline for logs. While OpenTelemetry provides powerful tools for collecting and exporting logs, if your pipeline relies on complex transformations or scripting, Logstash remains a superior choice. This is because Logstash offers a broader range of built-in features and a more flexible approach to handling advanced data manipulation tasks.
What's Next?
Now that you've seen basic (but realistic) examples of converting a Logstash pipeline to OpenTelemetry, it's your turn to dive deeper. Depending on your needs, you can explore further and find more detailed resources in the following repositories:
- OpenTelemetry Collector: Learn about the core OpenTelemetry components, from receivers to exporters.
- OpenTelemetry Collector Contrib: Find community-contributed components for a wider range of integrations and features.
- Elastic's opentelemetry-collector-components: Dive into Elastic's extensions for the OpenTelemetry Collector, offering more tailored features for Elastic Stack users.
If you encounter specific challenges or need to handle more advanced use cases, these repositories will be an excellent resource for discovering additional components or integrations that can enhance your pipeline. All these repositories have a similar structure with folders named