Introduction:
The prevalence of high-entropy logs in distributed systems has significantly raised the risk of PII (Personally Identifiable Information) seeping into our logs, which can result in security and compliance issues. This two-part blog covers how to identify and manage this issue using the Elastic Stack. We will explore using NLP (Natural Language Processing) and pattern matching to detect, assess, and, where feasible, redact PII from logs that are being ingested into Elasticsearch.
In Part 1 of this blog, we will cover the following:
- Review the techniques and tools we have available to manage PII in our logs
- Understand the roles of NLP / NER in PII detection
- Build a composable processing pipeline to detect and assess PII
- Sample logs and run them through the NER Model
- Assess the results of the NER Model
In Part 2 of this blog, we will cover the following:
- Redact PII using NER and the redact processor
- Apply field-level security to control access to the un-redacted data
- Enhance the dashboards and alerts
- Production considerations and scaling
- How to run these processes on incoming or historical data
Here is the overall flow we will construct over the 2 blogs:
All code for this exercise can be found at: https://github.com/bvader/elastic-pii.
Tools and Techniques
There are four general capabilities that we will use for this exercise.
- Named Entity Recognition (NER) Detection
- Pattern Matching Detection
- Log Sampling
- Ingest Pipelines as Composable Processing
Named Entity Recognition (NER) Detection
NER is a sub-task of Natural Language Processing (NLP) that involves identifying and categorizing named entities in unstructured text into predefined categories such as:
- Person: Names of individuals, including celebrities, politicians, and historical figures.
- Organization: Names of companies, institutions, and organizations.
- Location: Geographic locations, including cities, countries, and landmarks.
- Event: Names of events, including conferences, meetings, and festivals.
For our PII use case, we will choose the base BERT NER model bert-base-NER, which can be downloaded from Hugging Face and loaded into Elasticsearch as a trained model.
Important Note: NER / NLP models are CPU-intensive and expensive to run at scale; thus, we will want to employ a sampling technique to understand the risk in our logs without sending the full log volume through the NER model. We will discuss the performance and scaling of the NER model in Part 2 of the blog.
Pattern Matching Detection
In addition to NER, regex pattern matching is a powerful tool for detecting and redacting PII based on common patterns. The Elasticsearch redact processor is built for this use case.
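To make the pattern-matching idea concrete, here is a minimal sketch in plain Python. It is not the redact processor itself (which is configured with Grok patterns inside an ingest pipeline); it only illustrates the principle of replacing text that matches a known pattern with a placeholder. The two patterns, the placeholder names, and the sample message are assumptions chosen for illustration.

```python
import re

# Hypothetical, deliberately simple patterns for illustration only;
# real-world PII patterns need more careful tuning.
PII_PATTERNS = {
    "EMAIL": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def redact_patterns(message: str) -> str:
    """Replace every match of each pattern with a <NAME> placeholder."""
    for name, pattern in PII_PATTERNS.items():
        message = pattern.sub(f"<{name}>", message)
    return message


sample = "User jane.doe@example.com submitted SSN 123-45-6789"
print(redact_patterns(sample))
# prints: User <EMAIL> submitted SSN <SSN>
```

Pattern matching works well for PII with a predictable shape, while NER is aimed at free-form entities such as names and locations.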
Log Sampling
Considering the performance implications of NER and the fact that we may be ingesting a large volume of logs into Elasticsearch, it makes sense to sample our incoming logs. We will build a simple log sampler to accomplish this.
Ingest Pipelines as Composable Processing
We will create several pipelines, each focusing on a specific capability and a main ingest pipeline to orchestrate the overall process.
Building the Processing Flow
Logs Sampling + Composable Ingest Pipelines
The first thing we will do is set up a sampler to sample our logs. This ingest pipeline simply takes a sampling rate between 0 (no logs) and 10000 (all logs), which allows a sampling rate as low as ~0.01%, and marks the sampled logs with sample.sampled: true.
The commands should be run from Kibana -> Dev Tools.
The code can be found here for the following three sections of code.
logs-sampler pipeline code
# logs-sampler pipeline - part 1
DELETE _ingest/pipeline/logs-sampler
PUT _ingest/pipeline/logs-sampler
{
"processors": [
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"if": "ctx.sample?.sample_rate == null",
"field": "sample.sample_rate",
"value": 10000
}
},
{
"set": {
"description": "Determine if keeping unsampled docs",
"if": "ctx.sample?.keep_unsampled == null",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"set": {
"field": "sample.sampled",
"value": false
}
},
{
"script": {
"source": """ Random r = new Random();
ctx.sample.random = r.nextInt(params.max); """,
"params": {
"max": 10000
}
}
},
{
"set": {
"description": "Mark as sampled when the random draw (0 to 9999) is below the sampling rate",
"if": "ctx.sample.random < ctx.sample.sample_rate",
"field": "sample.sampled",
"value": true
}
},
{
"drop": {
"description": "Drop unsampled document if applicable",
"if": "ctx.sample.keep_unsampled == false && ctx.sample.sampled == false"
}
}
]
}
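To build some intuition for the sampling logic, the following Python sketch simulates it outside of Elasticsearch. It is only an approximation of the pipeline: the function names, the document count, and the seed are assumptions made for this example. Note the strict less-than comparison: because the random draw falls in the range 0 to 9999, it makes a rate of 0 sample nothing and a rate of 10000 sample everything.

```python
import random

MAX_RATE = 10000  # same scale as the pipeline: 0 = no logs, 10000 = all logs


def is_sampled(sample_rate: int, rng: random.Random) -> bool:
    """Draw an integer in [0, MAX_RATE) and compare it to the sampling rate."""
    draw = rng.randrange(MAX_RATE)  # comparable to Random.nextInt(10000)
    return draw < sample_rate


def sampled_fraction(sample_rate: int, n_docs: int = 100_000, seed: int = 42) -> float:
    """Simulate n_docs documents and return the fraction marked as sampled."""
    rng = random.Random(seed)
    hits = sum(is_sampled(sample_rate, rng) for _ in range(n_docs))
    return hits / n_docs


for rate in (0, 1000, 10000):
    print(rate, sampled_fraction(rate))
```

A rate of 1000 should come out close to 10% of the simulated documents, which is the rate we will set in the process-pii pipeline.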
Now, let's test the logs sampler. We will build the first part of the composable pipeline. We will be sending logs to the logs-generic-default data stream. With that in mind, we will create the
Next, we will create the process-pii pipeline.
process-pii pipeline code
# Process PII pipeline - part 1
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing historical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
}
]
}
Finally, we create the logs@custom pipeline.
logs@custom pipeline code
# logs@custom pipeline - part 1
DELETE _ingest/pipeline/logs@custom
PUT _ingest/pipeline/logs@custom
{
"processors": [
{
"set": {
"field": "pipelinetoplevel",
"value": "logs@custom"
}
},
{
"set": {
"field": "pipelinetoplevelinfo",
"value": "{{{data_stream.dataset}}}"
}
},
{
"pipeline": {
"description" : "Call the process-pii pipeline on the correct dataset",
"if": "ctx?.data_stream?.dataset == 'pii'",
"name": "process-pii"
}
}
]
}
Now, let's test to see the sampling at work.
Load the data as described in the Data Loading Appendix. Let's use the sample data first; we will talk about how to test with your incoming or historical logs at the end of this blog.
If you look at Observability -> Logs -> Logs Explorer with KQL filter
At this point we have a composable ingest pipeline that is "sampling" logs. As a bonus, you can use this logs sampler for any other use cases you have as well.
Loading, Configuration, and Execution of the NER Pipeline
Loading the NER Model
You will need a Machine Learning node to run the NER model on. In this exercise, we are using Elastic Cloud Hosted Deployment on AWS with the CPU Optimized (ARM) architecture. The NER inference will run on a Machine Learning AWS c5d node. There will be GPU options in the future, but today, we will stick with CPU architecture.
This exercise will use a single c5d node with 8 GB RAM and 4.2 vCPU (up to 8.4 vCPU).
Please refer to the official documentation on how to import an NLP-trained model into Elasticsearch for complete instructions on uploading, configuring, and deploying the model.
The quickest way to get the model is using the Eland Docker method.
The following command will load the model into Elasticsearch but will not start it. We will do that in the next step.
docker run -it --rm --network host docker.elastic.co/eland/eland \
eland_import_hub_model \
--url https://mydeployment.es.us-west-1.aws.found.io:443/ \
-u elastic -p password \
--hub-model-id dslim/bert-base-NER --task-type ner
Deploy and Start the NER Model
In general, to improve ingest performance, increase throughput by adding more allocations to the deployment. For improved search speed, increase the number of threads per allocation.
To scale ingest, we will focus on scaling the allocations for the deployed model. More information on this topic is available here. The number of allocations must be less than the available allocated processors (cores, not vCPUs) per node.
To deploy and start the NER model, we will use the Start trained model deployment API.
We will configure the following:
- 4 Allocations to allow for more parallel ingestion
- 1 Thread per Allocation
- 0 Bytes Cache, as we expect a low cache hit rate
- 8192 Queue
# Start the model with 4 Allocations x 1 Thread, no cache, and 8192 queue
POST _ml/trained_models/dslim__bert-base-ner/deployment/_start?cache_size=0b&number_of_allocations=4&threads_per_allocation=1&queue_capacity=8192
You should get a response that looks something like this.
{
"assignment": {
"task_parameters": {
"model_id": "dslim__bert-base-ner",
"deployment_id": "dslim__bert-base-ner",
"model_bytes": 430974836,
"threads_per_allocation": 1,
"number_of_allocations": 4,
"queue_capacity": 8192,
"cache_size": "0",
"priority": "normal",
"per_deployment_memory_bytes": 430914596,
"per_allocation_memory_bytes": 629366952
},
...
"assignment_state": "started",
"start_time": "2024-09-23T21:39:18.476066615Z",
"max_assigned_allocations": 4
}
}
The NER model has been deployed and started and is ready to be used.
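If you expect to experiment with different allocation and queue settings, it can help to assemble the start request from named parameters. The sketch below only builds the request path shown earlier; it does not call Elasticsearch. The helper name start_deployment_path is hypothetical, introduced just for this example.

```python
from urllib.parse import urlencode


def start_deployment_path(model_id, number_of_allocations, threads_per_allocation,
                          cache_size="0b", queue_capacity=8192):
    """Build the path and query string for the Start trained model deployment API."""
    params = {
        "cache_size": cache_size,
        "number_of_allocations": number_of_allocations,
        "threads_per_allocation": threads_per_allocation,
        "queue_capacity": queue_capacity,
    }
    return f"_ml/trained_models/{model_id}/deployment/_start?{urlencode(params)}"


# Same settings as above: 4 allocations x 1 thread, no cache, 8192 queue
print("POST " + start_deployment_path("dslim__bert-base-ner", 4, 1))
```

Keeping the settings as named arguments makes it easier to compare runs when we look at scaling in Part 2.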
The following ingest pipeline implements the NER model via the inference processor.
There is a significant amount of code here, but only two items are of interest for now. The rest of the code is conditional logic to drive some additional specific behavior that we will look at more closely in the future.
- The inference processor calls the NER model we loaded previously by its ID and passes it the text to be analyzed for PII, which in this case is the message field mapped to the model's text_field input.
- The script processor loops through the array of ML predictions generated by the NER model, replaces each identified entity in the message string with a redacted placeholder constant, and stores the result in a new field, redact.message. This looks more complex than it really is. We will look at this a little closer in the following steps.
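For readers more comfortable with Python than Painless, the replacement loop can be sketched as follows. This is an illustrative equivalent of the script processor, not code that runs in Elasticsearch; the function name and the sample entities are assumptions, while the class_name, class_probability, and entity keys mirror the fields the script reads from the model output.

```python
def redact_with_ner(message, entities, skip_entity="NONE", minimum_score=0.0):
    """Replace each detected entity in the message with a placeholder constant."""
    redacted = message
    for item in entities:
        # Skip the excluded entity class and any prediction below the minimum score
        if item["class_name"] != skip_entity and item["class_probability"] >= minimum_score:
            placeholder = "<REDACTNER-" + item["class_name"] + "_NER>"
            redacted = redacted.replace(item["entity"], placeholder)
    return redacted


# Hypothetical model output for a sample message
entities = [
    {"entity": "Jane Doe", "class_name": "PER", "class_probability": 0.99},
    {"entity": "Denver", "class_name": "LOC", "class_probability": 0.42},
]
message = "Payment from Jane Doe in Denver failed"

print(redact_with_ner(message, entities))
# prints: Payment from <REDACTNER-PER_NER> in <REDACTNER-LOC_NER> failed
print(redact_with_ner(message, entities, minimum_score=0.9))
# prints: Payment from <REDACTNER-PER_NER> in Denver failed
```

Because the replacement is a plain string substitution, every occurrence of a detected entity in the message is replaced, not only the one at the position the model reported.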
The code can be found here for the following three sections of code.
The NER PII Pipeline
logs-ner-pii-processor pipeline code
# NER Pipeline
DELETE _ingest/pipeline/logs-ner-pii-processor
PUT _ingest/pipeline/logs-ner-pii-processor
{
"processors": [
{
"set": {
"description": "Set to true to actually redact, false will run processors but leave original",
"field": "redact.enable",
"value": true
}
},
{
"set": {
"description": "Set to true to keep ml results for debugging",
"field": "redact.ner.keep_result",
"value": true
}
},
{
"set": {
"description": "Set to PER, LOC, ORG to skip, or NONE to not drop any replacement",
"field": "redact.ner.skip_entity",
"value": "NONE"
}
},
{
"set": {
"description": "Minimum class_probability score (0 to 1) an entity needs to be redacted, 0 redacts all entities",
"field": "redact.ner.minimum_score",
"value": 0
}
},
{
"set": {
"if": "ctx?.redact?.message == null",
"field": "redact.message",
"copy_from": "message"
}
},
{
"set": {
"field": "redact.ner.successful",
"value": true
}
},
{
"set": {
"field": "redact.ner.found",
"value": false
}
},
{
"inference": {
"model_id": "dslim__bert-base-ner",
"field_map": {
"message": "text_field"
},
"on_failure": [
{
"set": {
"description": "Flag that the NER inference failed",
"field": "failure",
"value": "REDACT_NER_FAILED"
}
},
{
"set": {
"field": "redact.ner.successful",
"value": false
}
}
]
}
},
{
"script": {
"if": "ctx.failure != 'REDACT_NER_FAILED'",
"lang": "painless",
"source": """String msg = ctx['message'];
for (item in ctx['ml']['inference']['entities']) {
if ((item['class_name'] != ctx.redact.ner.skip_entity) &&
(item['class_probability'] >= ctx.redact.ner.minimum_score)) {
msg = msg.replace(item['entity'], '<' +
'REDACTNER-'+ item['class_name'] + '_NER>')
}
}
ctx.redact.message = msg""",
"on_failure": [
{
"set": {
"description": "Flag that the replacement script failed",
"field": "failure",
"value": "REDACT_REPLACEMENT_SCRIPT_FAILED",
"override": false
}
},
{
"set": {
"field": "redact.ner.successful",
"value": false
}
}
]
}
},
{
"set": {
"if": "ctx.ml?.inference?.entities != null && ctx.ml.inference.entities.size() > 0",
"field": "redact.ner.found",
"value": true,
"ignore_failure": true
}
},
{
"set": {
"if": "ctx?.redact?.pii?.found == null",
"field": "redact.pii.found",
"value": false
}
},
{
"set": {
"if": "ctx?.redact?.ner?.found == true",
"field": "redact.pii.found",
"value": true
}
},
{
"remove": {
"if": "ctx.redact.ner.keep_result != true",
"field": [
"ml"
],
"ignore_missing": true,
"ignore_failure": true
}
}
],
"on_failure": [
{
"set": {
"field": "failure",
"value": "GENERAL_FAILURE",
"override": false
}
}
]
}
The updated PII Processor Pipeline, which now calls the NER Pipeline
process-pii pipeline code
# Updated Process PII pipeline that now calls the NER pipeline
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing historical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-ner-pii-processor"
}
}
]
}
Now reload the data as described in Reloading the logs.
Results
Let's take a look at the results with the NER processing in place. In the Logs Explorer with KQL query bar, execute the following query
Logs Explorer should look something like this. Open the top message to see the details.
NER Model Results
Let's take a closer look at what these fields mean.
Field: ml.inference.entities.class_name
Sample Value:
Description: An array of the named entity classes that the NER model has identified.
Field: ml.inference.entities.class_probability
Sample Value:
Description: The class_probability is a value between 0 and 1, which indicates how likely it is that a given data point belongs to a certain class. The higher the number, the higher the probability that the data point belongs to the named class. This is important because in the next blog we can decide on a threshold to use for alerting and redacting. You can see in this example it identified a
Field: ml.inference.entities.entity
Sample Value:
Description: The array of entities identified, which aligns positionally with the class_name and class_probability arrays.
Field: ml.inference.predicted_value
Sample Value:
Description: The predicted value of the model.
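To see how a class_probability threshold changes what counts as detected PII, the sketch below counts entities per class at or above a chosen score. The function name, the sample entities, and their scores are hypothetical; only the class_name and class_probability keys come from the model output described above.

```python
from collections import Counter


def summarize_entities(entities, minimum_score=0.0):
    """Count detected entities per class, keeping only scores >= minimum_score."""
    counts = Counter()
    for item in entities:
        if item["class_probability"] >= minimum_score:
            counts[item["class_name"]] += 1
    return dict(counts)


# Hypothetical entities as they might appear under ml.inference.entities
detected = [
    {"entity": "Jane Doe", "class_name": "PER", "class_probability": 0.99},
    {"entity": "Denver", "class_name": "LOC", "class_probability": 0.42},
    {"entity": "Acme Corp", "class_name": "ORG", "class_probability": 0.95},
]

print(summarize_entities(detected))                     # all three classes counted
print(summarize_entities(detected, minimum_score=0.9))  # the low-scoring LOC is dropped
```

A higher threshold reduces false positives at the cost of missing lower-confidence entities, which is the trade-off we will weigh when choosing a threshold in Part 2.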
PII Assessment Dashboard
Let's take a quick look at a dashboard built to assess the PII in the data.
To load the dashboard, go to Kibana -> Stack Management -> Saved Objects and import the
https://github.com/bvader/elastic-pii/blob/main/elastic/blog-part-1/pii-dashboard-part-1.ndjson
More complete instructions on Kibana Saved Objects can be found here.
After loading the dashboard, navigate to it and select the right time range, and you should see something like the below. It shows metrics such as sample rate, percent of logs with NER, NER score trends, etc. We will examine the assessment and actions in Part 2 of this blog.
Summary and Next Steps
In this first part of the blog, we have accomplished the following.
- Reviewed the techniques and tools we have available for PII detection and assessment
- Reviewed NLP / NER role in PII detection and assessment
- Built the necessary composable ingest pipelines to sample logs and run them through the NER Model
- Reviewed the NER results and are ready to move to the second blog
In the upcoming Part 2 of this blog, we will cover the following:
- Redact PII using NER and the redact processor
- Apply field-level security to control access to the un-redacted data
- Enhance the dashboards and alerts
- Production considerations and scaling
- How to run these processes on incoming or historical data
Data Loading Appendix
Code
The data loading code can be found here:
https://github.com/bvader/elastic-pii
$ git clone https://github.com/bvader/elastic-pii.git
Creating and Loading the Sample Data Set
$ cd elastic-pii
$ cd python
$ python -m venv .env
$ source .env/bin/activate
$ pip install elasticsearch
$ pip install Faker
Run the log generator
$ python generate_random_logs.py
If you do not change any parameters, this will create 10000 random logs in a file named pii.log, with a mix of logs that contain and do not contain PII.
Edit
# The Elastic User
ELASTIC_USER = "elastic"
# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = "askdjfhasldfkjhasdf"
# Found in the 'Manage Deployment' page
ELASTIC_CLOUD_ID = "deployment:sadfjhasfdlkjsdhf3VuZC5pbzo0NDMkYjA0NmQ0YjFiYzg5NDM3ZDgxM2YxM2RhZjQ3OGE3MzIkZGJmNTE0OGEwODEzNGEwN2E3M2YwYjcyZjljYTliZWQ="
Then run the following command.
$ python load_logs.py
Reloading the logs
Note: To reload the logs, you can simply re-run the above command. You can run the command multiple times during this exercise and the logs will be reloaded (actually loaded again). The new logs will not collide with previous runs as there will be a unique
$ python load_logs.py