Getting a View on AWS Activity via CloudTrail and ElasticSearch and Kibana
For a long time I’ve been telling people “you can just analyse CloudTrail with ElasticSearch” or similar, but I’d never tried to do it myself. When I went to find resources online I found a ton of really old code, old blog posts, etc. None of it is up to date, though much of it mostly works.
The process is basically:
- Get CloudTrail turned on.
- Get CloudTrail events written to an S3 bucket.
- Get an ElasticSearch cluster running.
- Get a Lambda function that takes S3 objects (the CloudTrail records) and indexes them into ElasticSearch.
- Set up an S3 Event to call your Lambda function every time a CloudTrail object is delivered to your bucket.
- Wait for some events to flow in.
- Get into Kibana and set up some visualisations.
Tip of the Iceberg
If you’re doing this for security purposes, you’re really only getting started. The right way to secure the cloud is to analyse this data and look for bad things you want to respond to. Then you automatically respond. You don’t create a dashboard and have humans look at it with their eyeballs. Dashboards are for investigating problems, but not for detecting problems.
Once you’ve got CloudTrail reliably inserted into your ElasticSearch cluster, start looking at stuff like Traildash2 or AWS cloudwatch logs subscription consumer. Both are very old, but they contain some useful widgets for visualisations.
Prerequisites
- Access to AWS Lambda.
- CloudTrail already set to store logs in an S3 bucket. Don't have that? Follow this documentation.
- An ElasticSearch cluster. Either build it yourself, or follow the tutorial in the documentation.
Once you’ve got CloudTrail logging to S3 and your ElasticSearch cluster accepting input, you’re ready to do this work. This will set up an S3 event listener. When an object is put in S3, this code gets called. This code unzips the S3 object and squirts it into ElasticSearch.
Set Up IAM permissions
You need to create some policies and attach them to a role. That will give your Lambda function the ability to execute. The easiest way to do this is to create a single IAM policy that grants all the permissions you need, and then attach that one policy to the Lambda role. (You could instead have a few policies, one for ElasticSearch, one for S3, one for CloudWatch Logs, and attach all three to the one role.)
The IAM policy allows three things: reading your S3 bucket to get the CloudTrail objects, posting records to your ElasticSearch cluster, and writing errors and other logging to CloudWatch Logs.
- Edit the policy document: add in the bucket name for your bucket, and add in the domain name you assigned to your ElasticSearch domain.
- Create an IAM policy, name it something like `CloudTrailESAccess`, and set its contents to be the edited policy document.
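The resulting policy might look something like this sketch. The bucket name, account ID, region, and domain name are placeholders you must replace, and the exact set of actions your function needs may differ slightly from this:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadCloudTrailBucket",
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::MYCLOUDTRAILBUCKET/*"
    },
    {
      "Sid": "PostToElasticSearch",
      "Effect": "Allow",
      "Action": ["es:ESHttpPost"],
      "Resource": "arn:aws:es:eu-west-1:111111111111:domain/MYDOMAIN/*"
    },
    {
      "Sid": "WriteLambdaLogs",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}
```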
Create a Role For your Lambda Function
Create an AWS IAM role:
1. Choose Lambda as the service for the role.
2. Attach the policy you created.
Prep the Lambda Function
- Clone this repo.
- On your terminal, install the requests module into the same folder with `pip install requests -t .`
- Make a zip file that includes the Python file and the requests library. From within the repo, run `zip -r lambda-function.zip cloudtrail2ES.py *`
Create the Lambda Function
These are instructions for doing it by hand in the console. There’s also a cloudtrail2ES.yaml file that contains some CloudFormation.
- Create a new Lambda function.
- Choose Python 3.6 as the runtime.
- Choose to start with an empty function.
- Set the handler for the function to the handler in `cloudtrail2ES.py`.
- Set the environment variables:
  - `ES_INDEX` set to something like `cloudtrail`. That will result in ElasticSearch indexes like `cloudtrail-YYYY-MM-DD`.
  - `ES_HOST` set to the name of your ElasticSearch endpoint. You can get that from the console or by running `aws es describe-elasticsearch-domain`. The value will look something like `search-DOMAIN-abcdefgabcdefg.eu-west-1.es.amazonaws.com`. You don't want anything else (no `https` or `/` or anything).
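The real code is in `cloudtrail2ES.py` in the repo. As a rough sketch of what such a handler does (the helper name here is mine, not necessarily the repo's): it reads the S3 object named in the event, gunzips it, pulls out the CloudTrail records, and posts them to ElasticSearch.

```python
import gzip
import json
import os
from datetime import date

def extract_records(gzipped_bytes):
    """Gunzip a CloudTrail log object and return its list of event records."""
    body = json.loads(gzip.decompress(gzipped_bytes))
    return body.get("Records", [])

def lambda_handler(event, context):
    # boto3 ships with the Lambda runtime; requests is the module you
    # vendored into the zip with `pip install requests -t .`
    import boto3
    import requests

    es_host = os.environ["ES_HOST"]
    prefix = os.environ.get("ES_INDEX", "cloudtrail")
    index = "%s-%s" % (prefix, date.today().isoformat())

    s3 = boto3.client("s3")
    for rec in event["Records"]:
        bucket = rec["s3"]["bucket"]["name"]
        key = rec["s3"]["object"]["key"]
        obj = s3.get_object(Bucket=bucket, Key=key)
        for ct_event in extract_records(obj["Body"].read()):
            # One POST per event for clarity; the loader script described
            # later uses the bulk API instead.
            requests.post(
                "https://%s/%s/_doc" % (es_host, index),
                json=ct_event,
                headers={"Content-Type": "application/json"},
            )
```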
- Test the Lambda function.
- Edit the `test-cloudtrail-event.json` file and change the bucket name to be your CloudTrail bucket. Go find a real key ("file name") of a CloudTrail object in your bucket and put that in the object key field.
- Invoke the test and set your test event to be this JSON object. Debug any problems.
- If the test was successful, you have a few objects in your ElasticSearch cluster now.
- If the test was unsuccessful, go into the CloudWatch Logs of your lambda function and look at the error messages.
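For reference, an S3 put-event test payload has roughly this shape. The bucket name and key below are placeholders; your `test-cloudtrail-event.json` will differ in the details:

```json
{
  "Records": [
    {
      "eventSource": "aws:s3",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "MYCLOUDTRAILBUCKET" },
        "object": {
          "key": "AWSLogs/111111111111/CloudTrail/eu-west-1/2020/01/01/example.json.gz"
        }
      }
    }
  ]
}
```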
Launch the Lambda from S3 Events
- Go to your S3 Bucket in the console.
- Click on Properties
- Click on Events
- Click + Add Notification
- Name the event
- For Events, select “All object create events”
- For Prefix, enter an appropriate prefix. Probably `AWSLogs/`, since that's where CloudTrail writes by default.
- For Send to, select the lambda function you created
- Click Save.
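The same wiring can be done from code. This is a sketch using boto3; the bucket name and function ARN are placeholders, and note that S3 must also be granted permission to invoke your function (`aws lambda add-permission`) before the notification will save:

```python
# Placeholders: substitute your bucket and your Lambda function's ARN.
BUCKET = "MYCLOUDTRAILBUCKET"
FUNCTION_ARN = "arn:aws:lambda:eu-west-1:111111111111:function:cloudtrail2ES"

def build_notification_config(function_arn, prefix="AWSLogs/"):
    """Build an S3 notification configuration for all object-create events."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "Id": "cloudtrail-to-es",
                "LambdaFunctionArn": function_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}
                },
            }
        ]
    }

if __name__ == "__main__":
    import boto3

    s3 = boto3.client("s3")
    s3.put_bucket_notification_configuration(
        Bucket=BUCKET,
        NotificationConfiguration=build_notification_config(FUNCTION_ARN),
    )
```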
Seeing it Work
It will take time. CloudTrail logs are delivered every few minutes. The S3 events fire pretty much immediately. Then the records are sent to ElasticSearch.
- Click on the Kibana link in the ElasticSearch console.
- Click on the Management gear icon
- Click on + Create Index Pattern
- Name the index pattern `cloudtrail-*` (or whatever you used for the `ES_INDEX` value). You should see something helpful like "Your index pattern matches 2 indices".
- Choose the `@timestamp` field for your timestamp.
- Save the index pattern.
- Click on the Discover link at the top left. You should see some CloudTrail records.
If you look at your function in the Lambda console, you'll see a tab labeled Monitoring. It has a link to the function's logs in CloudWatch, where you can see what the Lambda function is doing.
You will want to click on the log group and set its retention time to something. By default, CloudWatch Logs are set to Never Expire and that will store every log entry forever. I set mine to 3 days. That’s probably generous. I really don’t need these logs at all.
If, like me, you turned on CloudTrail a long time ago, and now you're just starting to analyse it with ElasticSearch, you might want to import a lot of S3 objects from your CloudTrail bucket. There's a script `loadCloudTrail2ES.py` that will do that. You invoke it something like this:

```shell
python3 loadCloudTrail2ES.py \
  --bucket MYCLOUDTRAILBUCKET \
  --endpoint MYELASTICSEARCHDOMAIN \
  --region eu-west-1 \
  --prefix AWSLogs/111111111111/CloudTrail/
```
Note that it takes a few other optional arguments that you can use to test before you turn it loose:
- `--limit X` will stop after processing X S3 objects.
- `--dryrun` will cause it to fetch and parse the S3 objects, but it will not POST them to ElasticSearch.
- `--index` will name the index something different. By default it names the index `cloudtrail-YYYY-MM-DD`. If you give it `--index foo` on the command line, it will use `foo-YYYY-MM-DD` as the index instead.
- `--profile` will use the STS::AssumeRole feature to assume the role for invoking AWS API calls. See the named profiles documentation for more information on how that works.
- `--prefix` is optional. If you leave it out, it defaults to `AWSLogs/` with the assumption that you probably want to limit yourself to CloudTrail logs, and that's the prefix where they're written by default. If you want no prefix at all, so that the script parses every single object in the bucket, you need to specify an empty prefix.
The `loadCloudTrail2ES.py` script uses the bulk upload API of ElasticSearch. It does not stop to think about whether that would be a good idea: it batches up all the CloudTrail events in the S3 object and sends them to ElasticSearch in a single request. My S3 objects rarely have more than 100 CloudTrail events in them, and this always succeeds for me. But if you have a really active account and you have hundreds or thousands of events in a single S3 object, this might fail.
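For context, the ElasticSearch bulk API takes newline-delimited JSON: an action line, then the document, repeated for each event. A minimal sketch of building such a body (the function name is mine, not the script's):

```python
import json

def build_bulk_body(index, events):
    """Build a newline-delimited JSON body for the ElasticSearch _bulk API."""
    lines = []
    for event in events:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(event))
    # The bulk API requires a trailing newline.
    return "\n".join(lines) + "\n"
```

You would POST that to `https://ES_HOST/_bulk` with a `Content-Type: application/x-ndjson` header. If an object holds thousands of events, splitting them across several smaller bulk requests avoids the failure mode above.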
If you screw up, remember that `curl -X DELETE https://endpoint/cloudtrail-*` is your friend. You can delete ElasticSearch indexes fairly easily.
I really know very little about ElasticSearch and Kibana. But once you get them going, you can see an awful lot of interesting stuff.