Skip to content

Bench

load_data_run_benchmark(result_path, adata_path, write_results=True)

Load the inference embeddings, original h5ad file for metadata, and finally run the benchmark.

Parameters:

Name Type Description Default
result_path

(Path) path to the directory containing the inference results

required
adata_path

(Path) path to the original AnnData (h5ad) file - IMPORTANT, this is used for fetching labels.

required
write_results

(bool) whether to write the results to a csv file.

True
Source code in bionemo/geneformer/scripts/celltype_classification_bench/bench.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def load_data_run_benchmark(result_path, adata_path, write_results=True):
    """Load the inference embeddings, original h5ad file for metadata, and finally run the benchmark.

    Args:
        result_path: (Path) path to the directory containing the inference results
        adata_path: (Path) path to the original AnnData (h5ad) file - IMPORTANT, this is used for fetching labels.
        write_results: (bool) whether to write the results to a csv file.

    Returns:
        pd.DataFrame: single-row frame with mean/std of F1-score and accuracy across CV folds.
    """
    import torch
    from sklearn.preprocessing import LabelEncoder

    adata = read_h5ad(adata_path)

    # Embeddings were written by rank 0 of the inference run; rows must align 1:1 with adata.obs.
    infer_Xs = torch.load(result_path / "predictions__rank_0.pt")["embeddings"].float().cpu().numpy()
    assert len(adata) == len(infer_Xs), (len(adata), len(infer_Xs))

    labels = adata.obs["cell_type"].values

    # Assign integer labels to each string label. These do not need to be transformed into
    # one-hot vectors since the downstream classifier consumes class indices directly.
    integer_labels = LabelEncoder().fit_transform(labels)

    # Run the actual benchmark; the confusion matrix is not used here.
    results, _ = run_benchmark(infer_Xs, integer_labels, use_pca=False)

    # results[metric] is a (mean, std) tuple produced by run_benchmark.
    data = {
        "f1_score_mean": [results["test_f1_score"][0]],
        "f1_score_std": [results["test_f1_score"][1]],
        "accuracy_mean": [results["test_accuracy"][0]],
        "accuracy_std": [results["test_accuracy"][1]],
    }
    df = pd.DataFrame(data)

    if write_results:
        # Let pandas emit the header + rows; drop the index column to match the
        # previous hand-written CSV format (header line, then comma-joined values).
        df.to_csv(result_path / "results.csv", index=False)

    return df

run_benchmark(data, labels, use_pca=True)

Run the accuracy, precision, recall, and F1-score benchmarks.

Parameters:

Name Type Description Default
data

(R, C) contains the single cell expression (or whatever feature) in each row.

required
labels

(R,) contains the string label for each cell

required
use_pca

whether to fit PCA to the data.

True

Returns:

Name Type Description
results_out

(dict) contains the accuracy, precision, recall, and F1-score for each class.

conf_matrix

(n_classes, n_classes) contains the confusion matrix computed from out-of-fold predictions.

Source code in bionemo/geneformer/scripts/celltype_classification_bench/bench.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def run_benchmark(data, labels, use_pca=True):
    """Run the accuracy, precision, recall, and F1-score benchmarks.

    Args:
        data: (R, C) contains the single cell expression (or whatever feature) in each row.
        labels: (R,) contains the integer-encoded label for each cell.
        use_pca: whether to fit PCA to the data.

    Returns:
        results_out: (dict) maps each "test_<metric>" name to a (mean, std) tuple over CV folds.
        conf_matrix: (n_classes, n_classes) confusion matrix from out-of-fold predictions.
    """
    np.random.seed(1337)
    # Get input and output dimensions
    n_features = data.shape[1]
    hidden_size = 128

    # Define the target dimension 'n_components' for PCA;
    # cap at the feature count so PCA never requests more components than exist.
    n_components = min(10, n_features)

    # Build the pipeline once; PCA is an optional middle step rather than
    # duplicating the entire pipeline definition in both branches.
    steps = [("scaler", StandardScaler())]
    if use_pca:
        steps.append(("projection", PCA(n_components=n_components)))
    steps.append(
        (
            "classifier",
            MLPClassifier(
                hidden_layer_sizes=(hidden_size,),
                max_iter=500,
                random_state=1337,
                early_stopping=True,  # Enable early stopping
                validation_fraction=0.1,  # Use 10% of training data for validation
                n_iter_no_change=50,  # Stop if validation score doesn't improve for 50 iterations
                verbose=False,  # Suppress per-iteration training output
            ),
        )
    )
    pipeline = Pipeline(steps)

    # Set up StratifiedKFold to ensure each fold reflects the overall distribution of labels
    cv = StratifiedKFold(n_splits=5)

    # Define the scoring functions
    scoring = {
        "accuracy": make_scorer(accuracy_score),
        "precision": make_scorer(precision_score, average="macro"),  # 'macro' averages over classes
        "recall": make_scorer(recall_score, average="macro"),
        "f1_score": make_scorer(f1_score, average="macro"),
    }

    # Record (rather than print) convergence warnings so they can be summarized after CV.
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings("always", category=ConvergenceWarning)

        # Perform stratified cross-validation with multiple metrics using the pipeline
        results = cross_validate(pipeline, data, labels, cv=cv, scoring=scoring, return_train_score=False)

        # Collect any convergence warnings
        convergence_warnings = [w.message for w in caught if issubclass(w.category, ConvergenceWarning)]

    # Print the cross-validation results
    print("Cross-validation metrics:")
    results_out = {}
    for metric, scores in results.items():
        if metric.startswith("test_"):
            results_out[metric] = (scores.mean(), scores.std())
            print(f"{metric[5:]}: {scores.mean():.3f} (+/- {scores.std():.3f})")

    # Out-of-fold predictions give a confusion matrix covering the full dataset.
    predictions = cross_val_predict(pipeline, data, labels, cv=cv)
    conf_matrix = confusion_matrix(labels, predictions)

    # Print convergence information
    if convergence_warnings:
        print("\nConvergence Warnings:")
        for warning in convergence_warnings:
            print(f"- {warning}")
    else:
        print("\nAll folds converged successfully")

    return results_out, conf_matrix