Sunday, November 3, 2019

How to capture Azure Data Factory Copy Activity Log details


In this blog post I will show you how to implement your own custom logging in Azure Data Factory.

Before you do this, please note that Azure Data Factory has built-in integration with Azure Monitor and Log Analytics, where these logs are stored and can be queried, routed and shared in multiple ways.

https://docs.microsoft.com/en-us/azure/data-factory/monitor-using-azure-monitor


Scenario: We had a requirement to capture our ETL process in log files. The process copied data from on-premises data sources such as SQL Server and Oracle databases to Azure Data Lake Store Gen2. Within the data lake we had multiple stages, a Raw layer, a Refined layer and a Produced layer, and we applied various transformations from one layer to the next using Databricks notebooks.

The requirement was to track record counts from the source system all the way to the Produced layer (the visualization layer). Since we wanted the details of the entire ETL run in one file, we decided to build our own custom logging framework.


FRAMEWORK ARCHITECTURE


1. Use a new Copy Activity to read the output of the actual Copy Activity and write the results to an Azure Data Lake Store file as CSV.
2. Pass the RunId from the ADF pipeline to a Databricks notebook and use it to create a dataframe of record counts from each layer (see the sketch after this list).
3. Combine the two log files (the ADF log file and the Databricks log file) into one master file for the entire ETL process in Databricks.
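For step 2, the pipeline RunId can be handed to the notebook through the Databricks Notebook activity's base parameters and read with a widget. A minimal sketch, assuming a parameter named run_id (the parameter name is illustrative, not from the original post):

# Databricks notebook (Python) - read the ADF pipeline RunId passed in via the
# Notebook activity's baseParameters, e.g. {"run_id": "@{pipeline().RunId}"}
dbutils.widgets.text("run_id", "")          # declare the widget with an empty default
run_id = dbutils.widgets.get("run_id")      # value supplied by the ADF Notebook activity
print("Processing log entries for ADF RunId: " + run_id)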



CAPTURING LOGS OF ADF COPY ACTIVITY USING A NEW COPY ACTIVITY (SOURCE AS SQL SERVER AND SINK AS ADLS)

Below is a sample Copy Activity output:
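The Copy Activity output is a JSON object. A trimmed, illustrative example is shown below; the field names are the ones referenced in the query that follows, but the values here are made up:

{
    "rowsRead": 50000,
    "rowsCopied": 50000,
    "copyDuration": 25,
    "usedParallelCopies": 1,
    "effectiveIntegrationRuntime": "SelfHostedIR (East US)",
    "executionDetails": [
        {
            "source": { "type": "SqlServer" },
            "sink": { "type": "AzureBlobFS" },
            "status": "Succeeded",
            "start": "2019-11-03T10:15:00.000Z",
            "detailedDurations": {
                "queuingDuration": 3,
                "timeToFirstByte": 1,
                "transferDuration": 21
            }
        }
    ]
}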
The query used to extract the copy logs is shown below. It runs against a SQL Server linked service and pulls both system variables and Copy Activity output properties. Notice the different ways of accessing the nested JSON values.


SELECT '@{pipeline().DataFactory}' as DataFactory_Name,
'@{pipeline().Pipeline}' as Pipeline_Name,
'@{pipeline().RunId}' as RunId,
'Stature_SOR' as Source,
'RAW/16263_STATURE' as Destination,
'@{item().TABLENAME}' as Tablename,
'@{pipeline().TriggerType}' as TriggerType,
'@{pipeline().TriggerId}' as TriggerId,
'@{pipeline().TriggerName}' as TriggerName,
'@{pipeline().TriggerTime}' as TriggerTime,
'@{activity('Copy_OneTable').output.rowsCopied}' as rowsCopied,
'@{activity('Copy_OneTable').output.rowsRead}' as RowsRead,
'@{activity('Copy_OneTable').output.usedParallelCopies}' as No_ParallelCopies,
'@{activity('Copy_OneTable').output.copyDuration}' as copyDuration_in_secs,
'@{activity('Copy_OneTable').output.effectiveIntegrationRuntime}' as effectiveIntegrationRuntime,
'@{activity('Copy_OneTable').output.executionDetails[0].source.type}' as Source_Type,
'@{activity('Copy_OneTable').output.executionDetails[0].sink.type}' as Sink_Type,
'@{activity('Copy_OneTable').output.executionDetails[0].status}' as Execution_Status,
'@{activity('Copy_OneTable').output.executionDetails[0].start}' as CopyActivity_Start_Time,
'@{utcnow()}' as CopyActivity_End_Time,
'@{activity('Copy_OneTable').output.executionDetails[0].detailedDurations.queuingDuration}' as CopyActivity_queuingDuration_in_secs,
'@{activity('Copy_OneTable').output.executionDetails[0].detailedDurations.timeToFirstByte}' as CopyActivity_timeToFirstByte_in_secs,
'@{activity('Copy_OneTable').output.executionDetails[0].detailedDurations.transferDuration}' as CopyActivity_transferDuration_in_secs



After writing these details to a CSV file in ADLS, we created a Databricks notebook that takes the RunId as an input parameter to continue our custom logging.
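A minimal sketch of that notebook is below. It assumes run_id was read from a widget as shown earlier, that the ADF log CSV lands in an adf_logs folder in ADLS, and that each lake layer can be counted directly; all paths, formats and folder names other than RAW/16263_STATURE are illustrative, not from the original post:

from pyspark.sql import functions as F

# Illustrative ADLS Gen2 locations - replace with your own
adf_log_path    = "abfss://logs@mydatalake.dfs.core.windows.net/adf_logs/"
master_log_path = "abfss://logs@mydatalake.dfs.core.windows.net/master_logs/"
raw_path        = "abfss://data@mydatalake.dfs.core.windows.net/RAW/16263_STATURE/"
refined_path    = "abfss://data@mydatalake.dfs.core.windows.net/REFINED/16263_STATURE/"
produced_path   = "abfss://data@mydatalake.dfs.core.windows.net/PRODUCED/16263_STATURE/"

# 1. Read the CSV written by the logging Copy Activity and keep only this run
adf_log_df = (spark.read.option("header", "true").csv(adf_log_path)
                   .filter(F.col("RunId") == run_id))

# 2. Build the Databricks log: record counts per data lake layer
#    (file formats per layer are assumptions for this sketch)
layer_counts = [
    ("RAW",      spark.read.parquet(raw_path).count()),
    ("REFINED",  spark.read.parquet(refined_path).count()),
    ("PRODUCED", spark.read.parquet(produced_path).count()),
]
databricks_log_df = (spark.createDataFrame(layer_counts, ["Layer", "RecordCount"])
                          .withColumn("RunId", F.lit(run_id)))

# 3. Join the two logs on RunId into one master log file for the ETL run
master_log_df = adf_log_df.join(databricks_log_df, on="RunId", how="inner")
master_log_df.write.mode("append").option("header", "true").csv(master_log_path)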
