Source code for stonkgs.models.nlp_baseline_model

# -*- coding: utf-8 -*-

"""NLP baseline model on the fine-tuning classification task, assuming the model embeddings are pre-trained."""

import logging
import os
from collections import Counter
from typing import Dict, List, Optional

import click
import mlflow
import numpy as np
import pandas as pd
import torch
from sklearn.metrics import f1_score
from sklearn.model_selection import KFold, StratifiedShuffleSplit
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
)

from stonkgs.constants import (
    CELL_LINE_DIR,
    CORRECT_DIR,
    DEEPSPEED_CONFIG_PATH,
    DISEASE_DIR,
    EMBEDDINGS_PATH,
    LOCATION_DIR,
    MLFLOW_FINETUNING_TRACKING_URI,
    NLP_BL_OUTPUT_DIR,
    NLP_MODEL_TYPE,
    RELATION_TYPE_DIR,
    SPECIES_DIR,
    STONKGS_OUTPUT_DIR,
)
from stonkgs.data.indra_for_pretraining import prepare_df

# Initialize logger
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Disable alembic info
logging.getLogger("alembic").setLevel(logging.WARNING)


[docs]class INDRAEvidenceDataset(torch.utils.data.Dataset): """Custom Dataset class for INDRA data.""" def __init__(self, encodings, labels): """Initialize INDRA Dataset based on token embeddings for each text evidence.""" # Assumes that the labels are numerically encoded self.encodings = encodings self.labels = labels def __getitem__(self, idx): """Return data entries (text evidences) for given indices.""" item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()} item["labels"] = torch.tensor(self.labels[idx]) return item def __len__(self): """Return the length of the dataset.""" return len(self.labels)
[docs]def get_train_test_splits( data: pd.DataFrame, max_dataset_size: int = 100000, label_column_name: str = "class", random_seed: int = 42, n_splits: int = 5, ) -> List: """Return deterministic train/test indices for n_splits based on the fine-tuning dataset that is passed.""" # Leave out the label in the dataset data_no_labels = data.drop(label_column_name, axis=1) labels = data[label_column_name] # Cut the dataset down to max_dataset_size (deterministically!) using StratifiedShuffleSplit if needed: # (this is not an actual train/test split, this is just for getting a dataset of size max_dataset_size in a # stratified and deterministic manner) if len(data) > max_dataset_size: splitter = StratifiedShuffleSplit( n_splits=1, train_size=max_dataset_size, random_state=random_seed, ) for train_index, _ in splitter.split(data_no_labels, labels): data_no_labels = data_no_labels.iloc[train_index, :].reset_index(drop=True) labels = labels.iloc[train_index].reset_index(drop=True) # Generate the actual train/test splits here: # Implement non-stratified train/test splits with no validation split # It is shuffled deterministically (determined by random_seed) skf = KFold(n_splits=n_splits, random_state=random_seed, shuffle=True) return [ {"train_idx": train_idx, "test_idx": test_idx} for train_idx, test_idx in skf.split(data_no_labels, labels) ]
[docs]def run_nlp_baseline_classification_cv( train_data_path: str, sep: Optional[str] = "\t", model_type: str = NLP_MODEL_TYPE, output_dir: Optional[str] = NLP_BL_OUTPUT_DIR, logging_uri_mlflow: Optional[str] = MLFLOW_FINETUNING_TRACKING_URI, label_column_name: str = "class", text_data_column_name: str = "evidence", epochs: Optional[int] = 10, log_steps: int = 500, lr: float = 5e-5, batch_size: int = 16, gradient_accumulation: int = 1, task_name: str = "", embedding_path: str = EMBEDDINGS_PATH, deepspeed: bool = True, max_dataset_size: int = 100000, ) -> Dict: """Run cross-validation for the sequence classification task.""" # Get data splits indra_data = pd.read_csv(train_data_path, sep=sep) # TODO: leave it out later on? # Filter out any triples that contain a node that is not in the embeddings_dict embeddings_dict = prepare_df(embedding_path) original_length = len(indra_data) indra_data = indra_data[ indra_data["source"].isin(embeddings_dict.keys()) & indra_data["target"].isin(embeddings_dict.keys()) ].reset_index(drop=True) new_length = len(indra_data) logger.info( f"{original_length - new_length} out of {original_length} triples are left out because they contain " f"nodes which are not present in the pre-training data" ) train_test_splits = get_train_test_splits( indra_data, label_column_name=label_column_name, max_dataset_size=max_dataset_size, ) # Get text evidences and labels evidences_text, labels_str = indra_data[text_data_column_name], indra_data[label_column_name] # Numerically encode labels unique_tags = set(label for label in labels_str) tag2id = {label: number for number, label in enumerate(unique_tags)} id2tag = {value: key for key, value in tag2id.items()} labels = pd.Series([int(tag2id[label]) for label in labels_str]) # Initialize the f1-score f1_scores = [] # End previous run mlflow.end_run() # Initialize mlflow run, set tracking URI to use the same experiment for all runs, # so that one can compare them mlflow.set_tracking_uri(logging_uri_mlflow) mlflow.set_experiment("NLP Baseline for STonKGs") # Start a parent run so that all CV splits are tracked as nested runs # mlflow.start_run(run_name='Parent Run') # Initialize a dataframe for all the predicted labels result_df = pd.DataFrame() for idx, indices in enumerate(train_test_splits): # Initialize tokenizer and model tokenizer = AutoTokenizer.from_pretrained(model_type) model = AutoModelForSequenceClassification.from_pretrained( model_type, num_labels=len(unique_tags) ) # Encode all text evidences, pad and truncate to max_seq_len train_evidences = tokenizer( evidences_text[indices["train_idx"]].tolist(), truncation=True, padding=True ) test_evidences = tokenizer( evidences_text[indices["test_idx"]].tolist(), truncation=True, padding=True ) train_labels = labels[indices["train_idx"]].tolist() test_labels = labels[indices["test_idx"]].tolist() train_dataset = INDRAEvidenceDataset(encodings=train_evidences, labels=train_labels) test_dataset = INDRAEvidenceDataset(encodings=test_evidences, labels=test_labels) # Note that due to the randomization in the batches, the training/evaluation is slightly # different every time training_args = TrainingArguments( # label_names output_dir=output_dir, num_train_epochs=epochs, # total number of training epochs logging_steps=log_steps, learning_rate=lr, # Use deepspeed with a specified config file for speedup deepspeed=DEEPSPEED_CONFIG_PATH if deepspeed else None, report_to=["mlflow"], # log via mlflow do_train=True, do_predict=True, per_device_train_batch_size=batch_size, gradient_accumulation_steps=gradient_accumulation, ) # Initialize Trainer based on the training dataset trainer = Trainer( model=model, args=training_args, train_dataset=train_dataset, ) # Train trainer.train() # Log some details about the datasets used in training and testing mlflow.log_param("label dict", str(tag2id)) mlflow.log_param("training dataset size", str(len(train_labels))) mlflow.log_param("training class dist", str(Counter(train_labels))) mlflow.log_param("test dataset size", str(len(test_labels))) mlflow.log_param("test class dist", str(Counter(test_labels))) # Make predictions for the test dataset predictions = trainer.predict(test_dataset=test_dataset).predictions predicted_labels = np.argmax(predictions, axis=1) logger.info(f"Predicted labels: {predicted_labels}") # Save the predicted + true labels partial_result_df = pd.DataFrame( { "split": idx, "index": indices["test_idx"].tolist(), "predicted_label": predicted_labels.tolist(), "true_label": test_labels, "evidence": evidences_text[indices["test_idx"]].tolist(), }, ) result_df = result_df.append( partial_result_df, ignore_index=True, ) # Use weighted average f1_sc = f1_score(test_labels, predicted_labels, average="weighted") f1_scores.append(f1_sc) # Log the final f1 score of the split mlflow.log_metric("f1_score_weighted", f1_sc) # Log mean and std f1-scores from the cross validation procedure (average and std across all splits) to the # standard logger logger.info(f"Mean f1-score: {np.mean(f1_scores)}") logger.info(f"Std f1-score: {np.std(f1_scores)}") # Map the labels in the result df back to their original names result_df = result_df.replace({"predicted_label": id2tag, "true_label": id2tag}) # Save the result_df result_df.to_csv( os.path.join(NLP_BL_OUTPUT_DIR, "predicted_labels_nlp_" + task_name + "df.tsv"), index=False, sep="\t", ) # Save the last model trainer.save_model(output_dir=NLP_BL_OUTPUT_DIR) # End the previous run mlflow.end_run() # Log the mean and std f1 score from the cross validation procedure to mlflow with mlflow.start_run(): # Log the task name as well mlflow.log_param("task name", task_name) mlflow.log_metric("f1_score_mean", np.mean(f1_scores)) mlflow.log_metric("f1_score_std", np.std(f1_scores)) # End parent run # mlflow.end_run() return {"f1_score_mean": np.mean(f1_scores), "f1_score_std": np.std(f1_scores)}
@click.command() @click.option("-e", "--epochs", default=5, help="Number of epochs", type=int) @click.option("--lr", default=5e-5, help="Learning rate", type=float) @click.option( "--logging_dir", default=MLFLOW_FINETUNING_TRACKING_URI, help="Mlflow logging/tracking URI", type=str, ) @click.option("--log_steps", default=500, help="Number of steps between each log", type=int) @click.option("--output_dir", default=STONKGS_OUTPUT_DIR, help="Output directory", type=str) @click.option("--batch_size", default=8, help="Batch size used in fine-tuning", type=int) @click.option( "--gradient_accumulation_steps", default=1, help="Gradient accumulation steps", type=int ) @click.option("--deepspeed", default=True, help="Whether to use deepspeed or not", type=bool) @click.option( "--max_dataset_size", default=100000, help="Maximum dataset size of the fine-tuning datasets", type=int, ) @click.option("--local_rank", default=-1, help="THIS PARAMETER IS IGNORED", type=int) def run_all_fine_tuning_tasks( epochs: int = 5, log_steps: int = 500, lr: float = 5e-5, output_dir: str = STONKGS_OUTPUT_DIR, logging_dir: Optional[str] = MLFLOW_FINETUNING_TRACKING_URI, batch_size: int = 8, gradient_accumulation_steps: int = 1, deepspeed: bool = True, max_dataset_size: int = 100000, # effectively removes the max dataset size restriction local_rank: int = -1, ): """Run all fine-tuning tasks at once.""" # Specify all directories and file names directories = [ CELL_LINE_DIR, CORRECT_DIR, CORRECT_DIR, DISEASE_DIR, LOCATION_DIR, SPECIES_DIR, RELATION_TYPE_DIR, RELATION_TYPE_DIR, ] file_names = [ "cell_line_no_duplicates.tsv", "correct_incorrect_binary_no_duplicates.tsv", "correct_incorrect_multiclass_no_duplicates.tsv", "disease_no_duplicates.tsv", "location_no_duplicates.tsv", "species_no_duplicates.tsv", "relation_type_no_duplicates.tsv", "relation_type_no_duplicates.tsv", ] task_names = [ "cell_line", "correct_binary", "correct_multiclass", "disease", "location", "species", "interaction", "polarity", ] # Specify the column names of the target variable column_names = ["class"] * 6 + ["interaction"] + ["polarity"] for directory, file, column_name, task_name in zip( directories, file_names, column_names, task_names, ): # Run each of the six classification tasks run_nlp_baseline_classification_cv( train_data_path=os.path.join(directory, file), output_dir=output_dir, logging_uri_mlflow=logging_dir, epochs=epochs, log_steps=log_steps, lr=lr, batch_size=batch_size, gradient_accumulation=gradient_accumulation_steps, label_column_name=column_name, task_name=task_name, deepspeed=deepspeed, max_dataset_size=max_dataset_size, ) logger.info(f"Finished the {task_name} task") if __name__ == "__main__": # Set the huggingface environment variable for tokenizer parallelism to false os.environ["TOKENIZERS_PARALLELISM"] = "false" # Run all classification tasks run_all_fine_tuning_tasks()