try multi-level classification with spaCy
Inspired by
https://github.com/falihazikra/Medical-Record-Classifier
Code
import nltk
nltk.download('stopwords')
import spacy
import pandas as pd
import time
import warnings
import Train
from pathlib import Path
import numpy as np
import os
import json
import shutil
from tqdm import tqdm
from spacy.tokens import DocBin
from spacy.cli.init_config import fill_config
from spacy.cli.train import train
from spacy.cli.evaluate import evaluate
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, f1_score
from typing import List, Dict, Tuple
spacy.prefer_gpu()
warnings.filterwarnings('ignore')
CONFIG_DIR = "config"
MODEL_DIR = "model"
DATA_DIR = "./data"
TRAIN_FILE = "smokers_surrogate_train_all_version2.xml"
TEST_FILE = "smokers_surrogate_test_all_groundtruth_version2.xml"
LOG_FILE = "training_log.jsonl"
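# train and ground-truth test records for the smoking-status task (presumably the i2b2 2006 smoking-challenge XML release)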
def cleanup_model_directory(model_path: str) -> None:
shutil.rmtree(model_path, ignore_errors=True)
def convert_data_to_dataframe(record: str, apply_text_process: bool = True) -> pd.DataFrame:
df = Train.GetJsonFromRecords(record)
df = Train.ColTransform(df)
if apply_text_process:
df["descrp"] = df["descrp"].apply(Train.TextProcess)
return df
def apply_col(df: pd.DataFrame, func: callable) -> pd.DataFrame:
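    # add a column named after func ("1"/"0"), derived from the smoking_status column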
df[func.__name__] = df["smoking_status"].apply(func)
return df
def convert_dataframe_to_list(df: pd.DataFrame, func: callable) -> List[Dict[str, str]]:
data_list = []
for i in range(len(df)):
data_dict = {
"text": df['descrp'][i],
"category": df[func.__name__][i]
}
data_list.append(data_dict)
return data_list
def convert_list_to_spacy(data_list: List[Dict[str, str]], spacy_path: str, nlp: spacy.language.Language, db: DocBin) -> None:
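    # build a one-hot cats dict over the binary labels and mark the gold category for each doc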
categories = ["1", "0"]
for l in data_list:
doc = nlp.make_doc(l["text"])
doc.cats = {category: 0 for category in categories}
doc.cats[l["category"]] = 1
db.add(doc)
db.to_disk(spacy_path)
def create_directory(dir_path: str) -> None:
if not os.path.exists(dir_path):
os.makedirs(dir_path)
def testing_evaluation(test_data: pd.DataFrame, nlp: spacy.language.Language) -> pd.DataFrame:
    label = 'category'
    for i in tqdm(range(len(test_data))):
        text = test_data['text'][i]
        # gold label: "0" -> 0.0, anything else -> 1.0
        test_data.loc[i, 'true'] = 0.0 if test_data[label][i] == '0' else 1.0
        doc = nlp(text)
        # predicted label is the category with the highest score
        predict_cat = max(doc.cats, key=doc.cats.get)
        test_data.loc[i, 'predict'] = 0.0 if predict_cat == '0' else 1.0
        test_data.loc[i, 'score'] = 1 if test_data.loc[i, 'true'] == test_data.loc[i, 'predict'] else 0
    return test_data
def update_log(training_log_file: str, log_file: str, apply_text_process: bool, ngram_size: int, seed: int, func: callable) -> None:
try:
with open(training_log_file, 'r') as training_log, open(log_file, 'a') as all_log:
for line in training_log:
log_entry = json.loads(line.strip())
log_entry.update({
"apply_text_process": apply_text_process,
"ngram_size": ngram_size,
"seed": seed,
"level": func.__name__
})
all_log.write(json.dumps(log_entry) + '\n')
print(f"Logs appended to {log_file} successfully.")
os.remove(training_log_file)
except FileNotFoundError:
print(f"Training log file {training_log_file} not found. Skipping log appending.")
except Exception as e:
print(f"An error occurred while appending logs: {e}")
def format_evaluation_results(metrics: dict, metrics_path: str) -> None:
# Extract and format values
token_acc = metrics['token_acc'] * 100
macro_f = metrics['cats_score'] * 100
speed = metrics['speed']
# Extract Textcat F (per label)
textcat_f_per_label = metrics['cats_f_per_type']
formatted_textcat_f = []
for label, values in textcat_f_per_label.items():
p = values['p'] * 100
r = values['r'] * 100
f = values['f'] * 100
formatted_textcat_f.append((label, p, r, f))
# Extract Textcat ROC AUC (per label)
roc_auc_per_label = metrics['cats_auc_per_type']
formatted_roc_auc = [(label, auc) for label, auc in roc_auc_per_label.items()]
# Print results
print("\nℹ Using CPU\n")
print("================================== Results ==================================\n")
print(f"TOK {token_acc:.2f}")
print(f"TEXTCAT (macro F) {macro_f:.2f}")
print(f"SPEED {speed:.0f}\n")
print("=========================== Textcat F (per label) ===========================\n")
print(f"{'':<12} {'P':<8}{'R':<8}{'F':<8}")
for label, p, r, f in formatted_textcat_f:
print(f"{label:<12} {p:.2f} {r:.2f} {f:.2f}")
print("\n======================== Textcat ROC AUC (per label) ========================\n")
print(f"{'':<12} {'ROC AUC'}")
for label, auc in formatted_roc_auc:
print(f"{label:<12} {auc:.2f}")
print("\n✔ Saved results to en/efficiency/metrics_en.json")
def prepare_training_paths(apply_text_process: bool, ngram_size: int, seed: int, func: callable) -> Tuple[str, str, str, str]:
train_spacy_path = os.path.join(MODEL_DIR, f"{apply_text_process}_{ngram_size}_{seed}_{func.__name__}", "train.spacy")
dev_spacy_path = os.path.join(MODEL_DIR, f"{apply_text_process}_{ngram_size}_{seed}_{func.__name__}", "dev.spacy")
test_spacy_path = os.path.join(MODEL_DIR, f"{apply_text_process}_{ngram_size}_{seed}_{func.__name__}", "test.spacy")
metrics_path = os.path.join(MODEL_DIR, f"{apply_text_process}_{ngram_size}_{seed}_{func.__name__}", "metrics.json")
return train_spacy_path, dev_spacy_path, test_spacy_path, metrics_path
def prepare_directories(apply_text_process: bool, ngram_size: int, seed: int, func: callable) -> Tuple[str, str, str]:
log_dir_path = os.path.join(MODEL_DIR, "log")
model_dir_path = os.path.join(MODEL_DIR, f"{apply_text_process}_{ngram_size}_{seed}_{func.__name__}")
model_best_path = os.path.join(model_dir_path, "model-best")
create_directory(log_dir_path)
create_directory(model_dir_path)
return log_dir_path, model_dir_path, model_best_path
def process_training_data(training_list: List[Dict[str, str]], seed: int, train_spacy_path: str, dev_spacy_path: str, test_spacy_path: str, nlp: spacy.language.Language) -> None:
Train_list, Dev_list = train_test_split(training_list, test_size = 0.1, random_state = seed)
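    # split the held-out 10% evenly into dev and test lists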
Test_list = Dev_list[int(len(Dev_list)/2):]
Dev_list = Dev_list[:int(len(Dev_list)/2)]
    # use a fresh DocBin per split so dev.spacy and test.spacy do not also
    # contain the documents already written for the training split
    convert_list_to_spacy(Train_list, train_spacy_path, nlp, DocBin())
    convert_list_to_spacy(Dev_list, dev_spacy_path, nlp, DocBin())
    convert_list_to_spacy(Test_list, test_spacy_path, nlp, DocBin())
def train_and_dev_evaluate(training_list: List[Dict[str, str]], apply_text_process: bool, ngram_size: int, seed: int, func: callable) -> spacy.language.Language:
train_spacy_path, dev_spacy_path, test_spacy_path, metrics_path = prepare_training_paths(apply_text_process, ngram_size, seed, func)
log_dir_path, model_dir_path, model_best_path = prepare_directories(apply_text_process, ngram_size, seed, func)
nlp = spacy.blank("en")
process_training_data(training_list, seed, train_spacy_path, dev_spacy_path, test_spacy_path, nlp)
# fill config
base_config_path = os.path.join(CONFIG_DIR, "base_config.cfg")
config_path = os.path.join(CONFIG_DIR, "config.cfg")
if not os.path.isfile(config_path):
fill_config(output_file = Path(config_path),
base_path = base_config_path)
start = time.time()
# training
train(config_path = config_path,
output_path = model_dir_path,
overrides = {"paths.train": train_spacy_path,
"paths.dev": dev_spacy_path,
"components.textcat.model.ngram_size": ngram_size},
# use_gpu=0
)
# training log
all_log_path = os.path.join(log_dir_path, "training_all_log.json")
update_log(LOG_FILE, all_log_path, apply_text_process, ngram_size, seed, func)
    # evaluation
    metrics_output = evaluate(
        model = model_best_path,
        output = metrics_path,
        data_path = test_spacy_path
    )
    format_evaluation_results(metrics_output, metrics_path)
# load model
nlp = spacy.load(model_best_path)
print("TRAINING TIME: ", time.time() - start)
return nlp
def testing_evaluate(testing: List[Dict[str, str]], nlp: spacy.language.Language) -> pd.DataFrame:
testing_df = pd.DataFrame(testing)
testing_df['true'] = ''
testing_df['predict'] = ''
testing_df['score'] = ''
    tested_data = testing_evaluation(testing_df, nlp)
return tested_data
def evaluate_metrics(tested_data: pd.DataFrame) -> Tuple[float, float, float]:
true_values = list(tested_data['true'].values)
predict_values = list(tested_data['predict'].values)
accuracy = accuracy_score(true_values, predict_values)
precision = precision_score(true_values, predict_values, average='weighted')
f1 = f1_score(true_values, predict_values, average='weighted')
return accuracy, precision, f1
def process(train_df: pd.DataFrame, test_df: pd.DataFrame, apply_text_process: bool, func: callable) -> Tuple[pd.DataFrame, pd.DataFrame]:
train_df = apply_col(train_df, func)
test_df = apply_col(test_df, func)
training_list = convert_dataframe_to_list(train_df, func)
testing_list = convert_dataframe_to_list(test_df, func)
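    # sweep the module-level ngram_sizes and seeds; averaged scores are appended to the global outputs list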
for ngram_size in ngram_sizes:
accuracy_scores = []
precision_scores = []
f1_scores = []
for seed in seeds:
print(f"Testing with TextProcess={apply_text_process} and ngram_size={ngram_size}")
nlp = train_and_dev_evaluate(training_list, apply_text_process, ngram_size, seed, func)
tested_data = testing_evaluate(testing_list, nlp)
accuracy, precision, f1 = evaluate_metrics(tested_data)
accuracy_scores.append(accuracy)
precision_scores.append(precision)
f1_scores.append(f1)
# cross validation
avg_accuracy = np.mean(accuracy_scores)
avg_precision = np.mean(precision_scores)
avg_f1 = np.mean(f1_scores)
outputs.append((apply_text_process, ngram_size, func.__name__, avg_accuracy, avg_precision, avg_f1))
return train_df, test_df
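# hyperparameter grid and random seeds swept inside process()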
ngram_sizes = [1, 2, 3]
seeds = [0, 1]
outputs = []
cleanup_model_directory(MODEL_DIR)
for apply_text_process in [False, True]:
training_df = convert_data_to_dataframe("./data/smokers_surrogate_train_all_version2.xml", apply_text_process)
testing_df = convert_data_to_dataframe("./data/smokers_surrogate_test_all_groundtruth_version2.xml", apply_text_process)
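    # Level 1: classify every record on the Train.UnknownCol label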
training_df, testing_df = process(training_df, testing_df, apply_text_process, func = Train.UnknownCol)
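    # Level 2: records with UnknownCol == "0" move on and are classified on Train.NonSmokerCol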
training_secondf = training_df[training_df[Train.UnknownCol.__name__] == "0"].reset_index().drop(['index'], axis=1)
testing_secondf = testing_df[testing_df[Train.UnknownCol.__name__] == "0"].reset_index().drop(['index'], axis=1)
training_secondf, testing_secondf = process(training_secondf, testing_secondf, apply_text_process, func = Train.NonSmokerCol)
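    # Level 3: records with NonSmokerCol == "0" are classified on Train.SmokerCol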
training_thirddf = training_secondf[training_secondf[Train.NonSmokerCol.__name__] == "0"].reset_index().drop(['index'], axis=1)
testing_thirddf = testing_secondf[testing_secondf[Train.NonSmokerCol.__name__] == "0"].reset_index().drop(['index'], axis=1)
training_thirddf, testing_thirddf = process(training_thirddf, testing_thirddf, apply_text_process, func = Train.SmokerCol)
df = pd.DataFrame(outputs, columns=['TextProcess', 'ngram_size', 'level', 'accuracy', 'precision', 'f1'])
print()
print(df)
Directory Layout
- The directories and files within the model directory will be deleted each time this script is executed.
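A sketch of the layout after one run, assuming the default spacy train outputs (each sub-directory is named {apply_text_process}_{ngram_size}_{seed}_{level}):
model/
├── log/
│   └── training_all_log.json
├── False_1_0_UnknownCol/
│   ├── train.spacy
│   ├── dev.spacy
│   ├── test.spacy
│   ├── metrics.json
│   ├── model-best/
│   └── model-last/
└── ... one sub-directory per apply_text_process / ngram_size / seed / level combination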
Update Config.cfg
Original config.cfg generated by fill_config using base_config.cfg
[training.logger]
@loggers = "spacy.ConsoleLogger.v1"
console_output = false
Manually update to:
[training.logger]
@loggers = "spacy.ConsoleLogger.v3"
progress_bar = "eval"
console_output = true
output_file = "training_log.jsonl"
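The same edit can also be scripted rather than done by hand; a minimal sketch (not part of the script above, run once after fill_config) that patches the generated config via spacy.util.load_config:
from spacy.util import load_config

# load the generated config, swap in the v3 console logger, and write it back
cfg = load_config("config/config.cfg")
cfg["training"]["logger"] = {
    "@loggers": "spacy.ConsoleLogger.v3",
    "progress_bar": "eval",
    "console_output": True,
    "output_file": "training_log.jsonl",
}
cfg.to_disk("config/config.cfg")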
Collect all logs into one file
def update_log(training_log_file, log_file, apply_text_process, ngram_size, seed, func = Train.UnknownCol):
try:
with open(training_log_file, 'r') as training_log, open(log_file, 'a') as all_log:
for line in training_log:
log_entry = json.loads(line.strip())
log_entry.update({
"apply_text_process": apply_text_process,
"ngram_size": ngram_size,
"seed": seed,
"level": func.__name__
})
all_log.write(json.dumps(log_entry) + '\n')
print(f"Logs appended to {log_file} successfully.")
os.remove(training_log_file)
except FileNotFoundError:
print(f"Training log file {training_log_file} not found. Skipping log appending.")
except Exception as e:
print(f"An error occurred while appending logs: {e}")
Training Process Plot
Testing Evaluation