ETL from SQL Server to Azure Synapse Analytics with Slowly Changing Dimensions (SCD) Type 2 using Python
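1. Install the required libraries. You’ll need the pyodbc, pandas, and azure-synapse libraries (pyodbc also requires the Microsoft ODBC Driver for SQL Server to be installed on the machine). Install the Python packages using pip: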
pip install pyodbc pandas azure-synapse
2. Connect to SQL Server and extract data:
import pyodbc
import pandas as pd

# Set up your SQL Server connection string.
# NOTE(review): avoid hard-coding credentials in real deployments; load them
# from environment variables or a secret store instead.
sql_conn_str = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_server;DATABASE=your_database;UID=your_username;PWD=your_password"

# Connect to SQL Server and extract the source rows.
# The connection is closed in ``finally`` so it is released even if the query
# fails. (A pyodbc ``with`` block commits/rolls back but does NOT close the
# connection, so it is not a substitute here.)
sql_conn = pyodbc.connect(sql_conn_str)
try:
    source_query = "SELECT * FROM your_source_table"
    # NOTE(review): pandas >= 2.0 warns when given a raw DBAPI connection other
    # than sqlite3; reading still works, but a SQLAlchemy engine silences it.
    source_data = pd.read_sql(source_query, sql_conn)
finally:
    sql_conn.close()
3. Implement SCD Type 2 logic:
# Define the SCD Type 2 logic function
def scd2_merge(current_data, new_data, key_columns, scd_columns):
    """Return the rows of ``new_data`` that must be inserted as new SCD Type 2 versions.

    A row of ``new_data`` is selected when either
      * its business key (``key_columns``) does not appear in ``current_data``
        (a brand-new dimension member), or
      * at least one of ``scd_columns`` differs from a ``current_data`` row with
        the same key (a changed member that needs a new version).
    Values missing on both sides (``None``/``NaN``/``NaT``) are treated as equal,
    so they do not trigger spurious new versions.

    Args:
        current_data: DataFrame loaded from the destination; must contain
            ``key_columns`` and ``scd_columns``.
        new_data: DataFrame extracted from the source; must contain
            ``key_columns`` and ``scd_columns``.
        key_columns: Column names identifying a dimension member.
        scd_columns: Column names whose changes create a new version.

    Returns:
        The selected rows of ``new_data`` — same columns, column order and
        row order as ``new_data`` — each at most once, with a fresh RangeIndex.

    NOTE(review): this only decides which rows to insert. It does not expire
    (close out) the superseded versions already in the destination, and an
    incoming row is flagged as changed if it differs from ANY current_data row
    with the same key. If the destination keeps history, pass only the current
    versions — TODO confirm against the destination schema.
    """
    key_columns = list(key_columns)
    scd_columns = list(scd_columns)
    compare_columns = key_columns + scd_columns

    # Tag each incoming row with its position so the result can be taken
    # straight from new_data (original names and dtypes, no merge suffixes) and
    # so a row matching several current rows is emitted only once.
    row_id = "_scd2_row_id"
    while row_id in current_data.columns or row_id in new_data.columns:
        row_id = "_" + row_id
    incoming = new_data[compare_columns].reset_index(drop=True)
    incoming = incoming.assign(**{row_id: incoming.index})

    # A right join keeps every incoming row; rows present only in current_data
    # are irrelevant for deciding what to insert.
    merged_data = pd.merge(
        current_data[compare_columns],
        incoming,
        on=key_columns,
        how="right",
        suffixes=("_current", "_new"),
        indicator=True,
    )

    is_new_member = merged_data["_merge"] == "right_only"
    is_changed = pd.Series(False, index=merged_data.index)
    for scd_col in scd_columns:
        before = merged_data[f"{scd_col}_current"]
        after = merged_data[f"{scd_col}_new"]
        # NaN != NaN is True, so explicitly treat "missing on both sides" as equal.
        is_changed |= (before != after) & ~(before.isna() & after.isna())

    selected_rows = merged_data.loc[is_new_member | is_changed, row_id]
    return new_data.iloc[sorted(set(selected_rows.tolist()))].reset_index(drop=True)
4. Connect to Azure Synapse Analytics and load data:
# Set up your Azure Synapse Analytics connection string.
# Synapse SQL pools are reached through the regular Microsoft ODBC driver for
# SQL Server; "Azure Synapse Analytics" is not an ODBC driver name.
synapse_conn_str = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_server;DATABASE=your_database;UID=your_username;PWD=your_password"

# Connect to Azure Synapse Analytics; the connection is closed in ``finally``.
synapse_conn = pyodbc.connect(synapse_conn_str)
try:
    # Load current data from Azure Synapse Analytics.
    # NOTE(review): if the table keeps SCD history, filter this query to the
    # current versions only (e.g. an is_current flag) — TODO confirm schema.
    destination_query = "SELECT * FROM your_destination_table"
    destination_data = pd.read_sql(destination_query, synapse_conn)

    # Apply SCD Type 2 logic and get the new records
    key_columns = ["key_column1", "key_column2"]
    scd_columns = ["scd_column1", "scd_column2"]
    new_records = scd2_merge(destination_data, source_data, key_columns, scd_columns)

    # Load new records into Azure Synapse Analytics.
    # DataFrame.to_sql only supports SQLAlchemy connectables or sqlite3
    # connections, so insert with a parameterized statement over pyodbc.
    if not new_records.empty:
        column_list = ", ".join(
            "[" + str(col).replace("]", "]]") + "]" for col in new_records.columns
        )
        placeholders = ", ".join("?" for _ in new_records.columns)
        insert_sql = f"INSERT INTO your_destination_table ({column_list}) VALUES ({placeholders})"
        # Convert to plain Python objects and map NaN/NaT to None (SQL NULL).
        rows = [
            [None if pd.isna(value) else value for value in record]
            for record in new_records.astype(object).itertuples(index=False, name=None)
        ]
        cursor = synapse_conn.cursor()
        try:
            cursor.executemany(insert_sql, rows)
        finally:
            cursor.close()
        # pyodbc connections are not autocommit by default.
        synapse_conn.commit()
finally:
    # Close the connection
    synapse_conn.close()
# NOTE(review): a verbatim second copy of the step-4 load was removed from here.
# Running the merge + insert a second time against the already-updated
# destination table compares the source rows with the superseded versions again
# and re-inserts them as "changed", producing duplicate versions. Step 4 should
# run exactly once.
Replace the placeholders (your_server
, your_database
, your_username
, your_password
, your_source_table
, your_destination_table
, key_column