Introduction:
The prevalence of high-entropy logs in distributed systems has significantly raised the risk of PII (Personally Identifiable Information) seeping into our logs, which can result in security and compliance issues. This 2-part blog delves into the crucial task of identifying and managing this issue using the Elastic Stack. We will explore using NLP (Natural Language Processing) and Pattern matching to detect, assess, and, where feasible, redact PII from logs being ingested into Elasticsearch.
In Part 1 of this blog, we covered the following:
- Review the techniques and tools we have available to manage PII in our logs
- Understand the roles of NLP / NER in PII detection
- Build a composable processing pipeline to detect and assess PII
- Sample logs and run them through the NER Model
- Assess the results of the NER Model
In Part 2 of this blog, we will cover the following:
- Apply the redactregex pattern processor and assess the results
- Create Alerts using ESQL
- Apply field-level security to control access to the un-redacted data
- Production considerations and scaling
- How to run these processes on incoming or historical data
Reminder of the overall flow we will construct over the 2 blogs:
All code for this exercise can be found at: https://github.com/bvader/elastic-pii.
Part 1 Prerequisites
This blog picks up where Part 1 of this blog left off. You must have the NER model, ingest pipelines, and dashboard from Part 1 installed and working.
- Loaded and configured NER Model
- Installed all the composable ingest pipelines from Part 1 of the blog
- Installed dashboard
You can access the complete solution for Blog 1 here. Don't forget to load the dashboard, found here.
Applying the Redact Processor
Next, we will apply the
Elasticsearch comes packaged with a number of useful predefined patterns that can be conveniently referenced by the
In the code below, we leveraged some of the predefined patterns as well as constructing several custom patterns.
"patterns": [
"%{EMAILADDRESS:EMAIL_REGEX}", << Predefined
"%{IP:IP_ADDRESS_REGEX}", << Predefined
"%{CREDIT_CARD:CREDIT_CARD_REGEX}", << Custom
"%{SSN:SSN_REGEX}", << Custom
"%{PHONE:PHONE_REGEX}" << Custom
]
We also replaced the PII with easily identifiable patterns we can use for assessment.
In addition, it is important to note that since the redact processor is a simple regex find and replace, it can be used against many "secrets" patterns, not just PII. There are many references for regex and secrets patterns, so you can reuse this capability to detect secrets in your logs.
The code can be found here for the following two sections of code.
redact processor pipeline code - click to open/close
# Add the PII redact processor pipeline
DELETE _ingest/pipeline/logs-pii-redact-processor
PUT _ingest/pipeline/logs-pii-redact-processor
{
"processors": [
{
"set": {
"field": "redact.proc.successful",
"value": true
}
},
{
"set": {
"field": "redact.proc.found",
"value": false
}
},
{
"set": {
"if": "ctx?.redact?.message == null",
"field": "redact.message",
"copy_from": "message"
}
},
{
"redact": {
"field": "redact.message",
"prefix": "<REDACTPROC-",
"suffix": ">",
"patterns": [
"%{EMAILADDRESS:EMAIL_REGEX}",
"%{IP:IP_ADDRESS_REGEX}",
"%{CREDIT_CARD:CREDIT_CARD_REGEX}",
"%{SSN:SSN_REGEX}",
"%{PHONE:PHONE_REGEX}"
],
"pattern_definitions": {
"CREDIT_CARD": """\d{4}[ -]\d{4}[ -]\d{4}[ -]\d{4}""",
"SSN": """\d{3}-\d{2}-\d{4}""",
"PHONE": """(\+\d{1,2}\s?)?1?\-?\.?\s?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}"""
},
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "failure",
"value": "REDACT_PROCESSOR_FAILED",
"override": false
}
},
{
"set": {
"field": "redact.proc.successful",
"value": false
}
}
]
}
},
{
"set": {
"if": "ctx?.redact?.message.contains('REDACTPROC')",
"field": "redact.proc.found",
"value": true
}
},
{
"set": {
"if": "ctx?.redact?.pii?.found == null",
"field": "redact.pii.found",
"value": false
}
},
{
"set": {
"if": "ctx?.redact?.proc?.found == true",
"field": "redact.pii.found",
"value": true
}
}
],
"on_failure": [
{
"set": {
"field": "failure",
"value": "GENERAL_FAILURE",
"override": false
}
}
]
}
And now, we will add the
redact processor pipeline code - click to open/close
# Updated Process PII pipeline that now call the NER and Redact Processor pipeline
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-ner-pii-processor"
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-pii-redact-processor"
}
}
]
}
Reload the data as described in the Reloading the logs. If you have not generated the logs the first time, follow the instructions in the Data Loading Appendix
Go to Discover and enter the following into the KQL bar
And if you did not load the dashboard from Blog Part 1 at already, load it, it can be found here using the Kibana -> Stack Management -> Saved Objects -> Import.
It should look something like this now. Note that the REGEX portions of the dashboard are now active.
Checkpoint
At this point, we have the following capabilities:
- Ability to sample incoming logs and apply this PII redaction
- Detect and Assess PII with the NER/NLP and Pattern Matching
- Assess the amount, type and quality of the PII detections
This is a great point to stop if you are just running all this once to see how it works, but we have a few more steps to make this useful in production systems.
- Clean up the working and unredacted data
- Update the Dashboard to work with the cleaned-up data
- Apply Role Based Access Control to protect the raw unredacted data
- Create Alerts
- Production and Scaling Considerations
- How to run these processes on incoming or historical data
Applying to Production Systems
Cleanup working data and update the dashboard
And now we will add the cleanup code to the overall
In short, we set a flag
NOTE: Of course you can change this behavior if you want to completely delete the unredacted data. In this exercise we will keep it and protect it.
In addition we set
These fields allow a lot of control over what data you decide to keep and analyze.
The code can be found here for the following two sections of code.
redact processor pipeline code - click to open/close
# Updated Process PII pipeline that now call the NER and Redact Processor pipeline and cleans up
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-ner-pii-processor"
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-pii-redact-processor"
}
},
{
"set": {
"description": "Set to true to actually redact, false will run processors but leave original",
"field": "redact.enable",
"value": true
}
},
{
"rename": {
"if": "ctx?.redact?.pii?.found == true && ctx?.redact?.enable == true",
"field": "message",
"target_field": "raw.message"
}
},
{
"rename": {
"if": "ctx?.redact?.pii?.found == true && ctx?.redact?.enable == true",
"field": "redact.message",
"target_field": "message"
}
},
{
"set": {
"description": "Set to true to actually to clean up working data",
"field": "redact.cleanup",
"value": true
}
},
{
"remove": {
"if": "ctx?.redact?.cleanup == true",
"field": [
"ml"
],
"ignore_failure": true
}
}
]
}
Reload the data as described here in the Reloading the logs.
Go to Discover and enter the following into the KQL bar
You should see something like this
We have everything we need to move forward with protecting the PII and Alerting on it.
Load up the new dashboard that works on the cleaned-up data
To load the dashboard, go to Kibana -> Stack Management -> Saved Objects and import the
The new dashboard should look like this. Note: It uses different fields under the covers since we have cleaned up the underlying data.
You should see something like this
Apply Role Based Access Control to protect the raw unredacted data
Elasticsearch supports role-based access control, including field and document level access control natively; it dramatically reduces the operational and maintenance complexity required to secure our application.
We will create a Role that does not allow access to the
NOTE: Since we only sampled 10% of the data in this exercise the non-sampled
The code can be found here for the following section of code.
RBAC protect-pii role and user code - click to open/close
# Create role with no access to the raw.message field
GET _security/role/protect-pii
DELETE _security/role/protect-pii
PUT _security/role/protect-pii
{
"cluster": [],
"indices": [
{
"names": [
"logs-*"
],
"privileges": [
"read",
"view_index_metadata"
],
"field_security": {
"grant": [
"*"
],
"except": [
"raw.message"
]
},
"allow_restricted_indices": false
}
],
"applications": [
{
"application": "kibana-.kibana",
"privileges": [
"all"
],
"resources": [
"*"
]
}
],
"run_as": [],
"metadata": {},
"transient_metadata": {
"enabled": true
}
}
# Create user stephen with protect-pii role
GET _security/user/stephen
DELETE /_security/user/stephen
POST /_security/user/stephen
{
"password" : "mypassword",
"roles" : [ "protect-pii" ],
"full_name" : "Stephen Brown"
}
Now log into a separate window with the new user
You should see something like this
Create an Alert when PII Detected
Now, with the processing of the pipelines, creating an alert when PII is detected is easy. To review Alerting in Kibana in detail if needed
NOTE: Reload the data if needed to have recent data.
First, we will create a simple ES|QL query in Discover.
FROM logs-pii-default
| WHERE redact.pii.found == true
| STATS pii_count = count(*)
| WHERE pii_count > 0
When you run this you should see something like this.
Now click the Alerts menu and select
Select a time field: @timestamp Set the time window: 5 minutes
Assuming you loaded the data recently when you run Test it should do something like
pii_count :
Add an action when the alert is Active.
For each alert:
Elasticsearch query rule {{rule.name}} is active:
- PII Found: true
- PII Count: {{#context.hits}} {{_source.pii_count}}{{/context.hits}}
- Conditions Met: {{context.conditions}} over {{rule.params.timeWindowSize}}{{rule.params.timeWindowUnit}}
- Timestamp: {{context.date}}
- Link: {{context.link}}
Add an Action for when the Alert is Recovered.
For each alert:
Elasticsearch query rule {{rule.name}} is Recovered:
- PII Found: false
- Conditions Not Met: {{context.conditions}} over {{rule.params.timeWindowSize}}{{rule.params.timeWindowUnit}}
- Timestamp: {{context.date}}
- Link: {{context.link}}
When all setup it should look like this and
You should get an Active alert that looks like this if you have recent data. I sent mine to Slack.
Elasticsearch query rule pii-found-esql is active:
- PII Found: true
- PII Count: 374
- Conditions Met: Query matched documents over 5m
- Timestamp: 2024-10-15T02:44:52.795Z
- Link: https://mydeployment123.aws.found.io:9243/app/management/insightsAndAlerting/triggersActions/rule/7d6faecf-964e-46da-aaba-8a2f89f33989
And then if you wait you will get a Recovered alert that looks like this.
Elasticsearch query rule pii-found-esql is Recovered:
- PII Found: false
- Conditions Not Met: Query did NOT match documents over 5m
- Timestamp: 2024-10-15T02:49:04.815Z
- Link: https://mydeployment123.kb.us-west-1.aws.found.io:9243/app/management/insightsAndAlerting/triggersActions/rule/7d6faecf-964e-46da-aaba-8a2f89f33989
Production Scaling
NER Scaling
As we mentioned Part 1 of this blog of this blog, NER / NLP Models are CPU-intensive and expensive to run at scale; thus, we employed a sampling technique to understand the risk in our logs without sending the full logs volume through the NER Model.
Please review the setup and configuration of the NER model from Part 1 of the blog.
We chose the base BERT NER model bert-base-NER for our PII case.
To scale ingest, we will focus on scaling the allocations for the deployed model. More information on this topic is available here. The number of allocations must be less than the available allocated processors (cores, not vCPUs) per node.
The metrics below are related to the model and configuration from Part 1 of the blog.
- 4 Allocations to allow for more parallel ingestion
- 1 Thread per Allocation
- 0 Byes Cache, as we expect a low cache hit rate Note If there are many repeated logs, cache can help, but with timestamps and other variations, cache will not help and can even slow down the process
- 8192 Queue
GET _ml/trained_models/dslim__bert-base-ner/_stats
.....
"node": {
"0m4tq7tMRC2H5p5eeZoQig": {
.....
"attributes": {
"xpack.installed": "true",
"region": "us-west-1",
"ml.allocated_processors": "5", << HERE
.....
},
"inference_count": 5040,
"average_inference_time_ms": 138.44285714285715, << HERE
"average_inference_time_ms_excluding_cache_hits": 138.44285714285715,
"inference_cache_hit_count": 0,
.....
"threads_per_allocation": 1,
"number_of_allocations": 4, <<< HERE
"peak_throughput_per_minute": 1550,
"throughput_last_minute": 1373,
"average_inference_time_ms_last_minute": 137.55280407865988,
"inference_cache_hit_count_last_minute": 0
}
]
}
}
There are 3 key pieces of information above:
-
"ml.allocated_processors": "5"The number of physical cores / processors available
-
"number_of_allocations": 4The number of allocations which is maximum 1 per physical core. Note: we could have used 5 allocations, but we only allocated 4 for this exercise
-
"average_inference_time_ms": 138.44285714285715The averages inference time per document.
The math is pretty straightforward for throughput for Inferences per Min (IPM) per allocation (1 allocation per physical core), since an inference uses a single core and a single thread.
Then the Inferences per Min per Allocation is simply:
When then lines up with the Total Inferences per Minute
Suppose we want to do 10,000 IPMs, how many allocations (cores) would I need?
Or perhaps logs are coming in at 5000 EPS and you want to do 1% Sampling.
Then
Want Faster! Turns out there is a more lightweight NER Model distilbert-NER model that is faster, but the tradeoff is a little less accuracy.
Running the logs through this model results in an inference time nearly twice as fast!
Here is some quick math:
Suppose we want to do 25,000 IPMs, how many allocations (cores) would I need?
Now you can apply this math to determine the correct sampling and NER scaling to support your logging use case.
Redact Processor Scaling
In short, the
Assessing incoming logs
If you want to test on incoming logs data in a data stream. All you need to do is change the conditional in the
Note: Just make sure that you have accounted for the proper scaling for the NER and Redact processors they were described above in Production Scaling
{
"pipeline": {
"description" : "Call the process_pii pipeline on the correct dataset",
"if": "ctx?.data_stream?.dataset == 'pii'", <<< HERE
"name": "process-pii"
}
}
So if for example your logs are coming into
"if": "ctx?.data_stream?.dataset == 'mycustomapp'",
Assessing historical data
If you have a historical (already ingested) data stream or index you can run the assessment over them using the
Note: Just make sure that you have accounted for the proper scaling for the NER and Redact processors they were described above in Production Scaling
There are a couple of extra steps: The code can be found here.
- First we can set the parameters to ONLY keep the sampled data as there is no reason to make a copy of all the unsampled data. In the process-piipipeline, there is a settingsample.keep_unsampled, which we can set tofalse, which will then only keep the sampled data
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": false <<< SET TO false
}
},
- Second, we will create a pipeline that will reroute the data to the correct data stream to run through all the PII assessment/detection pipelines. It also sets the correct datasetandnamespace
DELETE _ingest/pipeline/sendtopii
PUT _ingest/pipeline/sendtopii
{
"processors": [
{
"set": {
"field": "data_stream.dataset",
"value": "pii"
}
},
{
"set": {
"field": "data_stream.namespace",
"value": "default"
}
},
{
"reroute" :
{
"dataset" : "{{data_stream.dataset}}",
"namespace": "{{data_stream.namespace}}"
}
}
]
}
- Finally, we can run a _reindexto select the data we want to test/assess. It is recommended to review the _reindex documents before trying this. First, select the source data stream you want to assess, in this example, it is thelogs-generic-defaultlogs data stream. Note: I also added arangefilter to select a specific time range. There is a bit of a "trick" that we need to use since we are re-routing the data to the data streamlogs-pii-default. To do this, we just set"index": "logs-tmp-default"in the_reindexas the correct data stream will be set in the pipeline. We must do that becausererouteis anoopif it is called from/to the same datastream.
POST _reindex?wait_for_completion=false
{
"source": {
"index": "logs-generic-default",
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-1h/h",
"lt": "now"
}
}
}
]
}
}
},
"dest": {
"op_type": "create",
"index": "logs-tmp-default",
"pipeline": "sendtopii"
}
}
Summary
At this point, you have the tools and processes need to assess, detect, analyze, alert and protect PII in your logs.
The end state solution can be found here:.
In Part 1 of this blog, we accomplished the following.
- Reviewed the techniques and tools we have available for PII detection and assessment
- Reviewed NLP / NER role in PII detection and assessment
- Built the necessary composable ingest pipelines to sample logs and run them through the NER Model
- Reviewed the NER results and are ready to move to the second blog
In Part 2 of this blog, we covered the following:
- Redact PII using NER and redact processor
- Apply field-level security to control access to the un-redacted data
- Enhance the dashboards and alerts
- Production considerations and scaling
- How to run these processes on incoming or historical data
So get to work and reduce risk in your logs!
Data Loading Appendix
Code
The data loading code can be found here:
https://github.com/bvader/elastic-pii
$ git clone https://github.com/bvader/elastic-pii.git
Creating and Loading the Sample Data Set
$ cd elastic-pii
$ cd python
$ python -m venv .env
$ source .env/bin/activate
$ pip install elasticsearch
$ pip install Faker
Run the log generator
$ python generate_random_logs.py
If you do not changes any parameters, this will create 10000 random logs in a file named pii.log with a mix of logs that containe and do not contain PII.
Edit
# The Elastic User
ELASTIC_USER = "elastic"
# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = "askdjfhasldfkjhasdf"
# Found in the 'Manage Deployment' page
ELASTIC_CLOUD_ID = "deployment:sadfjhasfdlkjsdhf3VuZC5pbzo0NDMkYjA0NmQ0YjFiYzg5NDM3ZDgxM2YxM2RhZjQ3OGE3MzIkZGJmNTE0OGEwODEzNGEwN2E3M2YwYjcyZjljYTliZWQ="
Then run the following command.
$ python load_logs.py
Reloading the logs
Note To reload the logs, you can simply re-run the above command. You can run the command multiple time during this exercise and the logs will be reloaded (actually loaded again). The new logs will not collide with previous runs as there will be a unique
$ python load_logs.py