Full Dataset Preprocessing Pipeline for the Example Dataset

This section outlines a complete preprocessing pipeline for the example toy dataset. It combines multiple preprocessing steps for customer, merchant, and transaction data, and then constructs the graph inputs required for training a Graph Neural Network (GNN)-based XGBoost model.

For simplicity, the computation of feature masks is not included in this example.

import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import category_encoders as ce
from scipy.linalg import block_diag


DATA_OUTPUT_DIR = "example_graph_data"

# --- Toy input tables -------------------------------------------------

# Customer node table: one row per customer.
df_customers = pd.DataFrame.from_records(
    [
        ("C001", 25, "Male", 150),
        ("C002", 40, "Female", 300),
        ("C003", 35, "Other", 225),
    ],
    columns=["customer_id", "age", "gender", "loyalty_points"],
)


# Merchant node table: one row per merchant.
# merchant_location is deliberately mixed (city name or ZIP code).
df_merchants = pd.DataFrame.from_records(
    [
        ("M001", "Poor", "Grocery", "New York"),
        ("M002", "Average", "Electronics", "90001"),
        ("M003", "Excellent", "Apparel", "Chicago"),
    ],
    columns=[
        "merchant_id",
        "merchant_rating",
        "merchant_category",
        "merchant_location",
    ],
)


# Transaction table: one row per customer->merchant interaction,
# with a binary fraud label per transaction.
df_transactions = pd.DataFrame.from_records(
    [
        ("T001", "C002", "M002", 250.0, "2025-03-10 14:23:00", "online", 1),
        ("T002", "C003", "M002", 175.5, "2025-08-10 09:45:00", "in-store", 0),
        ("T003", "C001", "M002", 320.75, "2025-11-08 16:30:00", "online", 1),
        ("T004", "C001", "M003", 85.99, "2025-12-01 06:30:00", "online", 0),
        ("T005", "C003", "M001", 1000.0, "2025-02-11 02:30:00", "in-store", 1),
    ],
    columns=[
        "transaction_id",
        "customer_id",
        "merchant_id",
        "amount",
        "transaction_timestamp",
        "transaction_type",
        "is_fraud",
    ],
)


# ------------------------------------------------
# Step 2.1: Preprocess Customers
# ------------------------------------------------

# Running lists of the column names that make up each entity's final
# feature matrix; the write-out step persists only these columns.
preprocessed_customer_columns = []
preprocessed_merchant_columns = []
preprocessed_transaction_columns = []


# Binary encode customer_id (fit encoder and save for reuse).
# NOTE: we take the generated column names from the transform output itself
# rather than digging into encoder.get_params()["mapping"], which relies on
# category_encoders internals and is brittle across library versions.
encoder_customer_id = ce.BinaryEncoder(cols=["customer_id"]).fit(
    df_customers["customer_id"]
)
encoded_customer_id = encoder_customer_id.transform(df_customers["customer_id"])
df_customers[list(encoded_customer_id.columns)] = encoded_customer_id
preprocessed_customer_columns.extend(encoded_customer_id.columns)


# Binary encode gender (same column-name handling as above)
encoder_gender = ce.BinaryEncoder(cols=["gender"]).fit(df_customers["gender"])
encoded_gender = encoder_gender.transform(df_customers["gender"])
df_customers[list(encoded_gender.columns)] = encoded_gender
preprocessed_customer_columns.extend(encoded_gender.columns)


# Scale numerical features to zero mean / unit variance: age and loyalty_points
scaler_customers = StandardScaler()
df_customers[["age", "loyalty_points"]] = scaler_customers.fit_transform(
    df_customers[["age", "loyalty_points"]]
)
preprocessed_customer_columns.extend(["age", "loyalty_points"])


print("\nPreprocessed Customers Data:")
print(df_customers)


# ------------------------------------------------
# Step 2.2: Preprocess Merchants
# ------------------------------------------------

# Binary encode merchant_id (fit encoder and save for reuse).
# As in Step 2.1, generated column names come from the transform output
# instead of the brittle encoder.get_params()["mapping"] internals.
encoder_merchant_id = ce.BinaryEncoder(cols=["merchant_id"]).fit(
    df_merchants["merchant_id"]
)
encoded_merchant_id = encoder_merchant_id.transform(df_merchants["merchant_id"])
df_merchants[list(encoded_merchant_id.columns)] = encoded_merchant_id
preprocessed_merchant_columns.extend(encoded_merchant_id.columns)


# Ordinal encode merchant_rating: the rating scale has a natural order,
# so a single numeric column preserves it. Ratings outside the mapping
# would become NaN — acceptable for this toy example.
rating_mapping = {"Poor": 1, "Average": 2, "Good": 3, "Very Good": 4, "Excellent": 5}
df_merchants["merchant_rating_numeric"] = df_merchants["merchant_rating"].map(
    rating_mapping
)
preprocessed_merchant_columns.extend(["merchant_rating_numeric"])


# Binary encode merchant_category
encoder_merchant_cat = ce.BinaryEncoder(cols=["merchant_category"]).fit(
    df_merchants["merchant_category"]
)
encoded_merchant_cat = encoder_merchant_cat.transform(
    df_merchants["merchant_category"]
)
df_merchants[list(encoded_merchant_cat.columns)] = encoded_merchant_cat
preprocessed_merchant_columns.extend(encoded_merchant_cat.columns)


# Binary encode merchant_location (values may be city names or ZIP codes;
# the encoder treats both uniformly as categories)
encoder_merchant_location = ce.BinaryEncoder(cols=["merchant_location"]).fit(
    df_merchants["merchant_location"]
)
encoded_merchant_location = encoder_merchant_location.transform(
    df_merchants["merchant_location"]
)
df_merchants[list(encoded_merchant_location.columns)] = encoded_merchant_location
preprocessed_merchant_columns.extend(encoded_merchant_location.columns)


print("\nPreprocessed Merchants Data:")
print(df_merchants)


# ------------------------------------------------
# Step 2.3: Preprocess Transactions
# ------------------------------------------------

# Parse timestamps so cyclical time features can be derived below.
df_transactions["transaction_timestamp"] = pd.to_datetime(
    df_transactions["transaction_timestamp"]
)


# Encode each periodic time component as a (sin, cos) pair so the model sees
# e.g. hour 23 and hour 0 as neighbors instead of opposite extremes.
# The loop replaces three near-identical copy-pasted stanzas; column
# insertion order (hour, day, month; sin before cos) is unchanged.
_ts = df_transactions["transaction_timestamp"].dt
for _component, _values, _period in (
    ("hour", _ts.hour, 24),
    ("day", _ts.dayofweek, 7),
    ("month", _ts.month, 12),
):
    _angle = 2 * np.pi * _values / _period
    df_transactions[f"{_component}_sin"] = np.sin(_angle)
    df_transactions[f"{_component}_cos"] = np.cos(_angle)
    preprocessed_transaction_columns.extend(
        [f"{_component}_sin", f"{_component}_cos"]
    )


# Scale the 'amount' column to zero mean / unit variance.
scaler_amount = StandardScaler()
df_transactions["amount_scaled"] = scaler_amount.fit_transform(
    df_transactions[["amount"]]
)
preprocessed_transaction_columns.append("amount_scaled")


# Binary encode 'transaction_type'.
# Generated column names come from the transform output instead of the
# brittle encoder.get_params()["mapping"] internals (see Step 2.1).
encoder_tx_type = ce.BinaryEncoder(cols=["transaction_type"]).fit(
    df_transactions["transaction_type"]
)
encoded_tx_type = encoder_tx_type.transform(df_transactions["transaction_type"])
df_transactions[list(encoded_tx_type.columns)] = encoded_tx_type
preprocessed_transaction_columns.extend(encoded_tx_type.columns)


print("\nPreprocessed Transactions Data:")
print(df_transactions)


# ------------------------------------------------
# Step 3.1: Generate Graph nodes and edges
# ------------------------------------------------

## Graph Generation
# - Within each node type, the IDs should be contiguous integers starting
#   from 0. Node identifiers are local to their type (e.g., customer,
#   merchant) and sequential.


COL_MERCHANT_ID = "merchant_id"
COL_TRANSACTION_ID = "transaction_id"
COL_CUSTOMER_ID = "customer_id"


# Map raw string IDs to dense zero-based integer IDs, one namespace per
# node type. Dict comprehensions replace the older dict(generator) form,
# and Series.nunique() replaces len(Series.unique()) — equivalent here
# because the mapped columns contain no NaN (every value is a key of the
# mapping built from the same column).
merchant_id_to_zero_based_id = {
    key: idx for idx, key in enumerate(df_merchants[COL_MERCHANT_ID].unique())
}
df_merchants[COL_MERCHANT_ID] = df_merchants[COL_MERCHANT_ID].map(
    merchant_id_to_zero_based_id
)
nr_unique_merchants = df_merchants[COL_MERCHANT_ID].nunique()


customer_id_to_zero_based_id = {
    key: idx for idx, key in enumerate(df_customers[COL_CUSTOMER_ID].unique())
}
df_customers[COL_CUSTOMER_ID] = df_customers[COL_CUSTOMER_ID].map(
    customer_id_to_zero_based_id
)
nr_unique_customers = df_customers[COL_CUSTOMER_ID].nunique()


transaction_id_to_zero_based_id = {
    key: idx
    for idx, key in enumerate(df_transactions[COL_TRANSACTION_ID].unique())
}


# Rewrite all three ID columns of the transactions table so that edges
# reference the new integer node IDs.
df_transactions[COL_TRANSACTION_ID] = df_transactions[COL_TRANSACTION_ID].map(
    transaction_id_to_zero_based_id
)
df_transactions[COL_MERCHANT_ID] = df_transactions[COL_MERCHANT_ID].map(
    merchant_id_to_zero_based_id
)
df_transactions[COL_CUSTOMER_ID] = df_transactions[COL_CUSTOMER_ID].map(
    customer_id_to_zero_based_id
)


# Create Edge List: one customer->merchant edge per transaction, kept in the
# same row order as df_transactions (edge attributes/labels written later
# rely on this row alignment).
edges_customer_to_merchant = df_transactions[[COL_CUSTOMER_ID, COL_MERCHANT_ID]].rename(
    columns={COL_CUSTOMER_ID: "src", COL_MERCHANT_ID: "dst"}
)


# Sort node tables by their zero-based IDs so that row i holds node i's
# features when written to CSV.
df_customers = df_customers.sort_values(by=COL_CUSTOMER_ID)
df_merchants = df_merchants.sort_values(by=COL_MERCHANT_ID)


# ---------------------------------------------------------------
# Step 3.2: Write the files under the proper directory structure
# ---------------------------------------------------------------

# Expected on-disk layout:
# ├── edges
# │   ├── customer_to_merchant_attr.csv
# │   ├── customer_to_merchant.csv
# │   └── customer_to_merchant_label.csv
# └── nodes
#     ├── customer.csv
#     └── merchant.csv

# Create the output directories first (idempotent).
edges_dir = os.path.join(DATA_OUTPUT_DIR, "edges")
nodes_dir = os.path.join(DATA_OUTPUT_DIR, "nodes")
for output_dir in (edges_dir, nodes_dir):
    os.makedirs(output_dir, exist_ok=True)

# Node feature tables: row i of each file holds the features of node i.
df_customers[preprocessed_customer_columns].to_csv(
    os.path.join(nodes_dir, "customer.csv"), index=False
)
df_merchants[preprocessed_merchant_columns].to_csv(
    os.path.join(nodes_dir, "merchant.csv"), index=False
)

# Edge list plus row-aligned edge attributes and labels.
edges_customer_to_merchant.to_csv(
    os.path.join(edges_dir, "customer_to_merchant.csv"), index=False
)
df_transactions[preprocessed_transaction_columns].to_csv(
    os.path.join(edges_dir, "customer_to_merchant_attr.csv"), index=False
)
df_transactions["is_fraud"].to_csv(
    os.path.join(edges_dir, "customer_to_merchant_label.csv"),
    index=False,
)