BigSnarf blog

Infosec FTW

Cloudtrail Log Analyzer Python3 Async Await

#!/usr/bin/env python3
# coding: utf-8
import gzip
import json
from pprint import pprint
import pandas as pd
from pandas import json_normalize
import sys
import socket
import boto
import os
import ipaddress
import asyncio
import glob
# Paths of every gzipped JSON log file found in the local CloudTrail log
# directory; each one is analysed by the driver loop at the bottom of the file.
hdd_files = glob.glob("/Users/bigsnarfdude/cloudtrail_logs/*.json.gz")
# eventName values that every log file is searched for.
security_events = ['CreateKeyPair', 'CheckMfa']
class CloudtrailAnalysis():
    """Lookup helpers for a DataFrame of flattened CloudTrail records."""

    @staticmethod
    def check_value(df_data, value):
        """Return details of the first record whose ``eventName`` equals *value*.

        Args:
            df_data: DataFrame of flattened log records. The lookup reads the
                columns ``eventName``, ``eventTime``,
                ``userIdentity.userName`` and ``awsRegion``.
            value: Event name to search for.

        Returns:
            A ``(value, eventTime, userName, awsRegion)`` tuple built from the
            first matching row, or ``None`` when no row matches.
        """
        # A frame built from an empty record list has no columns at all, so
        # there is nothing to filter on.
        if 'eventName' not in df_data.columns:
            return None

        # Filter once and reuse the result.
        frame = df_data[df_data['eventName'] == value]

        # The emptiness test used to be un-negated: it indexed row 0 of an
        # empty frame (IndexError) and ignored every real match.
        if frame.empty:
            return None

        return (
            value,
            frame['eventTime'].values[0],
            frame['userIdentity.userName'].values[0],
            frame['awsRegion'].values[0],
        )
async def get_file_analyse_local_events(f, event):
    """Load one gzipped JSON log file and print the record matching *event*.

    Args:
        f: Path of a gzip-compressed JSON document whose top-level object has
            a ``Records`` list.
        event: ``eventName`` value to look for in those records.

    Returns:
        None. The match reported by ``CloudtrailAnalysis.check_value`` is
        printed when there is one; nothing is printed otherwise.

    Note:
        The body performs blocking file I/O and never awaits, so calls run
        sequentially even though this is a coroutine.
    """
    #print("+++ Found new log: ", f)
    # NOTE(review): the open/read expressions were truncated in the source
    # ('with, "rb") as f:' and 'json.loads("ascii"))'). They are reconstructed
    # here as gzip.open + read/decode from the surviving "rb" mode and the
    # file's gzip/json imports -- confirm against the original script.
    # A separate handle name avoids shadowing the path parameter `f`.
    with gzip.open(f, "rb") as log_file:
        # JSON text is UTF-8; decoding as ASCII would raise on any
        # non-ASCII byte in a record.
        d = json.loads(log_file.read().decode("utf-8"))
    records = d["Records"]
    df_data = pd.json_normalize(records)

    # Look the event up once, and print only when something was found; the
    # previous `== None` test printed exclusively the literal None.
    match = CloudtrailAnalysis.check_value(df_data, event)
    if match is not None:
        print(match)
async def main(f, event):
    """Coroutine entry point: analyse log file *f* for the event name *event*."""
    pending_analysis = get_file_analyse_local_events(f, event)
    await pending_analysis
# process async
# Create the event loop explicitly: asyncio.get_event_loop() is deprecated when
# no loop is running and fails on newer Python versions. Close the loop once
# all work is done so its resources are released.
loop = asyncio.new_event_loop()
try:
    for f in hdd_files:
        for event in security_events:
            # Each (file, event) pair runs to completion before the next starts.
            loop.run_until_complete(main(f, event))
finally:
    loop.close()

view raw
hosted with ❤ by GitHub

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: