Invalid data dump - Amazon Redshift, Data Pipeline and S3

Amazon Data Pipeline (DPL) is a late entrant to the ETL market, but it provides many features that are well integrated with the AWS cloud.  In any data extraction process you will encounter invalid or incorrect data, and that data may either be logged or ignored depending on the business requirements and the severity of the rejections.

When your data flows through S3 to other AWS platforms, be it Redshift, RDS, DynamoDB, etc., you can also use S3 to dump the rejected data.   In a pipeline similar to the DPL shown below, one of the steps could filter the bad records and dump them to S3 for later analysis.



By standardizing the rejections from different DPLs, another DPL can regularly load them back into Redshift for quick near-real-time analysis or a deeper dive downstream.  This also greatly helps with recovery and reruns when needed.

The following are the high-level steps in which rejected data is directed to S3.  The parameters are provided through the environment setup, for example #{myDPL_schema_name} = 'prod_public' and #{myDPL_ErrorLogPath} = 's3://emr_cluster/ad/clicks/...'


 

-- PreProcess 
-- Load the staging table and, at the same time, populate its data_error column when possible.
INSERT INTO #{myDPL_schema_name}.#{myDPL_staging_table}
SELECT col1,
  col2,
  col3,
  ...,
  CASE WHEN ... THEN ... ELSE NULL END AS data_error
FROM #{myDPL_schema_name}.#{myDPL_source_table}
LEFT OUTER JOIN #{myDPL_schema_name}.table_1
  ON ...
LEFT OUTER JOIN #{myDPL_schema_name}.dim_1
  ON ... 
LEFT OUTER JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...


-- OR, if the data_error column is updated separately...
UPDATE #{myDPL_schema_name}.#{myDPL_staging_table}
SET data_error = ...
FROM #{myDPL_schema_name}.dim_1
JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...

-- Temporary table
CREATE TEMP TABLE this_subject_dpl_rejections AS (
SELECT
  ...
FROM #{myDPL_schema_name}.#{myDPL_staging_table}
WHERE data_error IS NOT NULL
);

-- Dump to S3
UNLOAD ('SELECT * FROM this_subject_dpl_rejections')
TO '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/
  mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/
  hh=#{format(@scheduledStartTime,'HH')}/ran_on_#{@actualStartTime}_file_'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
GZIP
PARALLEL OFF
ALLOWOVERWRITE;

Now load the errors back into Redshift...

COPY #{myDPL_schema_name}.#{myDPL_error_table}
FROM  '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/
  mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/ 
  hh=#{format(@scheduledStartTime,'HH')}/'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
DELIMITER  '|' 
GZIP 
TRIMBLANKS
TRUNCATECOLUMNS
IGNOREBLANKLINES;
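
As a side note, a few lines of boto3 can confirm whether any rejection files actually landed under the run's partitioned prefix, for example to raise an alert before the COPY step; the bucket and prefix below are placeholders, not the real ones:

# Check whether the UNLOAD step wrote any rejection files for this run.
# Bucket and prefix names are placeholders for illustration only.
import boto3

def rejection_files_exist(bucket, prefix):
    s3 = boto3.client('s3')
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    return resp.get('KeyCount', 0) > 0

if rejection_files_exist('emr_cluster', 'ad/clicks/yyyy=2016/mm=07/dd=11/hh=13/'):
    print('Rejections found for this run; the error table will be populated.')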

Amazon CloudTrail parsing for a specific API call using Spark

Recently I needed to parse Amazon Web Services CloudTrail log files to check for a specific API call.  The call was being deprecated by Amazon, and we needed to upgrade our code to use the latest CLI (aws).  The logs I got from the IT folks were JSON files, and each file contained only a single line, anywhere from 0.5 MB to 1 MB long! That one line held an array of JSON objects like the one below.

Problem:  identify the application that is making a specific API call to a service.   Not much other information was available; all we had was a bunch of CloudTrail logs and the deprecated AWS API call (DescribeJobFlows).
See http://docs.aws.amazon.com/ElasticMapReduce/latest/API/API_DescribeJobFlows.html, which basically says "DescribeJobFlows: This API is deprecated and will eventually be removed. We recommend you use..."

{"Records": [ {"eventVersion":"1.02","userIdentity":{"type":"IAMUser","principalId":"AIDAIVWSMF.........","arn":"arn:aws:iam::144..........:user/staging-user","accountId":"14    4702935796","accessKeyId":".............","userName":"............."},"eventTime":"2016-.....T13:50:16Z","eventSource":"datapipeline.amazonaws.com","eventName":"Report    TaskProgress","awsRegion":"us-east-1","sourceIPAddress":"54.2.......","userAgent":"aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23    .25-b01/1.6.0_33","requestParameters":{"taskId":"37kSdZbBpwRjSng--/1Tms+pYiDGRURfXkPWl90Azl893QqHry1pryPTkUzLed5u1O2KIK9qh8pM1fk8fPBHzGbHDWhk0VTWFru3wJnanhJSuYLkEZ0fSmC...."},"responseElements":{"canceled":false},"requestID":"65ac3d1a-476e-11e6-a25c-f339ebc3012a","eventID":"4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69","eventType":"AwsApiC    all","recipientAccountId":"144........"},....

It is actually an array of JSON objects; for more info on the log format see http://docs.aws.amazon.com/awscloudtrail/latest/userguide/view-cloudtrail-events-cli.html

Even before developing any code I wanted to test a couple of ideas.  The following is one of the ways I was able to identify the offending application and host(s).  After that successful evaluation, I wrote a Python script to upload hundreds of files to a new S3 bucket and performed a minor transformation to clean the data so that it was suitable for loading into Spark.
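
A rough sketch of that kind of bulk upload, assuming boto3; the directory and bucket names here are placeholders, not the actual ones used:

# Upload all CloudTrail .json files from a local directory to an S3 bucket.
# LOCAL_DIR and BUCKET are placeholder names for illustration only.
import os
import boto3

s3 = boto3.client('s3')
LOCAL_DIR = '/data/cloudtrail'
BUCKET = 'cloudtrail-analysis-bucket'

for fname in os.listdir(LOCAL_DIR):
    if fname.endswith('.json'):
        s3.upload_file(os.path.join(LOCAL_DIR, fname), BUCKET, 'raw/' + fname)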

For a quick, readable view of the JSON you can do:

> vim 144........._CloudTrail_us-east-1_20160711T1355Z_n0m854FRPqpkaPd3.json
 # and then using python json module
(in vim) :%!python -m json.tool
to see
{
    "Records": [
        {
            "awsRegion": "us-east-1",
            "eventID": "4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69",
            "eventName": "ReportTaskProgress",
            "eventSource": "datapipeline.amazonaws.com",
            "eventTime": "2016-.....T13:50:16Z",
            "eventType": "AwsApiCall",
            "eventVersion": "1.02",
            "recipientAccountId": "144...",
            "requestID": "65ac3d1a-476e-11e6-a25c-f339ebc3012a",
            "requestParameters": {
                "taskId": "37kSdZbBpwRjSng/1Tms+p...."
            },
            "responseElements": {
                "canceled": false
            },
            "sourceIPAddress": "54........",
            "userAgent": "aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23.25-b01/1.6.0_33",
            "userIdentity": {
                "accessKeyId": "AK...",
                "accountId": "144...",
                "arn": "arn:aws:iam::144....:user/staging-user",
                "principalId": "...",
                "type": "IAMUser",
                "userName": "....-user"
            }
        },
        {
and it will spit out readable output.   Next is some cleanup so that there is one JSON object per line.

(in vim) :1,$s#\{\"Records\"\:\ \[##     -- vim removal of header
(in vim) :1,$s#\}\,#\}^V^M#g               -- vim substitute each record onto its own line
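
When there are hundreds of files, the same cleanup can be scripted; here is a rough Python equivalent of the two vim commands above (the file names are placeholders):

# Strip the {"Records": [ wrapper and write one JSON object per line,
# the layout that is convenient for loading into Spark.
import json

with open('cloudtrail_raw.json') as f:           # placeholder input file name
    records = json.load(f)['Records']

with open('cloudtrail_lines.json', 'w') as out:  # placeholder output file name
    for rec in records:
        out.write(json.dumps(rec) + '\n')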

I then loaded this file into a Spark table and ran Spark SQL on it.   See the screenshots below.


Note "DescribeJobFLows" calls above originating from an EMR instance on a specific IP.

Simple puzzle and memory usage

Recently there was a simple puzzle in one of my WhatsApp groups.  It was

if 1 => 8, 2 => 18, 3 => 30, 4 => 44, 5 => 60, then what is 9?

It was simple to calculate by hand and provide the answer.  You can also formulate the deduction above and apply it to any number with

M(n) = 6 + M(n-1) + (2 x n)
    where n runs over the integers 0 < n < N
    and M is the cumulative sum from the previous step, with M(0) = 0
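
Applied to the puzzle, the recurrence closes to M(n) = n*n + 7*n, so 9 maps to 144. A quick check:

# Verify the recurrence against the given pairs and compute the answer for 9.
def M(n):
    total = 0
    for k in range(1, n + 1):
        total = 6 + total + 2 * k
    return total

assert [M(n) for n in range(1, 6)] == [8, 18, 30, 44, 60]
assert M(9) == 9 * 9 + 7 * 9 == 144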

This problem lends itself well to a reduction function, and with the given formula it was easy to check it for any integer n.  Out of curiosity I first ran it as a Python for loop, appending each iteration's value, while monitoring CPU load and memory usage. Then I used the reduce function with xrange to do the same.

# Python

>>> a = []; M = 0

>>> for n in range(0, 100000000):
...        M = 6+M+(2*n)
...        a.append(M)

# [ CPU was loaded above 97% with memory usage jumping and ended up killing the process ]



>>> print reduce(lambda x,y: 6+x+(2*y), xrange(0,100000000))

10000000499999994

# With xrange, I got the result in a few seconds; CPU usage still jumped above 97% but there was no discernible change in memory.
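
For completeness, on Python 3 the same one-liner needs functools.reduce and plain range (which is lazy there), so memory stays flat just as with xrange:

# Python 3 version of the reduce above; range(0, 100000000) is a lazy
# range object, so no 100-million-element list is ever materialized.
from functools import reduce

print(reduce(lambda x, y: 6 + x + (2 * y), range(0, 100000000)))
# 10000000499999994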

Visualizing data processed consciously in a moment

Recently Google released training material to its employees on how unconscious bias can happen in the workplace.  For more details see
http://www.businessinsider.com/google-unconscious-bias-training-presentation-2015-12, which refers to the research paper at http://rd.springer.com/chapter/10.1007%2F978-3-642-82598-9_3

It is very interesting, and one of the slides (see below) specifically caught my attention with respect to how much data is NOT processed consciously.   That is, we consciously handle only about 40 bits out of every 11 million bits per second!

That is just 1 part for every 275,000 parts!!



As interesting as that is, I thought the impact would be even more dramatic when visualized in some way.  Following are a couple of attempts at it using Python and ImageMagick.

Each tiny square (green or red) in the following picture represents 40 bits, and each row has 275 of those squares. With 1,000 rows (10 blocks of 100 rows each) we get 11 million bits of data represented.  And this is just for one second!
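
A rough sketch of how such a picture can be generated with Pillow; the cell size and colors are arbitrary, and the grouping into 10 blocks is omitted for brevity:

# Draw a 275 x 1000 grid where each cell stands for 40 bits (11 million bits
# in total); a single green cell marks the ~40 consciously processed bits.
from PIL import Image, ImageDraw

CELL, COLS, ROWS = 4, 275, 1000           # 275 * 1000 = 275,000 cells
img = Image.new('RGB', (COLS * CELL, ROWS * CELL), 'white')
draw = ImageDraw.Draw(img)

for r in range(ROWS):
    for c in range(COLS):
        color = 'green' if (r, c) == (0, 0) else 'red'
        draw.rectangle([c * CELL, r * CELL, (c + 1) * CELL - 1, (r + 1) * CELL - 1],
                       fill=color)

img.save('conscious_vs_unconscious.png')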





Just imagine the scale of a lifetime (80 years) of data: all those 275,000 squares above would have to be repeated 2,522,880,000 times!!!


Another way: if a tiny string, say 5 pixels long, represents 40 bits, then the total length representing 11 million bits will be 275,000 times 5 pixels!  See below.  The small green line at the center of the diagram is a string of a few pixels, and around it the really long string, i.e., the one 275,000 times as long, is wound around a square peg. Every 25 rounds the string changes color for better visibility. In total there are 742 sides.

at the end of the 25th round the length is 5,050 times longer,
at the end of the 50th round it is 20,100 times,
at the end of the 100th round it is 80,200 times,
at the end of the 150th round it is 180,300 times,
and finally at the 185.25th round it is 274,911 times longer
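
These numbers follow if each successive side of the square spiral is one unit longer than the one before it, so the cumulative length after s sides is the triangular number s * (s + 1) / 2; a quick check:

# Cumulative length of the wound string, assuming each side is one unit
# longer than the previous one (4 sides per round).
def length_after(rounds):
    sides = int(rounds * 4)
    return sides * (sides + 1) // 2

for r in (25, 50, 100, 150, 185.25):
    print("%s rounds -> %s times longer" % (r, length_after(r)))
# 25 -> 5050, 50 -> 20100, 100 -> 80200, 150 -> 180300, 185.25 -> 274911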

Note: Keep in mind it is not the size (area) of the squares but the total perimeter length of all these squares that represents the ratio of the data.