ETL from SQL Server to Azure Synapse Analytics with Slowly Changing Dimensions (SCD) Type 2 using Python

ETL from SQL Server to Azure Synapse Analytics with Slowly Changing Dimensions (SCD) Type 2 using Python

1. Install the required libraries: You’ll need the pyodbc, pandas, and azure-synapse libraries. Install them using pip:
pip install pyodbc pandas azure-synapse

2. Connect to SQL Server and extract data:

import pyodbc
import pandas as pd

# Set up your SQL Server connection string (replace the placeholder values)
sql_conn_str = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_server;DATABASE=your_database;UID=your_username;PWD=your_password"

# Query that selects the rows to extract from SQL Server
source_query = "SELECT * FROM your_source_table"

# Connect to SQL Server
sql_conn = pyodbc.connect(sql_conn_str)
try:
    # Extract data from SQL Server into a DataFrame
    source_data = pd.read_sql(source_query, sql_conn)
finally:
    # Close the connection even when the query raises, so a failed
    # extract does not leave a session open on the server.
    sql_conn.close()

3. Implement SCD Type 2 logic:

# Define the SCD Type 2 logic function
def scd2_merge(current_data, new_data, key_columns, scd_columns):
    """Return the rows of ``new_data`` that must be inserted as new versions.

    The two frames are outer-joined on ``key_columns``. A row of ``new_data``
    is returned when its key is absent from ``current_data``, or when the key
    exists in both frames and at least one of ``scd_columns`` holds a
    different value. Two missing values (NaN/None) are treated as equal.

    Keys that exist only in ``current_data`` are not reported, and existing
    rows are not end-dated here; closing out superseded versions is left to
    the caller.

    Args:
        current_data: DataFrame with the rows currently in the destination.
        new_data: DataFrame with the freshly extracted source rows.
        key_columns: Column name, or list of column names, identifying a
            record in both frames.
        scd_columns: Names of the tracked columns. Each must exist in both
            frames and must not be one of the key columns.

    Returns:
        A DataFrame with exactly the columns of ``new_data`` (same names and
        order) containing the new and the changed rows.

    Raises:
        KeyError: If a key column or a tracked column is missing from either
            frame.
    """
    merged_data = pd.merge(
        current_data,
        new_data,
        on=key_columns,
        how="outer",
        suffixes=("_current", "_new"),
        indicator=True,
    )

    is_new_key = merged_data["_merge"] == "right_only"
    in_both = merged_data["_merge"] == "both"

    # Accumulate one "changed" flag per row across all tracked columns, so a
    # change in any single column is enough to emit a new version.
    has_changed = pd.Series(False, index=merged_data.index)
    for scd_col in scd_columns:
        current_values = merged_data[f"{scd_col}_current"]
        new_values = merged_data[f"{scd_col}_new"]
        # NaN != NaN evaluates to True, which would flag rows whose value is
        # missing on both sides as changed; mask those out explicitly.
        both_missing = current_values.isna() & new_values.isna()
        has_changed |= (current_values != new_values) & ~both_missing

    new_records = merged_data[is_new_key | (in_both & has_changed)]

    # Only non-key columns present in both frames received a merge suffix.
    # Rename exactly those "<col>_new" columns back instead of stripping the
    # substring "_new" wherever it occurs in a column name.
    keys = [key_columns] if isinstance(key_columns, str) else list(key_columns)
    shared_columns = [
        col
        for col in new_data.columns
        if col in current_data.columns and col not in keys
    ]
    new_records = new_records.rename(
        columns={f"{col}_new": col for col in shared_columns}
    )

    # Keep only the source schema: this drops the "_merge" indicator, the
    # "<col>_current" copies and any destination-only columns, whose values
    # belong to the superseded row rather than to the new version.
    return new_records[list(new_data.columns)].reset_index(drop=True)

4. Connect to Azure Synapse Analytics and load data:

from azure.synapse import SynapseWorkspace

# Set up your Azure Synapse Analytics connection string.
# Synapse SQL pools are reached through the regular SQL Server ODBC driver
# (the same one used for the source connection); "Azure Synapse Analytics"
# is not the name of an installed ODBC driver.
synapse_conn_str = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_server;DATABASE=your_database;UID=your_username;PWD=your_password"

# Apply SCD Type 2 logic on these columns
key_columns = ["key_column1", "key_column2"]
scd_columns = ["scd_column1", "scd_column2"]

# Connect to Azure Synapse Analytics
synapse_conn = pyodbc.connect(synapse_conn_str)
try:
    # Load current data from Azure Synapse Analytics
    destination_query = "SELECT * FROM your_destination_table"
    destination_data = pd.read_sql(destination_query, synapse_conn)

    # Apply SCD Type 2 logic and get the new records
    new_records = scd2_merge(destination_data, source_data, key_columns, scd_columns)

    # Load new records into Azure Synapse Analytics.
    # DataFrame.to_sql only accepts SQLAlchemy connectables or sqlite3
    # connections, so the rows are inserted through the pyodbc cursor with a
    # parameterized INSERT instead.
    if not new_records.empty:  # executemany rejects an empty parameter list
        column_list = ", ".join(f"[{col}]" for col in new_records.columns)
        placeholders = ", ".join("?" for _ in new_records.columns)
        insert_sql = (
            f"INSERT INTO your_destination_table ({column_list}) "
            f"VALUES ({placeholders})"
        )
        # Send missing values (NaN/NaT) as SQL NULL.
        rows = (
            new_records.astype(object)
            .where(new_records.notna(), None)
            .values.tolist()
        )
        cursor = synapse_conn.cursor()
        cursor.executemany(insert_sql, rows)
        synapse_conn.commit()
finally:
    # Close the connection even if reading, merging or loading fails
    synapse_conn.close()

from azure.synapse import SynapseWorkspace

# NOTE(review): this section repeats the load step of section 4 verbatim;
# running both appends the same records to the destination table twice.

# Set up your Azure Synapse Analytics connection string.
# Synapse SQL pools are reached through the regular SQL Server ODBC driver
# (the same one used for the source connection); "Azure Synapse Analytics"
# is not the name of an installed ODBC driver.
synapse_conn_str = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_server;DATABASE=your_database;UID=your_username;PWD=your_password"

# Apply SCD Type 2 logic on these columns
key_columns = ["key_column1", "key_column2"]
scd_columns = ["scd_column1", "scd_column2"]

# Connect to Azure Synapse Analytics
synapse_conn = pyodbc.connect(synapse_conn_str)
try:
    # Load current data from Azure Synapse Analytics
    destination_query = "SELECT * FROM your_destination_table"
    destination_data = pd.read_sql(destination_query, synapse_conn)

    # Apply SCD Type 2 logic and get the new records
    new_records = scd2_merge(destination_data, source_data, key_columns, scd_columns)

    # Load new records into Azure Synapse Analytics.
    # DataFrame.to_sql only accepts SQLAlchemy connectables or sqlite3
    # connections, so the rows are inserted through the pyodbc cursor with a
    # parameterized INSERT instead.
    if not new_records.empty:  # executemany rejects an empty parameter list
        column_list = ", ".join(f"[{col}]" for col in new_records.columns)
        placeholders = ", ".join("?" for _ in new_records.columns)
        insert_sql = (
            f"INSERT INTO your_destination_table ({column_list}) "
            f"VALUES ({placeholders})"
        )
        # Send missing values (NaN/NaT) as SQL NULL.
        rows = (
            new_records.astype(object)
            .where(new_records.notna(), None)
            .values.tolist()
        )
        cursor = synapse_conn.cursor()
        cursor.executemany(insert_sql, rows)
        synapse_conn.commit()
finally:
    # Close the connection even if reading, merging or loading fails
    synapse_conn.close()

Replace the placeholders (your_server, your_database, your_username, your_password, your_source_table, your_destination_table, key_column