In this blog post I will show you how to implement your own custom logging in Azure Data Factory
Before you do this please note that Azure has its own in built Log analytics where it stores these logs and gives you multiple options to distribute and share it.
https://docs.microsoft.com/en-us/azure/data-factory/monitor-using-azure-monitor
Scenario: We had a requirement to capture our ETL process in log files. In our process we were copying data from on prem data sources like SQL Server and Oracle Databases to Azure Data Lake Store Gen2. Then in Data lake store we had multiple stages from Raw folder layer, Refined folder layer and Produce layer, we would be applying various transformations from one layer to the next using Databricks notebook.
The requirement was to track the record counts from Source system all the way to the Produced layer (visualization layer). Since we wanted to have detail of our ETL in one file we decided to build our own custom logging framework
FRAMEWORK ARCHITECTURE
1. Use new copy Activity to read the output of the Actual Copy Activity then write the results to a Azure Data Lake Store file as csv
2. Pass the RunID details from the ADF job to a Databricks notebook and use that to create the dataframe of record counts from each layer.
3. Combine the two log files ( ADF log file and databricks log file) into one master file for the entire ETL process in Databricks
CAPTURING LOGS OF ADF COPY ACTIVITY USING A NEW COPY ACTIVITY(SOURCE AS SQL SERVER AND SINK AS ADLS)
Below is a sample copy activity output
SELECT '@{pipeline().DataFactory}' as DataFactory_Name,
'@{pipeline().Pipeline}' as Pipeline_Name,
'@{pipeline().RunId}' as RunId,
'Stature_SOR' as Source,
'RAW/16263_STATURE' as Destination,
'@{item().TABLENAME}' as Tablename,
'@{pipeline().TriggerType}' as TriggerType,
'@{pipeline().TriggerId}' as TriggerId,
'@{pipeline().TriggerName}' as TriggerName,
'@{pipeline().TriggerTime}' as TriggerTime,
'@{activity('Copy_OneTable').output.rowsCopied}' as rowsCopied,
'@{activity('Copy_OneTable').output.rowsRead}' as RowsRead,
'@{activity('Copy_OneTable').output.usedParallelCopies}' as No_ParallelCopies,
'@{activity('Copy_OneTable').output.copyDuration}' as copyDuration_in_secs,
'@{activity('Copy_OneTable').output.effectiveIntegrationRuntime}' as effectiveIntegrationRuntime,
'@{activity('Copy_OneTable').output.executionDetails[0].source.type}' as Source_Type,
'@{activity('Copy_OneTable').output.executionDetails[0].sink.type}' as Sink_Type,
'@{activity('Copy_OneTable').output.executionDetails[0].status}' as Execution_Status,
'@{activity('Copy_OneTable').output.executionDetails[0].start}' as CopyActivity_Start_Time,
'@{utcnow()}' as CopyActivity_End_Time,
'@{activity('Copy_OneTable').output.executionDetails[0].detailedDurations.queuingDuration}' as CopyActivity_queuingDuration_in_secs,
'@{activity('Copy_OneTable').output.executionDetails[0].detailedDurations.timeToFirstByte}' as CopyActivity_timeToFirstByte_in_secs,
'@{activity('Copy_OneTable').output.executionDetails[0].detailedDurations.transferDuration}' as CopyActivity_transferDuration_in_secs
After writing this details to a csv file in ADLS. We created a notebook that passes in the runid as the input parameter to continue our custom logging