Training a spaCy Text Classification Model for Smoking Status (Multi-level Classification)

2024-12-25




An experiment with multi-level (hierarchical) text classification in spaCy: three cascaded binary textcat models (Train.UnknownCol, Train.NonSmokerCol, Train.SmokerCol), each trained only on the records the previous level labeled "0".

Inspired by

https://github.com/falihazikra/Medical-Record-Classifier

Code

import nltk
nltk.download('stopwords')  # stopword list used by the Train text-cleaning helpers
import spacy
import pandas as pd
import time
import warnings
import Train  # local helper module, adapted from the repo linked above
from pathlib import Path
import numpy as np
import os
import json
import shutil
from tqdm import tqdm
from spacy.tokens import DocBin
from spacy.cli.init_config import fill_config
from spacy.cli.train import train
from spacy.cli.evaluate import evaluate
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, f1_score
from typing import List, Dict, Tuple

spacy.prefer_gpu()

warnings.filterwarnings('ignore')

CONFIG_DIR = "config"
MODEL_DIR = "model"
DATA_DIR = "./data"
TRAIN_FILE = "smokers_surrogate_train_all_version2.xml"
TEST_FILE = "smokers_surrogate_test_all_groundtruth_version2.xml"
LOG_FILE = "training_log.jsonl"

def cleanup_model_directory(model_path: str) -> None:
    # wipe previous runs; everything under the model directory is regenerated
    shutil.rmtree(model_path, ignore_errors=True)

def convert_data_to_dataframe(record: str, apply_text_process: bool = True) -> pd.DataFrame:
    # parse the XML records file into a DataFrame of notes and smoking labels
    df = Train.GetJsonFromRecords(record)
    df = Train.ColTransform(df)

    if apply_text_process:
        df["descrp"] = df["descrp"].apply(Train.TextProcess)
    return df

def apply_col(df: pd.DataFrame, func: callable) -> pd.DataFrame:
    # add a binary ("1"/"0") label column named after the level function
    df[func.__name__] = df["smoking_status"].apply(func)
    return df

def convert_dataframe_to_list(df: pd.DataFrame, func: callable) -> List[Dict[str, str]]:
    data_list = []
    for i in range(len(df)):
        data_dict = {
            "text": df['descrp'][i],
            "category": df[func.__name__][i]
        }
        data_list.append(data_dict)
    return data_list

def convert_list_to_spacy(data_list: List[Dict[str, str]], spacy_path: str, nlp: spacy.language.Language) -> None:
    # use a fresh DocBin per split; sharing one DocBin across calls would leak
    # earlier splits into the later .spacy files
    db = DocBin()
    categories = ["1", "0"]
    for l in data_list:
        doc = nlp.make_doc(l["text"])
        doc.cats = {category: 0 for category in categories}
        doc.cats[l["category"]] = 1
        db.add(doc)
    db.to_disk(spacy_path)

def create_directory(dir_path: str) -> None:
    os.makedirs(dir_path, exist_ok=True)

def testing_evaluation(test_data: pd.DataFrame, nlp: spacy.language.Language) -> pd.DataFrame:
    label = 'category'
    for i in tqdm(range(len(test_data))):
        text = test_data.loc[i, 'text']

        # gold label as a float (category labels are the strings "0"/"1")
        test_data.loc[i, 'true'] = 0.0 if test_data.loc[i, label] == '0' else 1.0

        doc = nlp(text)

        # predicted label = category with the highest textcat score
        predict_cat = max(doc.cats, key=doc.cats.get)
        test_data.loc[i, 'predict'] = 0.0 if predict_cat == '0' else 1.0
        test_data.loc[i, 'score'] = 1 if test_data.loc[i, 'true'] == test_data.loc[i, 'predict'] else 0
    return test_data

def update_log(training_log_file: str, log_file: str, apply_text_process: bool, ngram_size: int, seed: int, func: callable) -> None:
    try:
        with open(training_log_file, 'r') as training_log, open(log_file, 'a') as all_log:
            for line in training_log:
                log_entry = json.loads(line.strip())
                log_entry.update({
                    "apply_text_process": apply_text_process,
                    "ngram_size": ngram_size,
                    "seed": seed,
                    "level": func.__name__
                })
                all_log.write(json.dumps(log_entry) + '\n')
        print(f"Logs appended to {log_file} successfully.")
        os.remove(training_log_file)
    except FileNotFoundError:
        print(f"Training log file {training_log_file} not found. Skipping log appending.")
    except Exception as e:
        print(f"An error occurred while appending logs: {e}")

def format_evaluation_results(metrics: dict) -> None:
    # Extract and format values
    token_acc = metrics['token_acc'] * 100
    macro_f = metrics['cats_score'] * 100
    speed = metrics['speed']

    # Extract Textcat F (per label)
    textcat_f_per_label = metrics['cats_f_per_type']
    formatted_textcat_f = []
    for label, values in textcat_f_per_label.items():
        p = values['p'] * 100
        r = values['r'] * 100
        f = values['f'] * 100
        formatted_textcat_f.append((label, p, r, f))

    # Extract Textcat ROC AUC (per label)
    roc_auc_per_label = metrics['cats_auc_per_type']
    formatted_roc_auc = [(label, auc) for label, auc in roc_auc_per_label.items()]

    # Print results (mirrors the layout of spaCy's CLI evaluate output)
    print("\n================================== Results ==================================\n")
    print(f"TOK                 {token_acc:.2f}")
    print(f"TEXTCAT (macro F)   {macro_f:.2f}")
    print(f"SPEED               {speed:.0f}\n")

    print("=========================== Textcat F (per label) ===========================\n")
    print(f"{'':<12}    {'P':<8}{'R':<8}{'F':<8}")
    for label, p, r, f in formatted_textcat_f:
        print(f"{label:<12} {p:.2f}   {r:.2f}   {f:.2f}")

    print("\n======================== Textcat ROC AUC (per label) ========================\n")
    print(f"{'':<12}    {'ROC AUC'}")
    for label, auc in formatted_roc_auc:
        print(f"{label:<12} {auc:.2f}")

    print("\n✔ Saved results to en/efficiency/metrics_en.json")

def prepare_training_paths(apply_text_process: bool, ngram_size: int, seed: int, func: callable) -> Tuple[str, str, str, str]:
    run_dir = os.path.join(MODEL_DIR, f"{apply_text_process}_{ngram_size}_{seed}_{func.__name__}")
    train_spacy_path = os.path.join(run_dir, "train.spacy")
    dev_spacy_path = os.path.join(run_dir, "dev.spacy")
    test_spacy_path = os.path.join(run_dir, "test.spacy")
    metrics_path = os.path.join(run_dir, "metrics.json")
    return train_spacy_path, dev_spacy_path, test_spacy_path, metrics_path

def prepare_directories(apply_text_process: bool, ngram_size: int, seed: int, func: callable) -> Tuple[str, str, str]:
    log_dir_path = os.path.join(MODEL_DIR, "log")
    model_dir_path = os.path.join(MODEL_DIR, f"{apply_text_process}_{ngram_size}_{seed}_{func.__name__}")
    model_best_path = os.path.join(model_dir_path, "model-best")

    create_directory(log_dir_path)
    create_directory(model_dir_path)

    return log_dir_path, model_dir_path, model_best_path

def process_training_data(training_list: List[Dict[str, str]], seed: int, train_spacy_path: str, dev_spacy_path: str, test_spacy_path: str, nlp: spacy.language.Language) -> None:
    # 90/5/5 split: the 10% hold-out is halved into dev and test
    Train_list, Dev_list = train_test_split(training_list, test_size = 0.1, random_state = seed)

    Test_list = Dev_list[int(len(Dev_list)/2):]
    Dev_list = Dev_list[:int(len(Dev_list)/2)]

    convert_list_to_spacy(Train_list, train_spacy_path, nlp)
    convert_list_to_spacy(Dev_list, dev_spacy_path, nlp)
    convert_list_to_spacy(Test_list, test_spacy_path, nlp)

def train_and_dev_evaluate(training_list: List[Dict[str, str]], apply_text_process: bool, ngram_size: int, seed: int, func: callable) -> spacy.language.Language:
    train_spacy_path, dev_spacy_path, test_spacy_path, metrics_path = prepare_training_paths(apply_text_process, ngram_size, seed, func)
    log_dir_path, model_dir_path, model_best_path = prepare_directories(apply_text_process, ngram_size, seed, func)

    # blank English pipeline, used only to tokenize while building the DocBins
    nlp = spacy.blank("en")

    process_training_data(training_list, seed, train_spacy_path, dev_spacy_path, test_spacy_path, nlp)

    # fill config (one-time step: expand base_config.cfg into a full config.cfg)
    base_config_path = os.path.join(CONFIG_DIR, "base_config.cfg")
    config_path = os.path.join(CONFIG_DIR, "config.cfg")
    if not os.path.isfile(config_path):
        fill_config(output_file = Path(config_path),
                    base_path = Path(base_config_path))

    start = time.time()

    # training
    train(config_path = config_path,
          output_path = model_dir_path,
          overrides = {"paths.train": train_spacy_path,
                       "paths.dev": dev_spacy_path,
                       "components.textcat.model.ngram_size": ngram_size},
          # use_gpu=0
          )

    # training log
    all_log_path = os.path.join(log_dir_path, "training_all_log.json")
    update_log(LOG_FILE, all_log_path, apply_text_process, ngram_size, seed, func)

    # evaluation on the held-out test split
    metrics_output = evaluate(
        model = model_best_path,
        output = metrics_path,
        data_path = test_spacy_path
    )
    format_evaluation_results(metrics_output)  # prints its own output, returns None

    # load model
    nlp = spacy.load(model_best_path)

    print("TRAINING TIME: ", time.time() - start)

    return nlp

def testing_evaluate(testing: List[Dict[str, str]], nlp: spacy.language.Language) -> pd.DataFrame:
    testing_df = pd.DataFrame(testing)
    testing_df['true'] = np.nan
    testing_df['predict'] = np.nan
    testing_df['score'] = 0

    tested_data = testing_evaluation(testing_df, nlp)
    return tested_data

def evaluate_metrics(tested_data: pd.DataFrame) -> Tuple[float, float, float]:
    true_values = list(tested_data['true'].values)
    predict_values = list(tested_data['predict'].values)

    accuracy = accuracy_score(true_values, predict_values)
    precision = precision_score(true_values, predict_values, average='weighted')
    f1 = f1_score(true_values, predict_values, average='weighted')

    return accuracy, precision, f1

def process(train_df: pd.DataFrame, test_df: pd.DataFrame, apply_text_process: bool, func: callable) -> Tuple[pd.DataFrame, pd.DataFrame]:
    train_df = apply_col(train_df, func)
    test_df = apply_col(test_df, func)

    training_list = convert_dataframe_to_list(train_df, func)
    testing_list = convert_dataframe_to_list(test_df, func)

    for ngram_size in ngram_sizes:
        accuracy_scores = []
        precision_scores = []
        f1_scores = []
        for seed in seeds:
            print(f"Testing with TextProcess={apply_text_process} and ngram_size={ngram_size}")
            nlp = train_and_dev_evaluate(training_list, apply_text_process, ngram_size, seed, func)
            tested_data = testing_evaluate(testing_list, nlp)
            accuracy, precision, f1 = evaluate_metrics(tested_data)
            accuracy_scores.append(accuracy)
            precision_scores.append(precision)
            f1_scores.append(f1)

        # average the test metrics over the random seeds
        avg_accuracy = np.mean(accuracy_scores)
        avg_precision = np.mean(precision_scores)
        avg_f1 = np.mean(f1_scores)

        outputs.append((apply_text_process, ngram_size, func.__name__, avg_accuracy, avg_precision, avg_f1))

    return train_df, test_df


ngram_sizes = [1, 2, 3]
seeds = [0, 1]
outputs = []

cleanup_model_directory(MODEL_DIR)

for apply_text_process in [False, True]:
    training_df = convert_data_to_dataframe(os.path.join(DATA_DIR, TRAIN_FILE), apply_text_process)
    testing_df = convert_data_to_dataframe(os.path.join(DATA_DIR, TEST_FILE), apply_text_process)

    # first level (Train.UnknownCol)
    training_df, testing_df = process(training_df, testing_df, apply_text_process, func = Train.UnknownCol)

    # second level: only the records the first level labeled "0"
    training_secondf = training_df[training_df[Train.UnknownCol.__name__] == "0"].reset_index(drop=True)
    testing_secondf = testing_df[testing_df[Train.UnknownCol.__name__] == "0"].reset_index(drop=True)

    training_secondf, testing_secondf = process(training_secondf, testing_secondf, apply_text_process, func = Train.NonSmokerCol)

    # third level: only the records the second level labeled "0"
    training_thirddf = training_secondf[training_secondf[Train.NonSmokerCol.__name__] == "0"].reset_index(drop=True)
    testing_thirddf = testing_secondf[testing_secondf[Train.NonSmokerCol.__name__] == "0"].reset_index(drop=True)

    training_thirddf, testing_thirddf = process(training_thirddf, testing_thirddf, apply_text_process, func = Train.SmokerCol)

df = pd.DataFrame(outputs, columns=['TextProcess', 'ngram_size', 'level', 'accuracy', 'precision', 'f1'])
print()
print(df)
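
As a closing sketch, the three trained levels could be chained at inference time. This is a minimal illustration, not part of the script above: the run directory names (here the False_1_0_* runs) and the assumption that "1" marks the positive class at each level both come from the conventions in the code, so adjust them to your own runs.

import os
import spacy

# hypothetical: pick one trained run per level (apply_text_process=False, ngram_size=1, seed=0)
LEVELS = [
    ("UnknownCol",   os.path.join("model", "False_1_0_UnknownCol",   "model-best")),
    ("NonSmokerCol", os.path.join("model", "False_1_0_NonSmokerCol", "model-best")),
    ("SmokerCol",    os.path.join("model", "False_1_0_SmokerCol",    "model-best")),
]
models = [(name, spacy.load(path)) for name, path in LEVELS]

def classify(text: str) -> str:
    # walk down the cascade; stop at the first level that predicts "1"
    for name, nlp in models:
        doc = nlp(text)
        if max(doc.cats, key=doc.cats.get) == "1":
            return name + "=1"
    return "SmokerCol=0"  # fell through every level

print(classify("Patient denies any history of tobacco use."))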

Directory Layout

  • The directories and files within the model directory are deleted each time the script runs (see cleanup_model_directory); a typical run then recreates the layout sketched below.
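
For orientation, a single run with apply_text_process=False, ngram_size=1, seed=0 on the first level leaves roughly this layout (model-best/ and model-last/ are written by spacy train; one run directory is created per TextProcess/ngram/seed/level combination):

config/
    base_config.cfg
    config.cfg
model/
    log/
        training_all_log.json
    False_1_0_UnknownCol/
        train.spacy
        dev.spacy
        test.spacy
        metrics.json
        model-best/
        model-last/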

Update config.cfg

The config.cfg generated by fill_config from base_config.cfg ships with the default logger:

[training.logger]
@loggers = "spacy.ConsoleLogger.v1"
console_output = false

Manually update it to ConsoleLogger.v3, which shows console progress and also writes each evaluation step to a JSONL file:

[training.logger]
@loggers = "spacy.ConsoleLogger.v3"
progress_bar = "eval"
console_output = true
output_file = "training_log.jsonl"
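
To confirm the edit took effect, the filled config can be loaded and inspected with spaCy's config loader (a quick sanity check):

from spacy import util

config = util.load_config("config/config.cfg")
print(config["training"]["logger"])
# expected to show spacy.ConsoleLogger.v3 with the progress_bar,
# console_output and output_file values set above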

Collect all logs into one file

This is handled by the update_log helper in the main listing: after each run it reads the per-run training_log.jsonl written by the logger, tags every entry with that run's apply_text_process, ngram_size, seed, and level, appends the entries to the combined log under model/log/, and then deletes the per-run file.

Training Process Plot

[plot: training progress]

Testing Evaluation

[plot 2]

[plot 3]
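
The plots themselves are not reproduced here, but a sketch like the following could regenerate the training-progress curves from the combined log. It assumes the ConsoleLogger emitted step and score fields, which is not guaranteed; verify the names against your own log before relying on it:

import matplotlib.pyplot as plt
import pandas as pd

log_df = pd.read_json("model/log/training_all_log.json", lines=True)

# one configuration, one curve per cascade level
one_run = log_df[(log_df["apply_text_process"] == False)
                 & (log_df["ngram_size"] == 1)
                 & (log_df["seed"] == 0)]
for level, group in one_run.groupby("level"):
    plt.plot(group["step"], group["score"], label=level)  # assumed field names
plt.xlabel("training step")
plt.ylabel("textcat score")
plt.legend()
plt.show()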

 






