Parsing Amazon CloudTrail logs for a specific API call using Spark

Recently I needed to parse Amazon Web Services CloudTrail log files to check for a specific API call.  The call was being deprecated by Amazon, and we needed to upgrade our code to match the latest CLI (aws).  The logs I got from the IT folks were JSON files, and each file contained one single line anywhere from 0.5 MB to 1 MB long! That one line held an array of JSON objects, like the sample below.

Problem:  Identify the application making a specific API call to a service.   Not much other information was available; all we had was a bunch of CloudTrail logs and the deprecated AWS API call (DescribeJobFlows).
See http://docs.aws.amazon.com/ElasticMapReduce/latest/API/API_DescribeJobFlows.html, which states: "DescribeJobFlows: This API is deprecated and will eventually be removed. We recommend you use..."

{"Records": [ {"eventVersion":"1.02","userIdentity":{"type":"IAMUser","principalId":"AIDAIVWSMF.........","arn":"arn:aws:iam::144..........:user/staging-user","accountId":"14    4702935796","accessKeyId":".............","userName":"............."},"eventTime":"2016-.....T13:50:16Z","eventSource":"datapipeline.amazonaws.com","eventName":"Report    TaskProgress","awsRegion":"us-east-1","sourceIPAddress":"54.2.......","userAgent":"aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23    .25-b01/1.6.0_33","requestParameters":{"taskId":"37kSdZbBpwRjSng--/1Tms+pYiDGRURfXkPWl90Azl893QqHry1pryPTkUzLed5u1O2KIK9qh8pM1fk8fPBHzGbHDWhk0VTWFru3wJnanhJSuYLkEZ0fSmC...."},"responseElements":{"canceled":false},"requestID":"65ac3d1a-476e-11e6-a25c-f339ebc3012a","eventID":"4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69","eventType":"AwsApiC    all","recipientAccountId":"144........"},....

It is actually an array of JSON objects; for more info on the log format see http://docs.aws.amazon.com/awscloudtrail/latest/userguide/view-cloudtrail-events-cli.html

Even before developing any code I wanted to test a couple of ideas.  The following is one of the ways I was able to identify the offending application and host(s).  After the successful evaluation, I wrote a Python script to upload the hundreds of files to a new S3 bucket, performing a minor transformation along the way so the data was suitable for loading into Spark (a rough sketch of the upload step follows).
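For reference, here is a minimal sketch of that upload step, assuming boto3; the bucket name and prefix are made up, and the per-file transformation itself is shown further below.

    import glob
    import boto3

    BUCKET = "cloudtrail-flattened"   # hypothetical bucket name

    s3 = boto3.client("s3")
    for path in glob.glob("logs/*.json"):
        # upload each cleaned CloudTrail file into the new bucket
        key = "cloudtrail/" + path.split("/")[-1]
        s3.upload_file(path, BUCKET, key)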

For quick, readable JSON you can do:

> vim 144........._CloudTrail_us-east-1_20160711T1355Z_n0m854FRPqpkaPd3.json
 # and then, using the Python json module
(in vim) :%!python -m json.tool
to see
{
    "Records": [
        {
            "awsRegion": "us-east-1",
            "eventID": "4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69",
            "eventName": "ReportTaskProgress",
            "eventSource": "datapipeline.amazonaws.com",
            "eventTime": "2016-.....T13:50:16Z",
            "eventType": "AwsApiCall",
            "eventVersion": "1.02",
            "recipientAccountId": "144...",
            "requestID": "65ac3d1a-476e-11e6-a25c-f339ebc3012a",
            "requestParameters": {
                "taskId": "37kSdZbBpwRjSng/1Tms+p...."
            },
            "responseElements": {
                "canceled": false
            },
            "sourceIPAddress": "54........",
            "userAgent": "aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23.25-b01/1.6.0_33",
            "userIdentity": {
                "accessKeyId": "AK...",
                "accountId": "144...",
                "arn": "arn:aws:iam::144....:user/staging-user",
                "principalId": "...",
                "type": "IAMUser",
                "userName": "....-user"
            }
        },
        {
which spits out readable output.   The next step is performing some cleanup so that each JSON object sits on its own line.

(in vim) :1,$s#\{\"Records\"\:\ \[##     -- vim removal of header
(in vim) :1,$s#\}\,#\}^V^M#g               -- vim substitute each record onto it's own line
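One caveat: nested objects such as responseElements also end in "},", so the substitution above can split in the middle of a record.  Since each file is valid JSON, a safer route is to let a parser do the splitting.  A minimal sketch in Python (file names are placeholders):

    import json
    import sys

    # read one single-line CloudTrail file and write one record per line
    # (JSON Lines), which Spark's JSON reader handles natively
    with open(sys.argv[1]) as f:
        records = json.load(f)["Records"]

    with open(sys.argv[1] + ".jsonl", "w") as out:
        for rec in records:
            out.write(json.dumps(rec) + "\n")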

I then loaded this file into a Spark table and ran SparkSQL on it.   See the screenshots below, and a rough sketch of the query after them.
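In rough terms, the Spark step looked like the following PySpark sketch (Spark 2.x API; the bucket path and view name are hypothetical):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("cloudtrail").getOrCreate()

    # one CloudTrail record per line after the cleanup above;
    # adjust the scheme (e.g. s3a://) to your Hadoop/S3 setup
    df = spark.read.json("s3://cloudtrail-flattened/cloudtrail/*.jsonl")
    df.createOrReplaceTempView("cloudtrail")

    # who is still calling the deprecated API, and from where?
    spark.sql("""
        SELECT sourceIPAddress, userAgent, COUNT(*) AS calls
        FROM cloudtrail
        WHERE eventName = 'DescribeJobFlows'
        GROUP BY sourceIPAddress, userAgent
        ORDER BY calls DESC
    """).show(truncate=False)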


Note "DescribeJobFLows" calls above originating from an EMR instance on a specific IP.