Full Data Preprocessing Pipeline

This section presents a full preprocessing pipeline for the example toy dataset. It integrates several preprocessing techniques for customers, merchants, and transactions, and then prepares the graph data for Graph Neural Network (GNN)-based model training.

Full Dataset Preprocessing Pipeline for the example dataset

Once all preprocessing steps are complete and the node features have been assembled, the feature matrix is converted to float32 to ensure compatibility with models that require this format.
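
For example, a NumPy matrix can be cast with astype; the minimal sketch below shows the same conversion that the full pipeline applies when the block-diagonal node feature matrix is assembled.

import numpy as np

features = np.array([[1.0, 0.0, 2.5]])    # toy feature matrix
features = features.astype(np.float32)    # float32 is the float dtype expected by many GNN frameworks
print(features.dtype)                     # float32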


import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import category_encoders as ce
from scipy.linalg import block_diag

DATA_OUTPUT_DIR = "example_graph_data"

# Customers dataset
df_customers = pd.DataFrame(
    {
        "customer_id": ["C001", "C002", "C003"],
        "age": [25, 40, 35],
        "gender": ["Male", "Female", "Other"],
        "loyalty_points": [150, 300, 225],
    }
)

# Merchants dataset
df_merchants = pd.DataFrame(
    {
        "merchant_id": ['M001', 'M002', 'M003'],
        "merchant_rating": ["Poor", "Average", "Excellent"],
        "merchant_category": ["Grocery", "Electronics", "Apparel"],
        "merchant_location": ["New York", "90001", "Chicago"],  # City or ZIP code
    }
)

# Transactions dataset
df_transactions = pd.DataFrame(
    {
        "transaction_id": ['T001', 'T002', 'T003', 'T004', 'T005'],
        "customer_id": ['C002', 'C003', 'C001', 'C001', 'C003'],
        "merchant_id": ['M002', 'M003', 'M001', 'M003', 'M003'],
        "amount": [250.0, 175.5, 320.75, 85.99, 1000.0],
        "transaction_timestamp": [
            "2025-03-10 14:23:00",
            "2025-03-10 09:45:00",
            "2025-03-08 16:30:00",
            "2025-03-01 06:30:00",
            "2025-03-11 02:30:00",
        ],
        "transaction_type": ["online", "in-store", "online", "online", "in-store"],
        "is_fraud": [1, 0, 1, 0, 1],
    }
)

# ------------------------------------------------
# Step 2.1: Preprocess Customers
# ------------------------------------------------

preprocessed_customer_columns = []
preprocessed_merchant_columns = []
preprocessed_transaction_columns = []

# Binary encode customer_id (fit encoder and save for reuse)
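# The fitted encoder stores its category-to-bit mapping as a DataFrame; the columns of
# that DataFrame (e.g. customer_id_0, customer_id_1) are the names of the encoded output
# columns, which are reused below when the transformed values are written back.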
encoder_customer_id = ce.BinaryEncoder(cols=["customer_id"]).fit(
    df_customers["customer_id"]
)
transformed_column_names = encoder_customer_id.get_params()["mapping"][0]["mapping"].columns
df_customers[transformed_column_names] = encoder_customer_id.transform(df_customers["customer_id"])
preprocessed_customer_columns.extend(transformed_column_names)

# Binary encode gender
encoder_gender = ce.BinaryEncoder(cols=["gender"]).fit(df_customers["gender"])
transformed_column_names = encoder_gender.get_params()["mapping"][0]["mapping"].columns
df_customers[transformed_column_names] = encoder_gender.transform(df_customers["gender"])
preprocessed_customer_columns.extend(transformed_column_names)

# Scale numerical features: age and loyalty_points
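# StandardScaler standardizes each column to zero mean and unit variance,
# i.e. x_scaled = (x - mean) / std, so both features end up on a comparable scale.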
scaler_customers = StandardScaler()
df_customers[["age", "loyalty_points"]] = scaler_customers.fit_transform(
    df_customers[["age", "loyalty_points"]]
)
preprocessed_customer_columns.extend(["age", "loyalty_points"])


print("\nPreprocessed Customers Data:")
print(df_customers)

# ------------------------------------------------
# Step 2.2: Preprocess Merchants
# ------------------------------------------------

# Binary encode merchant_id (fit encoder and save for reuse)
encoder_merchant_id = ce.BinaryEncoder(cols=["merchant_id"]).fit(
    df_merchants["merchant_id"]
)
transformed_column_names = encoder_merchant_id.get_params()["mapping"][0]["mapping"].columns
df_merchants[transformed_column_names] = encoder_merchant_id.transform(df_merchants["merchant_id"])
preprocessed_merchant_columns.extend(transformed_column_names)

# Ordinal encode merchant_rating
rating_mapping = {"Poor": 1, "Average": 2, "Good": 3, "Very Good": 4, "Excellent": 5}
df_merchants["merchant_rating_numeric"] = df_merchants["merchant_rating"].map(
    rating_mapping
)
preprocessed_merchant_columns.extend(["merchant_rating_numeric"])
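# Note: ratings outside rating_mapping would map to NaN; on a real dataset such values
# might be filled with a default, e.g. fillna(0), before training.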


# Binary encode merchant_category
encoder_merchant_cat = ce.BinaryEncoder(cols=["merchant_category"]).fit(
    df_merchants["merchant_category"]
)
transformed_column_names = encoder_merchant_cat.get_params()["mapping"][0]["mapping"].columns
df_merchants[transformed_column_names] = encoder_merchant_cat.transform(df_merchants["merchant_category"])
preprocessed_merchant_columns.extend(transformed_column_names)


# Binary encode merchant_location
encoder_merchant_location = ce.BinaryEncoder(cols=["merchant_location"]).fit(
    df_merchants["merchant_location"]
)
transformed_column_names = encoder_merchant_location.get_params()["mapping"][0]["mapping"].columns
df_merchants[transformed_column_names] = encoder_merchant_location.transform(
    df_merchants["merchant_location"]
)
preprocessed_merchant_columns.extend(transformed_column_names)
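# Binary encoding treats every distinct location string ("New York", "90001", "Chicago")
# as its own category, so the mixed city/ZIP values are handled uniformly.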


print("\nPreprocessed Merchants Data:")
print(df_merchants)

# ------------------------------------------------
# Step 2.3: Preprocess Transactions
# ------------------------------------------------

# Process transaction_timestamp: convert to datetime and extract features
df_transactions["transaction_timestamp"] = pd.to_datetime(
    df_transactions["transaction_timestamp"]
)
df_transactions["hour"] = df_transactions["transaction_timestamp"].dt.hour
df_transactions["day_of_week"] = df_transactions["transaction_timestamp"].dt.dayofweek
df_transactions["month"] = df_transactions["transaction_timestamp"].dt.month
preprocessed_transaction_columns.extend(["hour", "day_of_week", "month"])
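# hour is in 0-23, dt.dayofweek encodes Monday=0 through Sunday=6, and month is in 1-12.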

# Scale the 'amount' column
scaler_amount = StandardScaler()
df_transactions["amount_scaled"] = scaler_amount.fit_transform(
    df_transactions[["amount"]]
)
preprocessed_transaction_columns.extend(["amount_scaled"])

# Binary encode 'transaction_type'
encoder_tx_type = ce.BinaryEncoder(cols=["transaction_type"]).fit(
    df_transactions["transaction_type"]
)
transformed_column_names = encoder_tx_type.get_params()["mapping"][0]["mapping"].columns
df_transactions[transformed_column_names] = encoder_tx_type.transform(df_transactions["transaction_type"])
preprocessed_transaction_columns.extend(transformed_column_names)

print("\nPreprocessed Transactions Data:")
print(df_transactions)

# ------------------------------------------------
# Step 3.1: Generate Graph nodes and edges
# ------------------------------------------------


## Graph Generation
# - Each node in the graph must have a unique ID.
# - The IDs must be contiguous and start from 0.
# In the following code, merchant nodes are assigned unique IDs starting from 0.
# After renumbering the merchant nodes, customer nodes receive the subsequent IDs,
# and finally, transaction nodes are assigned their IDs in sequence.

COL_MERCHANT_ID = "merchant_id"
COL_TRANSACTION_ID = "transaction_id"
COL_CUSTOMER_ID = "customer_id"


merchant_id_to_zero_based_id = dict(
    (key, idx) for idx, key in enumerate(df_merchants[COL_MERCHANT_ID].unique())
)
df_merchants[COL_MERCHANT_ID] = df_merchants[COL_MERCHANT_ID].map(
    merchant_id_to_zero_based_id
)
nr_unique_merchants = len(df_merchants[COL_MERCHANT_ID].unique())

customer_id_to_zero_based_id = dict(
    (key, idx + nr_unique_merchants)
    for idx, key in enumerate(df_customers[COL_CUSTOMER_ID].unique())
)
df_customers[COL_CUSTOMER_ID] = df_customers[COL_CUSTOMER_ID].map(
    customer_id_to_zero_based_id
)
nr_unique_customers = len(df_customers[COL_CUSTOMER_ID].unique())

transaction_id_to_zero_based_id = dict(
    (key, idx + nr_unique_merchants + nr_unique_customers)
    for idx, key in enumerate(df_transactions[COL_TRANSACTION_ID].unique())
)

df_transactions[COL_TRANSACTION_ID] = df_transactions[COL_TRANSACTION_ID].map(
    transaction_id_to_zero_based_id
)
df_transactions[COL_MERCHANT_ID] = df_transactions[COL_MERCHANT_ID].map(
    merchant_id_to_zero_based_id
)
df_transactions[COL_CUSTOMER_ID] = df_transactions[COL_CUSTOMER_ID].map(
    customer_id_to_zero_based_id
)
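
# For the toy data, the resulting assignment is:
#   merchants    M001-M003 -> 0-2
#   customers    C001-C003 -> 3-5
#   transactions T001-T005 -> 6-10
# Optional sanity check (an illustrative addition, not part of the original pipeline):
# node IDs must be contiguous and start at 0 across all three node types.
nr_unique_transactions = len(transaction_id_to_zero_based_id)
all_node_ids = (
    list(merchant_id_to_zero_based_id.values())
    + list(customer_id_to_zero_based_id.values())
    + list(transaction_id_to_zero_based_id.values())
)
assert sorted(all_node_ids) == list(
    range(nr_unique_merchants + nr_unique_customers + nr_unique_transactions)
)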


# Create Edge List
# This step generates an edge list by connecting customer, merchant, and transaction IDs,
# thereby establishing the relationships between these nodes in the graph.

# Create edges from CUSTOMER_ID to TRANSACTION_ID
edges_customer_to_transaction = df_transactions[
    [COL_CUSTOMER_ID, COL_TRANSACTION_ID]
].rename(columns={COL_CUSTOMER_ID: "src", COL_TRANSACTION_ID: "dst"})

# Create edges from TRANSACTION_ID to MERCHANT_ID
edges_transaction_to_merchant = df_transactions[
    [COL_TRANSACTION_ID, COL_MERCHANT_ID]
].rename(columns={COL_TRANSACTION_ID: "src", COL_MERCHANT_ID: "dst"})


# Combine all edges
edges = pd.concat(
    [
        edges_customer_to_transaction,
        edges_transaction_to_merchant,
    ],
    ignore_index=True,
)
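
# For the toy data this yields 10 directed edges: 5 customer -> transaction edges and
# 5 transaction -> merchant edges (20 edges if the reverse edges are added below).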

# If needed, each connection can be represented in both directions.
is_bidirectional = False

if is_bidirectional:
    # Create reverse edges from TRANSACTION_ID to CUSTOMER_ID
    edges_transaction_to_customer = df_transactions[
        [COL_CUSTOMER_ID, COL_TRANSACTION_ID]
    ].rename(columns={COL_CUSTOMER_ID: "dst", COL_TRANSACTION_ID: "src"})

    # Create reverse edges from MERCHANT_ID to TRANSACTION_ID
    edges_merchant_to_transaction = df_transactions[
        [COL_TRANSACTION_ID, COL_MERCHANT_ID]
    ].rename(columns={COL_TRANSACTION_ID: "dst", COL_MERCHANT_ID: "src"})

    # Combine all edges
    edges = pd.concat(
        [
            edges_customer_to_transaction,
            edges_transaction_to_customer,
            edges_transaction_to_merchant,
            edges_merchant_to_transaction,
        ],
        ignore_index=True,
    )


print('Edges:')
print(edges)

## Create Node feature
#  Each node's unique ID corresponds to its row index in the node feature file (e.g., node.csv).
#  The node ordering defined above must be preserved in that file:
#   1. Merchant nodes: starting from 0.
#   2. Customer nodes: receiving subsequent IDs after merchant nodes.
#   3. Transaction nodes: assigned in sequence after customer nodes.
#


# Create the block matrix with correct node ordering
# The block matrix is constructed by aligning the nodes in the same order as defined earlier.
# This ensures that merchant nodes, customer nodes, and transaction nodes are correctly positioned in the matrix.

M = df_merchants[preprocessed_merchant_columns].values
C = df_customers[preprocessed_customer_columns].values
T = df_transactions[preprocessed_transaction_columns].values
# Cast to float32 so the node features use the dtype expected by the GNN model.
node_feature_df = pd.DataFrame(
    block_diag(M, C, T).astype(np.float32),
    columns=preprocessed_merchant_columns
    + preprocessed_customer_columns
    + preprocessed_transaction_columns,
)
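
# Each row corresponds to one node (3 merchants + 3 customers + 5 transactions = 11 rows
# for the toy data). Feature columns that belong to a different node type are zero-filled
# by block_diag.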


print('Node feature:')
print(node_feature_df)

## Create Node label
# The node label file must contain the same number of rows as the node feature file (e.g., node.csv).
# For nodes without a label, assign a default value of zero.

# Initialize with all zeros
node_label_df = pd.DataFrame(
    np.zeros(len(node_feature_df), dtype=int), columns=["is_fraud"]
)

# Copy the label of transaction nodes to corresponding indices
node_label_df.iloc[df_transactions[COL_TRANSACTION_ID].values, 0] = df_transactions[
    "is_fraud"
].values
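
# Only transaction rows (node IDs 6-10 in the toy data) receive a real label;
# merchant and customer rows keep the default value of 0.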

print('Node label:')
print(node_label_df)

# ---------------------------------------------------------------
# Step 3.2: Write the files under the proper directory structure
# ---------------------------------------------------------------

#  data_root
#    ├── edges
#    │   └── node_to_node.csv
#    └── nodes
#        ├── node.csv
#        └── node_label.csv


os.makedirs(os.path.join(DATA_OUTPUT_DIR, "edges"), exist_ok=True)
os.makedirs(os.path.join(DATA_OUTPUT_DIR, "nodes"), exist_ok=True)

node_feature_df.to_csv(os.path.join(DATA_OUTPUT_DIR, "nodes", "node.csv"), index=False)
node_label_df.to_csv(
    os.path.join(DATA_OUTPUT_DIR, "nodes", "node_label.csv"), index=False
)
edges.to_csv(os.path.join(DATA_OUTPUT_DIR, "edges", "node_to_node.csv"), index=False)
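
# ---------------------------------------------------------------
# Optional: read the files back as a quick sanity check
# ---------------------------------------------------------------
# A minimal sketch (not part of the original pipeline), assuming the directory
# layout written above.

nodes_check = pd.read_csv(os.path.join(DATA_OUTPUT_DIR, "nodes", "node.csv"))
labels_check = pd.read_csv(os.path.join(DATA_OUTPUT_DIR, "nodes", "node_label.csv"))
edges_check = pd.read_csv(os.path.join(DATA_OUTPUT_DIR, "edges", "node_to_node.csv"))

# Expect 11 node rows, 11 label rows, and 10 edge rows for the toy data.
print(nodes_check.shape, labels_check.shape, edges_check.shape)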