Sunday, November 3, 2019

How to capture Azure Data Factory Copy Activity Log details


In this blog post I will show you how to implement your own custom logging in Azure Data Factory.

Before you do this, please note that Azure has its own built-in Log Analytics integration, which stores these logs and gives you multiple options to distribute and share them.

https://docs.microsoft.com/en-us/azure/data-factory/monitor-using-azure-monitor


Scenario: We had a requirement to capture our ETL process in log files. In our process we copied data from on-premises data sources such as SQL Server and Oracle databases to Azure Data Lake Store Gen2. Within the data lake store we had multiple stages: a Raw folder layer, a Refined folder layer, and a Produce layer. We applied various transformations from one layer to the next using Databricks notebooks.

The requirement was to track the record counts from the source system all the way to the Produce layer (the visualization layer). Since we wanted the details of our ETL in one file, we decided to build our own custom logging framework.


FRAMEWORK ARCHITECTURE


1. Use a new Copy Activity to read the output of the actual Copy Activity, then write the results to an Azure Data Lake Store file as CSV.
2. Pass the RunId details from the ADF job to a Databricks notebook and use them to create a dataframe of record counts from each layer.
3. Combine the two log files (the ADF log file and the Databricks log file) into one master file for the entire ETL process in Databricks.
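
Step 3 can be sketched in plain Python. This is only a minimal illustration, not the notebook we actually ran: it assumes both logs are CSV text that share RunId and Tablename columns, and the layer count columns (raw_count, refined_count, produce_count) are hypothetical names chosen for the example.

```python
import csv
import io


def combine_logs(adf_csv, databricks_csv):
    """Join ADF copy-log rows with Databricks count rows on (RunId, Tablename)."""
    # Index the Databricks rows by the assumed shared key.
    counts = {
        (row["RunId"], row["Tablename"]): row
        for row in csv.DictReader(io.StringIO(databricks_csv))
    }
    master = []
    for adf_row in csv.DictReader(io.StringIO(adf_csv)):
        layer_counts = counts.get((adf_row["RunId"], adf_row["Tablename"]), {})
        # Keep every ADF row; the layer counts are simply absent
        # when the notebook logged nothing for that table.
        master.append({**layer_counts, **adf_row})
    return master


# Made-up sample data, for illustration only.
adf_log = "RunId,Tablename,rowsCopied\nrun-1,CUSTOMER,100\n"
databricks_log = (
    "RunId,Tablename,raw_count,refined_count,produce_count\n"
    "run-1,CUSTOMER,100,98,98\n"
)

for row in combine_logs(adf_log, databricks_log):
    print(row)
```

In Databricks the same join would more likely be done on two dataframes, but the idea is the same: one row per table per run, carrying the source count and the count at each layer.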



CAPTURING LOGS OF ADF COPY ACTIVITY USING A NEW COPY ACTIVITY (SOURCE AS SQL SERVER AND SINK AS ADLS)

Below is a sample copy activity output.

[Image: sample copy activity output]

The query used to extract the copy logs is below. It runs through a SQL Server linked service and pulls both system variables and copy activity output values. ADF replaces each @{...} expression with its value before the query is sent, so SQL Server only sees string literals. Notice the different ways to access the nested JSON values.


SELECT '@{pipeline().DataFactory}' as DataFactory_Name,
'@{pipeline().Pipeline}' as Pipeline_Name,
'@{pipeline().RunId}' as RunId,
'Stature_SOR' as Source,
'RAW/16263_STATURE' as Destination,
'@{item().TABLENAME}' as Tablename,
'@{pipeline().TriggerType}' as TriggerType,
'@{pipeline().TriggerId}' as TriggerId,
'@{pipeline().TriggerName}' as TriggerName,
'@{pipeline().TriggerTime}' as TriggerTime,
'@{activity('Copy_OneTable').output.rowsCopied}' as rowsCopied,
'@{activity('Copy_OneTable').output.rowsRead}' as RowsRead,
'@{activity('Copy_OneTable').output.usedParallelCopies}' as No_ParallelCopies,
'@{activity('Copy_OneTable').output.copyDuration}' as copyDuration_in_secs,
'@{activity('Copy_OneTable').output.effectiveIntegrationRuntime}' as effectiveIntegrationRuntime,
'@{activity('Copy_OneTable').output.executionDetails[0].source.type}' as Source_Type,
'@{activity('Copy_OneTable').output.executionDetails[0].sink.type}' as Sink_Type,
'@{activity('Copy_OneTable').output.executionDetails[0].status}' as Execution_Status,
'@{activity('Copy_OneTable').output.executionDetails[0].start}' as CopyActivity_Start_Time,
'@{utcnow()}' as CopyActivity_End_Time,
'@{activity('Copy_OneTable').output.executionDetails[0].detailedDurations.queuingDuration}' as CopyActivity_queuingDuration_in_secs,
'@{activity('Copy_OneTable').output.executionDetails[0].detailedDurations.timeToFirstByte}' as CopyActivity_timeToFirstByte_in_secs,
'@{activity('Copy_OneTable').output.executionDetails[0].detailedDurations.transferDuration}' as CopyActivity_transferDuration_in_secs
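
To make the nested lookups in the query above easier to follow, here is a small Python sketch that walks a dictionary shaped like the copy activity output. The field names are the ones the query references; every value is made up for illustration, and the real output contains more fields than are shown here.

```python
# Illustrative copy activity output: the field names come from the query,
# the values are invented for this example.
sample_output = {
    "rowsRead": 1200,
    "rowsCopied": 1200,
    "copyDuration": 14,
    "usedParallelCopies": 1,
    "effectiveIntegrationRuntime": "hypothetical-ir",
    "executionDetails": [
        {
            "source": {"type": "SqlServer"},
            "sink": {"type": "AzureBlobFS"},
            "status": "Succeeded",
            "start": "2019-11-03T10:00:00Z",
            "detailedDurations": {
                "queuingDuration": 3,
                "timeToFirstByte": 1,
                "transferDuration": 10,
            },
        }
    ],
}


def flatten_copy_output(output):
    """Flatten the nested output into the column names used by the query."""
    # executionDetails is a list, hence the [0] in the ADF expressions.
    first = output["executionDetails"][0]
    durations = first["detailedDurations"]
    return {
        "rowsCopied": output["rowsCopied"],
        "RowsRead": output["rowsRead"],
        "No_ParallelCopies": output["usedParallelCopies"],
        "copyDuration_in_secs": output["copyDuration"],
        "Source_Type": first["source"]["type"],
        "Sink_Type": first["sink"]["type"],
        "Execution_Status": first["status"],
        "CopyActivity_Start_Time": first["start"],
        "CopyActivity_queuingDuration_in_secs": durations["queuingDuration"],
        "CopyActivity_timeToFirstByte_in_secs": durations["timeToFirstByte"],
        "CopyActivity_transferDuration_in_secs": durations["transferDuration"],
    }


print(flatten_copy_output(sample_output))
```

Top-level values such as rowsCopied are read directly from the output, while anything under executionDetails needs the list index first and then the nested property path.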



After writing these details to a CSV file in ADLS, we created a notebook that takes the RunId as an input parameter to continue our custom logging.

Tuesday, January 8, 2019

Performing SCD type 2 in Hive without using Merge statement



Introduction



Are you scratching your head trying to do a Merge statement in Hive? You know this is not the optimal way to use Hive, yet you want to get the work done. Today is your lucky day.


I work for a company that unfortunately picked the wrong tool for routine analytics. Currently we use Hadoop Hive for our day-to-day business intelligence analytics requirements, yet we don't have more than 2 million rows of data across all our projects. Talk about underutilization.


One of the most recent requests was to implement a type 2 slowly changing dimension (SCD) for a project that needed a historical trail. A brief review of SCD can be found on Wikipedia.




Procedure


In this specific project, every day I extract records from an Oracle OLTP (online) database and store them in a Staging table. This Staging data is then "Merged" into a Target table. By "Merge" I mean inserting new records and updating the "Valid to date" of the existing records.


In most relational databases we have an UPDATE clause (for example, the WHEN MATCHED branch of a MERGE statement) that can be used effectively to implement SCD type 2. It lets us compare and combine two tables, normally a stage table and a target table.






In this implementation of SCD type 2, we create 4 metadata columns to help us keep track of the records:




  1. UDC_Valid_FROM_DT: This field stores the date the ETL “last saw” the record from the source system. It denotes the version of the record for the day the ETL was run. This date helps tell the story of the various versions of a specific record (per business key). By default, every new extraction from the source has the system date as its value.
  2. UDC_Valid_TO_DT: This field denotes the end of the life span (expiration date) of a record (per business key). It shows when a specific version of a record ceased to be valid or to exist in the source system. By default, every newly extracted record has the value ‘9999-12-31’, denoting a future date or infinity, since it is the latest version of the record according to the source system.
  3. UDC_IsCurrent: This field is another way to represent a Valid_TO_DT of ‘9999-12-31’, but its main purpose is to easily display which version of a record (per business key) is the latest according to the source system. By default, every new extract each day has a value of “1”. “0” means that the record is no longer the most up-to-date version of that record.
  4. UDC_IsActive: This field denotes records (per business key) that still exist in the source. It is not used to denote the most current version, but rather to show that the business key can still be found in the source system, since we know the source deletes records. By default, this field is “1” for each extracted record. It becomes “0” whenever the ETL notices that the business key no longer exists in the source system.
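
As a rough illustration of how these four columns behave, the sketch below models them on plain Python dictionaries. The function names (new_version, expire_version, mark_deleted) are hypothetical helpers invented for this example and are not part of the Hive implementation.

```python
from datetime import date

OPEN_END_DATE = "9999-12-31"  # "infinity": the version is still the latest


def new_version(record, run_date):
    """Defaults for a record freshly extracted from the source."""
    return {
        **record,
        "UDC_Valid_FROM_DT": run_date.isoformat(),
        "UDC_Valid_TO_DT": OPEN_END_DATE,
        "UDC_IsCurrent": 1,
        "UDC_IsActive": 1,
    }


def expire_version(row, run_date):
    """Close a version that has been replaced by a newer one."""
    return {**row, "UDC_Valid_TO_DT": run_date.isoformat(), "UDC_IsCurrent": 0}


def mark_deleted(row):
    """Flag a business key that no longer exists in the source."""
    return {**row, "UDC_IsActive": 0}


first = new_version({"pk_col": 1, "key_col": "K1"}, date(2019, 1, 1))
closed = expire_version(first, date(2019, 1, 8))
print(first)
print(closed)
```

Note that expiring a version and marking a key as deleted are separate events: a replaced version keeps UDC_IsActive at 1 because the business key is still present in the source.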


To implement this, I created a staging table that I truncate and reload with the extracted source data on a daily basis.


For the Target table, we would normally perform a Merge statement after the very first historical load.
The MERGE statement is a routine and powerful SQL statement for implementing SCD II in any relational database. However, in Hive we are limited to basic SQL statements.


In this blog, I will show you how I implemented this Merge statement in Hive.


Since my version of Hive does not even have UPDATE functionality, I use the "CREATE TABLE AS" statement to create a new version of the table.


NOTE: In this SCD II implementation we only keep history when there is a change in a certain key column (key_col) that we care about or want to track from the source system.
pk_col means primary key column.




-- Part 1: existing target rows, with the valid-to date and flags adjusted
select
t.col_1, t.key_col, t.pk_col,
t.udc_valid_from_dt,
-- expire the open version when the key column changed in the source
case when t.udc_valid_to_dt = '9999-12-31' and s.pk_col is null and s1.pk_col is not null
THEN Date_add(from_unixtime(unix_timestamp()),0)
ELSE t.udc_valid_to_dt END AS udc_valid_to_dt,
case when t.udc_valid_to_dt = '9999-12-31' and s.pk_col is null and s1.pk_col is not null THEN 0
ELSE t.udc_iscurrent END as udc_iscurrent,
-- the business key no longer exists in the source
case when s1.pk_col is null then 0 else 1 end as udc_isactive
from target_table t
-- s matches the same version (primary key and tracked column)
left join Source_stage_table s on t.pk_col = s.pk_col and t.key_col = s.key_col
-- s1 matches the same business key, whatever its tracked column holds
left join Source_stage_table s1 on t.pk_col = s1.pk_col
union all
-- Part 2: new records and new versions that are not yet in the target
select
s.col_1,
s.key_col,
s.pk_col,
Date_add(from_unixtime(unix_timestamp()),0) as udc_valid_from_dt,
'9999-12-31' as udc_valid_to_dt,
1 as udc_iscurrent, 1 as udc_isactive
from Source_stage_table s
where not exists (select 1 from target_table t where t.pk_col = s.pk_col and t.key_col = s.key_col)
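
To see what the query above produces, here is a small worked example that runs the same logic against an in-memory SQLite database from Python. This is only a sketch for checking the logic, not the Hive job itself: SQLite stands in for Hive, the run date is passed in as a parameter instead of being read from the system clock, and the sample rows are made up.

```python
import sqlite3

SCD2_QUERY = """
SELECT t.col_1, t.key_col, t.pk_col, t.udc_valid_from_dt,
       CASE WHEN t.udc_valid_to_dt = '9999-12-31'
                 AND s.pk_col IS NULL AND s1.pk_col IS NOT NULL
            THEN :run_date ELSE t.udc_valid_to_dt END AS udc_valid_to_dt,
       CASE WHEN t.udc_valid_to_dt = '9999-12-31'
                 AND s.pk_col IS NULL AND s1.pk_col IS NOT NULL
            THEN 0 ELSE t.udc_iscurrent END AS udc_iscurrent,
       CASE WHEN s1.pk_col IS NULL THEN 0 ELSE 1 END AS udc_isactive
FROM target_table t
LEFT JOIN source_stage_table s
       ON t.pk_col = s.pk_col AND t.key_col = s.key_col
LEFT JOIN source_stage_table s1
       ON t.pk_col = s1.pk_col
UNION ALL
SELECT s.col_1, s.key_col, s.pk_col, :run_date, '9999-12-31', 1, 1
FROM source_stage_table s
WHERE NOT EXISTS (SELECT 1 FROM target_table t
                  WHERE t.pk_col = s.pk_col AND t.key_col = s.key_col)
"""

# Made-up sample data: pk 1 is unchanged, pk 2 changes its key column,
# pk 3 disappears from the source, pk 4 is brand new.
TARGET_ROWS = [
    ("a", "K1", 1, "2019-01-01", "9999-12-31", 1, 1),
    ("b", "K1", 2, "2019-01-01", "9999-12-31", 1, 1),
    ("c", "K1", 3, "2019-01-01", "9999-12-31", 1, 1),
]
STAGE_ROWS = [("a", "K1", 1), ("b", "K2", 2), ("d", "K1", 4)]


def run_scd2(target_rows, stage_rows, run_date):
    """Return the rebuilt target table, sorted by primary key and valid-from date."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE target_table (col_1 TEXT, key_col TEXT, pk_col INTEGER, "
        "udc_valid_from_dt TEXT, udc_valid_to_dt TEXT, "
        "udc_iscurrent INTEGER, udc_isactive INTEGER)"
    )
    conn.execute(
        "CREATE TABLE source_stage_table (col_1 TEXT, key_col TEXT, pk_col INTEGER)"
    )
    conn.executemany("INSERT INTO target_table VALUES (?,?,?,?,?,?,?)", target_rows)
    conn.executemany("INSERT INTO source_stage_table VALUES (?,?,?)", stage_rows)
    rows = conn.execute(SCD2_QUERY, {"run_date": run_date}).fetchall()
    conn.close()
    return sorted(rows, key=lambda r: (r[2], r[3]))


for row in run_scd2(TARGET_ROWS, STAGE_ROWS, "2019-01-08"):
    print(row)
```

With this sample data, the old version of pk 2 is closed on the run date and a new open version is added, pk 3 keeps its dates but is flagged as no longer active, and pk 4 arrives as a new current record.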



 






