Showing posts with label ETL. Show all posts

Invalid data dump - Amazon Redshift, Data Pipeline and S3

Amazon Data Pipeline (DPL) is a late entrant to the ETL market, but it provides many features that are well integrated with the AWS cloud.  In any data extraction process one encounters invalid or incorrect data, and that data may be either logged or ignored depending on the business requirements or the severity of the rejected data.

When your data flows through S3 to other AWS platforms, be it Redshift, RDS, DynamoDB, etc., you can use S3 to dump the rejected data.  For example, in a DPL similar to the one outlined below, one of the steps could filter the invalid rows and dump them to S3 for later analysis.



By standardizing the rejections from different DPLs, another DPL can regularly load them back into Redshift for quick real-time analysis or a deeper dive downstream.  This also helps greatly with recovery and reruns when needed.

The following are simple, high-level steps in which rejected data is directed to S3.  The parameters are provided through the environment setup.  For example: #{myDPL_schema_name} = 'prod_public' and #{myDPL_ErrorLogPath} = 's3://emr_cluster/ad/clicks/...'


 

-- PreProcess
-- Load the staging table and, where possible, populate its data_error column at the same time.
INSERT INTO #{myDPL_schema_name}.#{myDPL_staging_table}
SELECT col1,
  col2,
  col3,
  ...,
  CASE WHEN ... THEN ... END
  AS data_error
FROM #{myDPL_schema_name}.#{myDPL_source_table}
LEFT OUTER JOIN #{myDPL_schema_name}.table_1
  ON ...
LEFT OUTER JOIN #{myDPL_schema_name}.dim_1
  ON ...
LEFT OUTER JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...;


-- OR, if the data_error column is updated separately...
UPDATE #{myDPL_schema_name}.#{myDPL_staging_table}
SET data_error = ...
FROM #{myDPL_schema_name}.dim_1
JOIN #{myDPL_schema_name}.dim_N
  ON ...
WHERE ...;  -- conditions tying the staging table to the dimensions go here

-- Temporary table
CREATE TEMP TABLE this_subject_dpl_rejections AS (
SELECT
  ...
FROM #{myDPL_schema_name}.#{myDPL_staging_table}
WHERE data_error IS NOT NULL
);

-- Dump to S3
UNLOAD ('SELECT * FROM this_subject_dpl_rejections')
TO '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/
  mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/
  hh=#{format(@scheduledStartTime,'HH')}/ran_on_#{@actualStartTime}_file_'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
GZIP
PARALLEL OFF
ALLOWOVERWRITE;

Now load the errors back into Redshift:

COPY #{myDPL_schema_name}.#{myDPL_error_table}
FROM  '#{myDPL_ErrorLogPath}/yyyy=#{format(@scheduledStartTime,'YYYY')}/
  mm=#{format(@scheduledStartTime,'MM')}/dd=#{format(@scheduledStartTime,'dd')}/
  hh=#{format(@scheduledStartTime,'HH')}/'
CREDENTIALS 'aws_access_key_id=#{myDPL_AWSAccessKey};aws_secret_access_key=#{myDPL_AWSSecretKey}'
DELIMITER  '|'
GZIP
TRIMBLANKS
TRUNCATECOLUMNS
IGNOREBLANKLINES;
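
Both the UNLOAD and the COPY rely on the same hour-level S3 prefix (yyyy=/mm=/dd=/hh=).  As a rough illustration of that layout outside Data Pipeline, here is a minimal Python sketch that builds the prefix from a scheduled start time.  The function name and the bucket path are hypothetical and only stand in for #{myDPL_ErrorLogPath} and @scheduledStartTime.

```python
from datetime import datetime


def rejection_prefix(error_log_path: str, scheduled_start: datetime) -> str:
    """Build the hour-level S3 prefix used for the rejected rows."""
    base = error_log_path.rstrip("/")
    return (
        f"{base}"
        f"/yyyy={scheduled_start:%Y}"
        f"/mm={scheduled_start:%m}"
        f"/dd={scheduled_start:%d}"
        f"/hh={scheduled_start:%H}/"
    )


if __name__ == "__main__":
    # Hypothetical bucket and run time, for illustration only.
    print(rejection_prefix("s3://example-bucket/rejections", datetime(2015, 3, 7, 9, 0)))
```

Keeping the prefix in one place like this makes it harder for the writer and the reader of the rejection files to drift apart.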

DW solution - IP mapping to Geo Locations

In my previous few posts on this blog, I covered how to convert IP addresses to their corresponding integers and back. When joining Data Warehouse (DW) tables on IP addresses for location analysis, joining on an integer IP column is much faster than joining on a string column.  As noted in other posts, this article covers IPv4, but the technique could be used for IPv6 addresses by using 2 BIGINTs.

For geo location info, you can use MaxMind's Geo-City data, which is provided as two entities - one mapping an IP block to a location ID and another mapping a location ID to city/state/country.  It is a one-to-many (1:M) relationship, and to find the location of an IP address one needs to find the matching IP block and then map it to a city or country.  There are posts that describe in detail how to do this efficiently for a given IP.  These techniques work for an OLTP environment where you pass a known IP address from the UI or application layer to the database and get its city/country.
  1. Fast MySQL Range Queries on MaxMind GeoIP Tables by Andy Skelton
  2. MaxMind GeoIP Install Setup Tutorial by Bartomedia
  3. Finding a user's country/region based on IP by Shlomi Noach
  4. On efficiently geo-referencing IPs with MaxMind GeoIP and MySQL GIS by Jeremy Cole
Geo-City data has the following fields, separated by commas, with rows terminated by a new line (\n):
  • Location ID
  • Country Code
  • State/Region
  • City Name
  • Postal Code (US only)
  • Latitude
  • Longitude
  • Metro Code
  • Area Code
IP Block has:
  • Start IP Address
  • End IP Address
  • Location ID
While loading this dimensional data during the ETL process, you should also transform the IP addresses to integers as explained in "Converting an IP address to an integer".  The Geo-City dimension table will also have a partition range (the reason for this post) and a surrogate key (id).  You should also perform other data validation, such as checking for non-overlapping ranges, valid country codes, etc., which are all standard activities of a DW ETL process.  The time, resources and effort put into cleaning and validating the data before pushing it into the final fact tables pay off within a short period of time.  Having to fix a data issue and reconcile the reports afterwards in a DW/BI environment always leads to lower ROI.

You can also denormalize the above tables and create a single table.  In either case, joining on a range of IP addresses for a given set of IPs results in the same slow performance.

dim_geo_ip_city_location holds around 5.9 million rows with the following attributes:

CREATE TABLE dim_geo_ip_city_location (
    location_id int NOT NULL,
    start_ip varchar(16) NOT NULL,
    end_ip  varchar(16) NOT NULL,
    country char(2),
    region varchar(64), 
    city varchar(128),
    postal_code varchar(32),
    latitude decimal(8,4),
    longitude decimal(8,4),
    metro_code varchar(8),
    area_code varchar(8),
    ...
)ENGINE=...

CREATE TABLE dim_ip_block (
    start_ip_num bigint NOT NULL,
    end_ip_num bigint NOT NULL,
    location_id int NOT NULL,
        ...
)ENGINE=...

In MySQL (5.0) the optimizer performs a range operation efficiently when it is given constants, meaning that the query supplies the values directly, as in "WHERE ip_addr between 1230 and 1234", but not when the range bounds are not constants.  For more info on range optimization see the MySQL documentation, and for indexing see "Mastering the Art of (MySQL) Indexing".

I did not find any article providing insight into doing DW/BI analysis on IP addresses.  In a DW environment the business user is interested in slicing and dicing hundreds of thousands to millions of IPs.  Just finding the locations of a few thousand IP addresses (see Query 1 below) was taking more than an hour when joining the lookup table (a table similar to dim_geo_ip_city_location, but without the partition column (ip_range_partition) and id) to the IP address table, even with appropriate indexes such as an index on (start_ip_num, end_ip_num) or separate indexes on start_ip_num and end_ip_num.

Query 1:
-- Simple query to select IP block
SELECT IA.ip_num, B.location_id, B.start_ip_num, B.end_ip_num
FROM ip_addresses IA use index(...)
JOIN dim_ip_block B use index(...)
WHERE IA.ip_num between B.start_ip_num and B.end_ip_num

This leads to MySQL performing full scans, and in the case of the ip_block table (B) it doesn't use any index.  To avoid this, I partitioned the dimension (lookup) table into 1000 parts; since this table has ~5.9 million rows, each partition holds ~5,900 rows.  For more on data set partitions see its wiki entry.  The partitions should be numbered in ascending order with respect to the ascending range of IP blocks (start_ip_num).  Then create another table (dim_ip_range_partition) holding the start_ip_num and end_ip_num of each partition, basically creating super IP blocks.

CREATE TABLE dim_ip_range_partition (
     partition_id int NOT NULL AUTO_INCREMENT,
     start_ip_num bigint NOT NULL,
     end_ip_num bigint NOT NULL,
     ....
) ENGINE=InnoDB ;

Also, create a primary key on partition_id and an index on (start_ip_num, end_ip_num) on this table.
The solution relies on two things: there are no overlapping IP blocks, and the IP blocks are ordered (in ascending order).

To populate the above table,
  1. Add an id (row_id) column to dim_ip_block
  2. Add an integer partition_id column to dim_ip_block
  3. Update id from 1 to n in ascending order of start_ip_num (n goes to the row with the highest start_ip_num)
  4. Partition the n ids into P parts (n/P ids each), where n > 1 and 1 < P < n
  5. Assign each part/partition an ascending integer value
  6. Now populate dim_ip_range_partition (P_TABLE) from dim_ip_block (B_TABLE): there is one P_TABLE row for each partition id between 1 and P, where P_TABLE.start_ip_num is the lowest B_TABLE.start_ip_num of the corresponding partition and P_TABLE.end_ip_num is the highest B_TABLE.end_ip_num of that partition.  In other words...
         INSERT INTO dim_ip_range_partition
            (partition_id, start_ip_num, end_ip_num, ...)
         SELECT partition_id, min(start_ip_num), max(end_ip_num), ...
         FROM dim_ip_block
         GROUP BY partition_id
         ORDER BY partition_id asc
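
The population steps above, and the two-step lookup they enable, can also be sketched outside the database.  The following is a minimal in-memory Python illustration, not the MySQL implementation itself; the function names and the sample blocks are made up for the example, and it assumes, as the post does, that the blocks do not overlap.

```python
from bisect import bisect_right


def assign_partitions(blocks, num_partitions):
    """Order blocks by start_ip_num and cut them into roughly n/P sized parts.

    blocks: iterable of (start_ip_num, end_ip_num, location_id) tuples.
    """
    ordered = sorted(blocks)
    size = -(-len(ordered) // num_partitions)  # ceil(n / P)
    return [ordered[i:i + size] for i in range(0, len(ordered), size)]


def build_super_blocks(partitions):
    """One (min start_ip_num, max end_ip_num) row per partition."""
    return [
        (min(b[0] for b in part), max(b[1] for b in part))
        for part in partitions
    ]


def locate(ip_num, partitions, super_blocks):
    """Pick the super block first, then search only inside that partition."""
    starts = [start for start, _ in super_blocks]
    idx = bisect_right(starts, ip_num) - 1  # last super block with start <= ip_num
    if idx < 0:
        return None
    for start, end, location_id in partitions[idx]:
        if start <= ip_num <= end:
            return location_id
    return None


if __name__ == "__main__":
    # Made-up blocks: (start_ip_num, end_ip_num, location_id)
    blocks = [(1, 10, 100), (11, 20, 101), (30, 40, 102),
              (41, 50, 103), (60, 70, 104), (71, 80, 105)]
    parts = assign_partitions(blocks, 3)
    supers = build_super_blocks(parts)
    print(supers)
    print(locate(35, parts, supers), locate(25, parts, supers))
```

The point of the sketch is the shape of the lookup: a cheap search over P super blocks, followed by a range check against only the n/P blocks of one partition.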

By doing so, we have created super IP blocks, each holding n/P blocks from the original ip_blocks.  The following query is one way to extract city/country info for a given set of IP addresses.

SELECT t.ip_num, t.ip, B.start_ip_num, B.start_ip, B.end_ip_num, B.end_ip
       ,L.city, L.country
FROM 
(
   -- Get the super ip block (partition)
   SELECT C.ip_num, max(partition_id) partition_id, max(P.start_ip_num) start_ip_num
   FROM ip_addresses C use index(...)
   JOIN dim_ip_range_partition P 
   WHERE C.ip_num >= P.start_ip_num
   GROUP BY C.ip_num
) t
LEFT OUTER JOIN dim_ip_block B use index(...)
  ON  t.partition_id = B.partition_id     -- And join on this partition
  AND t.ip_num >= B.start_ip_num 
  AND t.ip_num <= B.end_ip_num
JOIN dim_geo_ip_city_location L          -- Get city/country
  ON B.location_id = L.location_id 

The query that would have taken more than an hour now returns within 30 seconds!  You can further optimize the above design with, say, more denormalized relations or a larger number of partitions on both the lookup table and the looked-up table (this is a trade-off: at the extreme P = n, the partition table is as large as the original table, which results in worse performance!).  Or, by extending the partitioning concept to more levels (super super blocks, ...), you can reduce the number of joined rows at the cost of a slightly more complex query.

The solution is the typical Computer Science way of handling a complex problem: divide it into smaller ones and solve those.  Here it is applied specifically to MySQL for large data sets with range operations.

Cheers,
Shiva

Converting an IP address to an integer and reverse

In dealing with IP address analysis, one may be required to map IPs to geo locations. There are third-party applications and software that perform IP to city, state and country mapping, and businesses, for example, can make use of them to see where their web traffic is coming from.

This article covers IPv4 addresses, which use 32 bits; a similar technique can be used for IPv6 addresses, which use 128 bits, by using 2 BIGINTs (64 bits each) to store them.

It is easy to handle a reasonable load when one is querying for a few IPs or a few hundred IPs, either from a simple user interface where one or more IP addresses are entered or through APIs. In these cases IP addresses can simply be matched in string format without much of a performance issue, but string matching can become a bottleneck when dealing with millions of rows loaded through an ETL process into a Data Warehouse environment, with reports run against fact tables with hundreds of millions of rows. Under these circumstances, converting the IP to an integer boosts performance substantially.

MySQL provides functions to convert an IP string to an integer and back. For example, inet_aton converts a string to a number (integer) and inet_ntoa converts a number back to a string.

inet_aton and inet_ntoa:

mysql> select inet_aton('192.168.100.200');
+------------------------------+
| inet_aton('192.168.100.200') |
+------------------------------+
|                   3232261320 |
+------------------------------+

mysql> select inet_ntoa(3232261320);
+-----------------------+
| inet_ntoa(3232261320) |
+-----------------------+
| 192.168.100.200       |
+-----------------------+
1 row in set (0.00 sec)
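
If the conversion has to happen in application code before the data reaches MySQL, the same mapping can be done with Python's standard ipaddress module.  This is a minimal sketch; the helper names are my own, and the IPv6 helper only illustrates the "2 BIGINTs" idea mentioned above (note that each 64-bit half can exceed the range of a signed BIGINT, so an unsigned column would be needed).

```python
import ipaddress
from typing import Tuple


def ip_to_int(ip: str) -> int:
    """Equivalent of inet_aton for a dotted IPv4 string."""
    return int(ipaddress.IPv4Address(ip))


def int_to_ip(num: int) -> str:
    """Equivalent of inet_ntoa for a 32-bit integer."""
    return str(ipaddress.IPv4Address(num))


def ipv6_to_int_pair(ip: str) -> Tuple[int, int]:
    """Split a 128-bit IPv6 address into (high 64 bits, low 64 bits)."""
    value = int(ipaddress.IPv6Address(ip))
    return value >> 64, value & 0xFFFFFFFFFFFFFFFF


if __name__ == "__main__":
    print(ip_to_int("192.168.100.200"))
    print(int_to_ip(3232261320))
    print(ipv6_to_int_pair("2001:db8::1"))
```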

Email parsing with regular expressions

Emails always pose interesting challenges to load and handle for any kind of analysis: analysing the number of users per email provider like Yahoo, Google, Hotmail, etc., finding the top country-specific domains, or finding users with a specific email pattern.

RFC 822, RFC 2822 (April 2001) and RFC 5322 (October 2008) define the specification and syntax for text messages exchanged between computers.  The RFCs cover not just the email address specification but the whole message format (header fields and body).  Most applications adhere to a subset of these specs, with more stringent rules for email addresses.  For example, the address spec allows ALPHA (a-z, A-Z), DIGIT (0-9) and special characters like "!", "#", "$", "%", etc., and even more special characters within quoted strings, but email service providers typically won't allow you to create an email address with special characters like "$" or "!".  For more spec details see sections 3.4 and 3.4.1 of RFC 5322.  RFC 3696 explains email addresses, URIs and HTTP URLs in much easier terms.

One can load email addresses into the staging area of a data warehouse (DW) and then perform validation and cleaning through the ETL tool's built-in utilities or components.  But to handle the many different possibilities, and to be able to quickly update or modify the handling for any new format encountered, it is better to validate and clean the addresses through application programs before loading them into staging or final tables.  This way the load can also be distributed through file splits and multi-threading, or through processes running on different systems.

The following regular expression, pulled from regular-expressions.info, handles almost all email addresses (more than 99.9%), and you can tweak it to suit your needs for performance or for handling specific emails only. For example, you may not want to allow any email that has special characters like #, %, $, {, }, /, *, etc.  I have explained below in detail how this RE parses an email.

[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+(?:[A-Z]{2}|com|org|net|edu|gov|mil|biz|info|mobi|name|aero|asia|jobs|museum)\b


An address mainly has two parts - one before the "@" (part 1) and one after it (part 2).

Part 1:
  1.  [a-z0-9!#$%&'*+/=?^_`{|}~-]  -- A set of characters, i.e., everything between the square brackets [ ]. In this case all letters "a to z" (upper or lower case, since the expression is applied case-insensitively), numerals "0 to 9" and the special characters "!#$%&'*+/=?^_`{|}~-" are allowed.
  2.  +  -- One or more occurrences of the above character set. Meaning "joe" or "!j!" or "abc" are all valid, i.e., letters and "!" are repeated one or more times. But Part 1.1 will not match "(abc)" or "joe<>" because "(", ")", "<", ">" are not in the above character set.
  3.  (  -- Starts a group and also starts remembering the matched string.  To avoid storing it, use "?:" right after "(".
  4.  ?: -- Don't remember or store what is matched.  When grouping is done through "(", the match is also remembered in variables $1, $2, $3, etc. A "?:" following "(" indicates that it should not be stored. This helps performance, but if you want to use the matched string for further processing, you can remove "?:" from the above regular expression.
  5. \. --  Indicates that "." can appear in the email after item 1 above. But "." has significance for the regular expression (RE) itself: in an RE "." means any single character, so we need to escape it to pass a literal "." down to the parsing engine.
  6. [a-z0-9!#$%&'*+/=?^_`{|}~-]  -- See 1 above.
  7.  +  -- See 2 above.  Also, if a "." appears then it must be followed by at least one of the characters in 6.
  8. *  -- Means zero or more times. In this case everything in items 5, 6 & 7 is optional as a group. That is, "joe.a" is valid but "joe." is not.
  9.  @ -- The "@" sign is a must in the email address.  It should appear exactly once.
Part 2:

     Part 2A: (?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+
  1. + -- Everything within the outer parentheses must appear at least once.  It enforces the rule that after the "@" sign there must be at least one character followed by a ".".
  2. ?: --  Don't remember or store what is matched.  Helps performance when parsing millions of emails.
  3.  [a-z0-9] -- The first character after the "@" must be alphanumeric (a,b,c...z or 0,1,2,...9)
  4.  (?:[a-z0-9-]*[a-z0-9])? -- The "?" at the end indicates 0 or 1 occurrence of the group.  That is, after the first character in step 3 the following characters are optional; they may be alphanumerics or hyphens, but the last one must be alphanumeric.
  5. \. -- See step 5 in part 1.
     

     Part 2B:  (?:[A-Z]{2}|com|org|net|edu|gov|mil|biz|info|mobi|name|aero|asia|jobs|museum)
  1.   [A-Z]{2} -- Any two letters after the final ".". This covers country top-level domains (TLD) like "uk" (for United Kingdom), "in" (India), "ca" (Canada), etc.  See this list of standard country codes.
  2.  | -- Is an "OR" operator.  It matches "com" or "org" or "museum", etc.
And the final "\b" at the end is a word boundary anchor.

During this validation and clean-up you can also add a length check on email parts 1 and 2.  The RFCs specify a maximum length of 64 characters (octets) for part 1 (before the "@" character) and 255 characters (octets) for the part after the "@" symbol.  These implementations can effectively handle more than 99.999% of emails.
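
As a rough sketch of how the regular expression and the length checks could be combined in an application program, here is a small Python example.  The function name is hypothetical; the pattern is the one quoted above, applied case-insensitively and restricted to ASCII, and the whole string is required to match rather than just a substring.

```python
import re

# The expression from the post, compiled case-insensitively.
EMAIL_RE = re.compile(
    r"[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*"
    r"@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+"
    r"(?:[A-Z]{2}|com|org|net|edu|gov|mil|biz|info|mobi|name|aero|asia|jobs|museum)\b",
    re.IGNORECASE | re.ASCII,
)

MAX_LOCAL_LEN = 64    # part 1, before the "@"
MAX_DOMAIN_LEN = 255  # part 2, after the "@"


def is_valid_email(address: str) -> bool:
    """Return True if the whole string matches the pattern and both length limits."""
    if EMAIL_RE.fullmatch(address) is None:
        return False
    local, _, domain = address.rpartition("@")
    return len(local) <= MAX_LOCAL_LEN and len(domain) <= MAX_DOMAIN_LEN


if __name__ == "__main__":
    for candidate in ("joe.a@example.com", "joe.@example.com", "(abc)@example.com"):
        print(candidate, is_valid_email(candidate))
```

Because the TLD list is explicit, any top-level domain longer than two letters that is not in the list is rejected, so the list needs maintaining if such addresses matter to you.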

Let me know if you have different regular expression or different way of parsing emails.

HTH,
Shiva

Data Profiling - An example in Talend Profiler

Data is loaded into a Data Warehouse (DW) from disparate systems, and sometimes from external partners who have their own coding standards.  In any case, the quality of the data loaded into the data warehouse is often variable, and especially while discovering the data one may need to load some sample data and do some analysis, including initial data profiling.  During this process one may discover differences which, when resolved, result in a much smoother data flow through the process.  Or, at a later stage, say after summarization is completed, one may need to do some analysis on the type of data.  In all these cases data profiling helps, and Talend provides a tool (Talend Open Profiler - TOP) to perform profiling quickly and efficiently.

Data profiling - the process of examining available data in different data sources, including databases, applications, files, data transfers from external systems, etc., and collecting statistics and information about it - improves data quality and leads to better reporting.

In the date dimension, we have nearly 220,000 rows covering Jan. 01, 1900 to Dec. 31, 2500 (about 600 years of dates), and one of the columns is 'day_of_week_name' (cardinality 7 - MONDAY, TUESDAY....).  This table has 70 columns including date, weeks, months, names, etc. For testing purposes, I wanted to check the nulls and the pattern frequency (distribution) for the 'day_of_week_name' column.

To do so, select the column to profile (day_of_week_name) and drag and drop it into "Analyzed columns" on the "Analysis Settings" tab.  Then pick the indicators, i.e., how you want the column measured (count, range, stats, etc.); I picked row count and NULL count along with "Pattern Frequency Table".  Pattern frequency counts the different patterns found in the column. The results are shown on the "Analysis Results" tab.
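
To make the three indicators concrete, here is a minimal Python sketch of what row count, NULL count and a pattern frequency table compute for a single column.  It is not Talend's implementation; the function names are made up, and the pattern symbols (uppercase letter to "A", lowercase letter to "a", digit to "9") are an assumed convention that may differ from the profiler's own.

```python
from collections import Counter


def to_pattern(value: str) -> str:
    """Reduce a value to its character-class pattern (assumed A/a/9 convention)."""
    out = []
    for ch in value:
        if ch.isupper():
            out.append("A")
        elif ch.islower():
            out.append("a")
        elif ch.isdigit():
            out.append("9")
        else:
            out.append(ch)  # keep punctuation and spaces as they are
    return "".join(out)


def profile_column(values):
    """Return row count, NULL count and pattern frequency for one column."""
    non_null = [v for v in values if v is not None]
    return {
        "row_count": len(values),
        "null_count": len(values) - len(non_null),
        "pattern_frequency": Counter(to_pattern(v) for v in non_null),
    }


if __name__ == "__main__":
    # Made-up sample standing in for the day_of_week_name column.
    print(profile_column(["MONDAY", "FRIDAY", "TUESDAY", None]))
```

For a clean day_of_week_name column one would expect only a handful of all-uppercase patterns and no NULLs; anything else points at a loading problem.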

Logging queries for performance monitoring

Before making any performance improvement, one needs to measure, whether you are trying to improve computer applications, DW queries, etc.  In one of the Data Warehouse subject areas, where I run more than 120K (120,000) queries every day in an ETL environment, I log all queries to a table and track them over time to see which queries show performance issues. With the following table, I was able to improve performance by more than 50% in some cases.  For a query that runs repeatedly with different parameters in the where clause, a small improvement adds up quickly; other times, with data skew, you can clearly see which changes are needed to improve performance.  And when there are mistakes in indexes (the DBA dropped one :), or an application user has modified their query, etc., you have a reference point against which to check why a particular query or set of queries is slow.


Query Performance Monitoring table definition
Column         Null?  Type                 Comment
ID             No     int(11)              Auto Increment
RUN_NUMBER     No     tinyint              Useful when job is recovered or re-ran
APPLICATION    No     varchar(128)         Application name to track
DOMAIN         Yes    varchar(128)         Application domain or subject area
QUERY          No     varchar(4096)        Actual query
DURATION       No     decimal(10,5)        How long did query run?
ROW_COUNT      Yes    int                  Number of rows affected
RESULT         No     varchar(32)          Query result type - succeeded or failed?
COMMENT        Yes    varchar(128)         User comment that can be logged with each query
CREATED_DT     No     date                 Query run date
CREATED_DT_TM  No     timestamp/datetime   Query run date and time
CREATED_BY     Yes    varchar(64)          Query user name
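
As an illustration of how such a table might be fed, here is a minimal Python sketch that times each query and writes one log row per execution.  It uses an in-memory SQLite database purely as a stand-in for MySQL; the table name query_log, the helper run_logged and the SQLite column types are assumptions for the example, while the column names follow the definition above.

```python
import sqlite3
import time

# SQLite stand-in for the monitoring table described above.
DDL = """
CREATE TABLE IF NOT EXISTS query_log (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    run_number    INTEGER NOT NULL,
    application   TEXT    NOT NULL,
    domain        TEXT,
    query         TEXT    NOT NULL,
    duration      REAL    NOT NULL,
    row_count     INTEGER,
    result        TEXT    NOT NULL,
    comment       TEXT,
    created_dt    TEXT    NOT NULL,
    created_dt_tm TEXT    NOT NULL,
    created_by    TEXT
)
"""

LOG_SQL = """
INSERT INTO query_log
    (run_number, application, domain, query, duration, row_count,
     result, comment, created_dt, created_dt_tm, created_by)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, date('now'), datetime('now'), ?)
"""


def run_logged(conn, sql, params=(), *, application, domain=None,
               run_number=1, comment=None, user=None):
    """Execute a query, then log its text, duration, row count and outcome."""
    start = time.perf_counter()
    result, row_count = "succeeded", None
    try:
        cur = conn.execute(sql, params)
        rows = cur.fetchall()
        # rowcount is -1 for SELECTs in sqlite3, so fall back to fetched rows.
        row_count = cur.rowcount if cur.rowcount >= 0 else len(rows)
        return rows
    except sqlite3.Error:
        result = "failed"
        raise
    finally:
        duration = time.perf_counter() - start
        conn.execute(LOG_SQL, (run_number, application, domain, sql,
                               duration, row_count, result, comment, user))
        conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(DDL)
    conn.execute("CREATE TABLE t (x INTEGER)")
    run_logged(conn, "INSERT INTO t VALUES (1)", application="etl_demo")
    print(run_logged(conn, "SELECT x FROM t", application="etl_demo"))
    print(conn.execute("SELECT query, row_count, result FROM query_log").fetchall())
```

Logging in a finally block means failed queries are recorded too, which is what makes the table useful as a reference point later.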


Talend - Oracle - Java Path error

A few weeks ago, on one of our QA systems, I installed Oracle 10g Express for testing; the installation was smooth and all went well.  But recently I needed to do some data profiling on a specific table in a MySQL DW environment, because some queries were taking more than twice as long as their normal runs.  Specifically, one query that used to take 5-6 minutes churning through a couple of million rows in a staging table started taking more than 15 minutes.  I suspected a data issue, but needed to quickly run some tests to see how the recent data profiled against earlier weeks' data.

I downloaded Talend Data Profiler to this new test machine (where I had Oracle 10g) and tried to install it.  Soon after double-clicking the exe file, I got an error indicating something like "org.talend.rcp.branding.top.product could not be found." in configuration\12725000123.log.  The log also has some more information about which Java version it is trying to use and the command-line options passed to run it.  The error itself doesn't indicate much, and this wasted quite a bit of time.

Going through the Talend manual, I found out that the profiler needs Java version 1.5 or later.  But I was pretty sure that I had the latest Java environment on this box, and checked twice: by running java -version in cygwin and checking the path, and by visiting http://www.java.com and clicking on the "Verify Installation" button.  Both indicated the latest version.

Data loading through Talend ETL Studio

In a couple of my earlier posts I talked about pulling data from search engines and then loading it into the staging tables of a data warehouse.  You can load data in a couple of ways, including database bulk-load tools, for example the MySQL "LOAD DATA INFILE" statement, or Talend's ETL studio or similar tools (Pentaho, Informatica, etc.).

In this article, I load a simple data file with more than 200,000 (200K) rows of keyword performance data for illustration purposes. The setup used was:
  MySQL 5.1 (Community version)
  Talend Open Studio (3.2.2)
  Perl 5.10.1


Other posts that may be of interest:
1. URL parsing in Python, Perl, MySQL - an example
2. Google Adwords Data Retriever
3. Google Analytics Data Retriever


The data flows through 6 components (tFileInputDelimited, tPerlRow, tSortRow, tFileOutputDelimited, tFilterRow and tMysqlOutput): the input data file is read, transformed into another output file and then loaded into the database.  Component #3 (the sorting component) is not needed, but is shown here to stress the ease with which a developer can quickly put together an ETL process.  The Palette tab on the right-hand side houses these components and many more.  If you use Java instead of Perl, you will likely have more components to use; for a list of all available components take a look at Talend Forge.

The input record format is tab-separated fields with one record per line, and so is the output file in component 4.
# Typical fields in the file are date, account_id, campaign_id, ad_group_id, keyword_id, CPC, CTR, Cost, Impressions, url etc.
# 2009-12-12   9876543210   34987650   1147382910   1234567890  0.07  
# 0.226632  247.98   15444  http://hiregion.com/2009/10/google-analytics-data-retriever-api-sem_25.html

I provide the following regular expression in component 2 to parse each URL and extract the domain name.
     /(https?:\/\/)?(www\.)?([a-zA-Z0-9_%]*)\b\.[a-z]{2,4}(\.[a-z]{2})?/ig

This parses any correctly formatted URL like http://www.amazon.com or https://amazon.com or http://www.amazon.co.uk or amazon.com and more.
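
For readers who want to try the expression outside Talend, here is a minimal Python sketch of the same extraction.  The function name is hypothetical, and returning the match with the scheme and "www." stripped is my own choice of what to call the domain; the pattern itself is the one given above.

```python
import re

# Same expression as above; Python needs no escaping of "/" and "g" has no equivalent flag.
URL_DOMAIN_RE = re.compile(
    r"(https?://)?(www\.)?([a-zA-Z0-9_%]*)\b\.[a-z]{2,4}(\.[a-z]{2})?",
    re.IGNORECASE,
)


def extract_domain(url: str):
    """Return the first domain-like match without scheme or 'www.', or None."""
    match = URL_DOMAIN_RE.search(url)
    if match is None:
        return None
    prefix_len = len(match.group(1) or "") + len(match.group(2) or "")
    return match.group(0)[prefix_len:]


if __name__ == "__main__":
    for url in ("http://www.amazon.com", "https://amazon.com",
                "http://www.amazon.co.uk", "amazon.com"):
        print(url, "->", extract_domain(url))
```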

Component 3 sorts the input stream coming from the regexp output and component 4 dumps it into an output file, which is then read by a filter that removes any record that doesn't have the primary key before loading into the staging database.  You can also load into the database without the primary key constraint to speed things up further, and clean up the duplicates before pushing into either dimension or fact tables.

The whole run took a little over a minute (~63 seconds) on a test system running all three (MySQL, Talend, Perl).  For a daily load, you can make use of a scheduler to kick off the job and automate the loading process.






 Hope that helps,
Shiva