
Invalid data dump - Amazon Redshift, Data Pipeline and S3

Amazon Data Pipeline (DPL) is a late entrant to the ETL market, but it provides many features that are well integrated with the AWS cloud.  In any data extraction process you will encounter invalid or incorrect data, and that data may either be logged or ignored depending on the business requirements or the severity of the rejected data.

When your data flows through S3 to other AWS platforms, be it Redshift, RDS, DynamoDB, etc., you can use S3 to dump that rejected data.   In a pipeline similar to the DPL below, one of the steps could filter the rejected rows and dump them to S3 for later analysis.



By standardizing the rejections from different DPLs, another DPL can regularly load them back into Redshift for quick, near-realtime analysis, or they can be examined in more depth downstream.  This also greatly helps with recovery and reruns when needed.

The following are the simple, high-level steps by which rejected data is directed to S3.  The parameters are provided through the environment setup.  For example: #{myDPL_schema_name} = 'prod_public' and #{myDPL_ErrorLogPath} = 's3://emr_cluster/ad/clicks/...'


 

-- PreProcess
-- Load the staging table and, where possible, populate its data_error column at the same time.
INSERT INTO #{myDPL_schema_name}.#{myDPL_staging_table}
SELECT col1,
  col2,
  col3,
  ...,
  CASE WHEN ... THEN ... ELSE NULL END AS data_error
FROM #{myDPL_schema_name}.#{myDPL_source_table}
LEFT OUTER JOIN #{myDPL_schema_name}.table_1
  ON ...
LEFT OUTER JOIN #{myDPL_schema_name}.dim_1
  ON ... 
LEFT OUTER JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...


-- OR, if the data_error column is updated separately...
UPDATE #{myDPL_schema_name}.#{myDPL_staging_table}
SET data_error = ...
FROM #{myDPL_schema_name}.#{myDPL_staging_table}
JOIN #{myDPL_schema_name}.dim_1
  ON ...
JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...

-- Temporary table
CREATE TEMP TABLE this_subject_dpl_rejections AS (
SELECT
  ...
FROM #{myDPL_schema_name}.#{myDPL_staging_table}
WHERE data_error IS NOT NULL
);

-- Dump to S3
UNLOAD ('SELECT * FROM this_subject_dpl_rejections')
TO '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/
  mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/
  hh=#{format(@scheduledStartTime,'HH')}/ran_on_#{@actualStartTime}_file_'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
GZIP
PARALLEL OFF
ALLOWOVERWRITE;

-- Now load the errors back into Redshift...
COPY #{myDPL_schema_name}.#{myDPL_error_table}
FROM  '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/
  mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/ 
  hh=#{format(@scheduledStartTime,'HH')}/'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
DELIMITER  '|' 
GZIP 
TRIMBLANKS
TRUNCATECOLUMNS
IGNOREBLANKLINES;

Amazon CloudTrail parsing for specific API call using Spark

Recently I needed to parse Amazon Web Services (AWS) CloudTrail log files to check for a specific API call.  The call was being deprecated by Amazon, and we needed to upgrade our code to use the latest CLI (aws).  The logs I got from the IT folks were JSON files, and each file contained only a single line, anywhere from 0.5MB to 1MB long! That one line held an array of JSON objects like the one below.

Problem:  Identify the application that is making a specific API call to a service.   Not much other information was available; all we had was a bunch of CloudTrail logs and the deprecated AWS API call (DescribeJobFlows).
See http://docs.aws.amazon.com/ElasticMapReduce/latest/API/API_DescribeJobFlows.html, which basically says: "DescribeJobFlows: This API is deprecated and will eventually be removed. We recommend you use..."

{"Records": [ {"eventVersion":"1.02","userIdentity":{"type":"IAMUser","principalId":"AIDAIVWSMF.........","arn":"arn:aws:iam::144..........:user/staging-user","accountId":"14    4702935796","accessKeyId":".............","userName":"............."},"eventTime":"2016-.....T13:50:16Z","eventSource":"datapipeline.amazonaws.com","eventName":"Report    TaskProgress","awsRegion":"us-east-1","sourceIPAddress":"54.2.......","userAgent":"aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23    .25-b01/1.6.0_33","requestParameters":{"taskId":"37kSdZbBpwRjSng--/1Tms+pYiDGRURfXkPWl90Azl893QqHry1pryPTkUzLed5u1O2KIK9qh8pM1fk8fPBHzGbHDWhk0VTWFru3wJnanhJSuYLkEZ0fSmC...."},"responseElements":{"canceled":false},"requestID":"65ac3d1a-476e-11e6-a25c-f339ebc3012a","eventID":"4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69","eventType":"AwsApiC    all","recipientAccountId":"144........"},....

It is actually an array of JSON objects; for more info on the log format see http://docs.aws.amazon.com/awscloudtrail/latest/userguide/view-cloudtrail-events-cli.html
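Before reaching for vim or Spark, a few lines of Python are enough to confirm that structure and see which API calls a single file contains.  This is only an illustrative sketch added here; the redacted filename is the same one used further below.

import json
from collections import Counter

# Any single CloudTrail log file will do (redacted name as shown later in this post).
with open("144........._CloudTrail_us-east-1_20160711T1355Z_n0m854FRPqpkaPd3.json") as f:
    records = json.load(f)["Records"]   # the whole file is one {"Records": [...]} object

# How many records are there, and which API calls do they represent?
print(len(records))
print(Counter(r["eventName"] for r in records).most_common(10))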

Even before developing any code I wanted to test a couple of ideas.  The following is one of the ways I was able to identify the offending application and host(s).  After the successful evaluation, I wrote a Python script to upload the hundreds of files to a new S3 bucket, performing a minor transformation along the way so that the data was suitable for loading into Spark.
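The script itself isn't reproduced here, but a minimal sketch of the idea, assuming boto3 and using made-up bucket and directory names, looks roughly like this:

# Sketch only: flatten each single-line {"Records": [...]} file into one JSON object
# per line (a format Spark reads directly) and upload the result to S3.
import glob
import json
import boto3

s3 = boto3.client("s3")
BUCKET = "my-cloudtrail-analysis"                   # hypothetical bucket name

for path in glob.glob("cloudtrail_logs/*.json"):    # hypothetical local directory
    with open(path) as f:
        records = json.load(f)["Records"]
    body = "\n".join(json.dumps(r) for r in records)
    key = "cleaned/" + path.split("/")[-1]
    s3.put_object(Bucket=BUCKET, Key=key, Body=body.encode("utf-8"))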

For a quick, readable look at the JSON you can do:

> vim 144........._CloudTrail_us-east-1_20160711T1355Z_n0m854FRPqpkaPd3.json
 # and then, inside vim, pretty-print with the Python json module:
(in vim) :%!python -m json.tool
to see readable output like:
{
    "Records": [
        {
            "awsRegion": "us-east-1",
            "eventID": "4b4ebf63-0fd3-4f1e-81c0-0b399dd07a69",
            "eventName": "ReportTaskProgress",
            "eventSource": "datapipeline.amazonaws.com",
            "eventTime": "2016-.....T13:50:16Z",
            "eventType": "AwsApiCall",
            "eventVersion": "1.02",
            "recipientAccountId": "144...",
            "requestID": "65ac3d1a-476e-11e6-a25c-f339ebc3012a",
            "requestParameters": {
                "taskId": "37kSdZbBpwRjSng/1Tms+p...."
            },
            "responseElements": {
                "canceled": false
            },
            "sourceIPAddress": "54........",
            "userAgent": "aws-sdk-java/unknown-version Linux/3.4.73-64.112.amzn1.x86_64 OpenJDK_64-Bit_Server_VM/23.25-b01/1.6.0_33",
            "userIdentity": {
                "accessKeyId": "AK...",
                "accountId": "144...",
                "arn": "arn:aws:iam::144....:user/staging-user",
                "principalId": "...",
                "type": "IAMUser",
                "userName": "....-user"
            }
        },
        {
        ...
The next step is some cleanup so that each JSON record ends up on its own line:

(in vim) :1,$s#{"Records": \[##      -- vim: remove the header
(in vim) :1,$s#},#}\r#g              -- vim: put each record on its own line

I then loaded this file into a Spark table and ran Spark SQL on it.   See the screenshots below.
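The screenshots came from the Spark shell; a rough PySpark equivalent of the query, with a hypothetical S3 path, would be something like:

# Sketch only: the cleaned files hold one CloudTrail record per line, so
# spark.read.json loads them directly into a DataFrame.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cloudtrail-describejobflows").getOrCreate()

events = spark.read.json("s3://my-cloudtrail-analysis/cleaned/")
events.createOrReplaceTempView("cloudtrail")

# Who is still calling the deprecated DescribeJobFlows API, and from where?
spark.sql("""
    SELECT sourceIPAddress, userAgent, COUNT(*) AS calls
    FROM cloudtrail
    WHERE eventName = 'DescribeJobFlows'
    GROUP BY sourceIPAddress, userAgent
    ORDER BY calls DESC
""").show(truncate=False)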


Note "DescribeJobFLows" calls above originating from an EMR instance on a specific IP.