Back to Library

What is MLOps?

A step-by-step guide for setting up an MLOps pipeline end-to-end using Python.

The field of Machine Learning has many domains for a budding Machine Learning Engineer to dive into: you can program machine learning libraries from scratch, or you can deploy, monitor, and maintain ML models at scale. The latter is what job descriptions in 2026 frequently call MLOps. The following guide demonstrates the role of a Machine Learning Engineer specializing in MLOps in a real-world scenario.

Defining the Problem

An emergency room is located in a linguistically diverse area. Hospital staff are having difficulty triaging patients because they are unable to properly identify each patient's needs. The ML team has developed a new model, a Multinomial Naive Bayes classifier, that reads input from patients in their native language and outputs the correct specialty (Cardiology, Neurology, etc.) the patient needs to see. The ML team has asked us to handle the end-to-end MLOps pipeline for the triage system.

1 - Project Initialization & Data Preprocessing

The first step is formally called data preprocessing: 'wrangling and cleaning' the data, or, put simply, preparing the data for the ML model. This step might sound less meaningful than it really is; in reality, the final result depends heavily on what is done here. The ML model we are using (Multinomial Naive Bayes), like any ML model, is just a mathematical algorithm, a way to process the data computationally: it calculates probabilities based on word frequencies, and without data it cannot produce any result. The model will choose its response based on the data it was trained on. For this guide, the model will be trained on the Symptom2Disease dataset, which contains 24 unique disease labels, each paired with a text description. We will use this data to train the model to predict disease labels for symptoms recorded from emergency room patients.

A screenshot of a section of the Symptom2Disease dataset.
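To build intuition for how a Multinomial Naive Bayes classifier turns word frequencies into a prediction, here is a minimal sketch on toy data. The symptom texts and specialty labels below are invented for illustration and are not drawn from Symptom2Disease:

```python
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB

# Toy corpus (hypothetical examples, not from the real dataset)
texts = [
    "chest pain and shortness of breath",
    "severe headache and blurred vision",
    "chest tightness radiating to the arm",
    "dizziness with throbbing headache",
]
labels = ["Cardiology", "Neurology", "Cardiology", "Neurology"]

# Turn each text into a word-count vector
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(texts)

# Fit the classifier on the counts
model = MultinomialNB()
model.fit(X, labels)

# Words like "chest" and "pain" only appear in Cardiology examples,
# so the class probabilities favor Cardiology
new_symptom = vectorizer.transform(["crushing chest pain"])
print(model.predict(new_symptom)[0])  # Cardiology
```

The full pipeline later in this guide uses the same classifier, just with TF-IDF weights instead of raw counts.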

1.1 - Project Setup

A screenshot of what the project folder structure looks like at initialization.

Before we can run any scripts, the project must be properly initialized. Create a new Python project and .venv in your IDE of choice.

  1. In the project root create the following directories: artifacts, data, and ETL.
  2. Within the data directory, create 2 more folders: processed & raw.
  3. Place Symptom2Disease.csv (link: https://www.kaggle.com/datasets/niyarrbarman/symptom2disease?resource=download) into the raw folder.
  4. In the root of the project create a new file called requirements.txt and place the following dependencies inside:
# Core ETL Pipeline Dependencies 
pandas>=2.0.0
numpy>=1.24.0

# Machine Learning
scikit-learn>=1.3.0
joblib>=1.3.0

# Translation & NLP
deep-translator>=1.11.4
langdetect>=1.0.9

# Visualization 
matplotlib>=3.7.0
seaborn>=0.12.0

# Web Dashboard 
streamlit>=1.28.0

# Development & Testing
pytest>=7.4.0
  5. Activate your environment (if you haven't already).
  6. In the terminal run: pip install -r requirements.txt

1.2 - Extract

What are we doing?

We are extracting the data from the CSV file and returning it as a Pandas DataFrame. We also perform integrity checks, verifying that the file exists and that no columns are missing, and then use built-in Pandas methods to log data metrics.

Why are we doing this?

Pandas has many built-in tools and features that make data preprocessing much easier. By storing the data in a DataFrame we can wrangle the data quickly and easily, without having to write too much code. We can verify that our data structure is valid by checking for the Symptom2Disease 'label' and 'text' columns. We do this during the first step of our ETL pipeline because we need to ensure the data is valid before we transform it. Detailed logging lets us see exactly what data is entering the pipeline.

The following Python script, ./ETL/extract.py, extracts data from Symptom2Disease.csv:


import pandas as pd
import os
from typing import Tuple


def extract_symptom_data(filepath: str) -> Tuple[pd.DataFrame, dict]:
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")
    df = pd.read_csv(filepath, index_col=0)
    required_columns = ['label', 'text']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}")
    print(f"Required columns verified: {required_columns}")
    # Generate summary statistics
    summary = {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'columns': list(df.columns),  # list of the column names
        'null_counts': df.isnull().sum().to_dict(),  # number of null values per column
        'unique_labels': df['label'].nunique() if 'label' in df.columns else 0,  # unique labels in 'label'
        'data_types': df.dtypes.to_dict()
    }
    # Log summary to console
    print("DATA SUMMARY:")
    print(f"  Total Rows: {summary['total_rows']:,}")
    print(f"  Total Columns: {summary['total_columns']}")
    print(f"  Unique Medical Specialties: {summary['unique_labels']}")
    return df, summary

1.3 - Transform

What are we doing?

In the extract phase we logged how many null values were in our extracted DataFrame, but we did not do anything to clean the data. In the transform phase we remove null values and duplicate rows, then fit the cleaned data to our TF-IDF vectorizer.

Why are we doing this?

We need to prepare the raw symptom text data into a format that machine learning models can actually use. ML models can't work with text directly; they need numbers. Additionally, we need to ensure data quality before training begins.

import pandas as pd
import re
from typing import Tuple
from sklearn.feature_extraction.text import TfidfVectorizer


def clean_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, dict]:
    initial_rows = len(df)
    log = {
        'initial_rows': initial_rows,
        'null_rows_removed': 0,
        'duplicate_rows_removed': 0,
        'final_rows': 0
    }
    # Check for null values in required columns
    null_count_before = df[['text', 'label']].isna().to_numpy().sum()
    print(f"Initial null values in 'text' and 'label': {null_count_before}")
    # Drop rows with null values in text or label columns
    df_cleaned = df.dropna(subset=['text', 'label'])
    rows_after_null_removal = len(df_cleaned)
    log['null_rows_removed'] = initial_rows - rows_after_null_removal
    if log['null_rows_removed'] > 0:
        print(f"  Removed {log['null_rows_removed']} rows with null values")
    else:
        print("  No null values found")
    # Remove duplicate symptom entries
    duplicates_before = df_cleaned.duplicated(subset=['text']).sum()
    print(f"Duplicate symptom entries found: {duplicates_before}")
    df_cleaned = df_cleaned.drop_duplicates(subset=['text'], keep='first')
    rows_after_duplicate_removal = len(df_cleaned)
    log['duplicate_rows_removed'] = rows_after_null_removal - rows_after_duplicate_removal
    if log['duplicate_rows_removed'] > 0:
        print(f"  Removed {log['duplicate_rows_removed']} duplicate rows")
    else:
        print("  No duplicates found")
    log['final_rows'] = rows_after_duplicate_removal
    # Reset index
    df_cleaned = df_cleaned.reset_index(drop=True)
    print("CLEANING SUMMARY:")
    print(f"  Initial rows: {log['initial_rows']:,}")
    print(f"  Rows removed (nulls): {log['null_rows_removed']:,}")
    print(f"  Rows removed (duplicates): {log['duplicate_rows_removed']:,}")
    print(f"  Final rows: {log['final_rows']:,}")
    print(f"  Retention rate: {(log['final_rows'] / log['initial_rows'] * 100):.2f}%")
    return df_cleaned, log


def preprocess_text(text: str) -> str:
    # Convert to lowercase
    text = text.lower()
    # Strip leading/trailing whitespace
    text = text.strip()
    # Remove any character not in this allowlist: letters a-z, digits 0-9, spaces, and . , ! ? ' -
    text = re.sub(r'[^a-z0-9\s.,!?\'\-]', '', text)
    # Collapse multiple spaces into one
    text = re.sub(r'\s+', ' ', text)
    return text


def transform_symptom_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, TfidfVectorizer, dict]:
    # Clean data
    df_cleaned, cleaning_log = clean_data(df)
    # Preprocess text
    df_cleaned['text'] = df_cleaned['text'].apply(preprocess_text)
    # Feature preparation with TF-IDF
    print("  Initializing TF-IDF Vectorizer...")
    vectorizer = TfidfVectorizer(
        max_features=5000,
        stop_words='english',
        ngram_range=(1, 2),
        min_df=2,
        max_df=0.95
    )
    # Fit the vectorizer (don't transform yet, just prepare it)
    print("  Fitting TF-IDF vectorizer on cleaned text...")
    vectorizer.fit(df_cleaned['text'])
    vocabulary_size = len(vectorizer.vocabulary_)
    print("  TF-IDF vectorizer fitted")
    print(f"  Vocabulary size: {vocabulary_size:,} terms")
    print(f"  N-gram range: {vectorizer.ngram_range}")
    print(f"  Max features: {vectorizer.max_features}")
    # Create transformation log
    transform_log = {
        **cleaning_log,
        'vocabulary_size': vocabulary_size,
        'unique_labels': df_cleaned['label'].nunique(),
        'label_distribution': df_cleaned['label'].value_counts().to_dict(),
        'translations_performed': 0
    }
    print("Transform phase completed successfully")
    return df_cleaned, vectorizer, transform_log

1.4 - Load

What are we doing?

This is the Load phase of the ETL pipeline: persisting the processed data and logging the pipeline execution.

Why are we doing this?

We are ensuring both the data itself and the pipeline metadata about processing are preserved for production-ready monitoring and reproducibility.

import pandas as pd
import sqlite3
import os
from datetime import datetime
from typing import Dict


def save_to_csv(df: pd.DataFrame, output_path: str) -> None:
    output_dir = os.path.dirname(output_path)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"  Created directory: {output_dir}")
    # Save to CSV
    print(f"Saving cleaned data to: {output_path}")
    df.to_csv(output_path, index=False)
    file_size = os.path.getsize(output_path)
    file_size_mb = file_size / (1024 * 1024)
    print(f"  Saved ({file_size_mb:.2f} MB)")


def save_to_database(df: pd.DataFrame, transform_log: Dict, db_path: str = "data/processed/etl_logs.db") -> None:
    # Ensure database directory exists
    db_dir = os.path.dirname(db_path)
    if db_dir and not os.path.exists(db_dir):
        os.makedirs(db_dir)
        print(f"Created directory: {db_dir}")
    # Connect to database
    print(f"Connecting to database: {db_path}")
    conn = sqlite3.connect(db_path)
    # Create table if it doesn't exist
    conn.execute("""
        CREATE TABLE IF NOT EXISTS processed_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_timestamp TEXT NOT NULL,
            medical_specialty TEXT NOT NULL,
            record_count INTEGER NOT NULL,
            percentage REAL NOT NULL,
            UNIQUE(run_timestamp, medical_specialty)
        )
    """)
    # Summary by medical specialty
    specialty_counts = df['label'].value_counts()
    total_records = len(df)
    timestamp = datetime.now().isoformat()
    # SQL injection prevention: parameterized queries with placeholders (?) instead of string concatenation
    # Idempotent inserts via ON CONFLICT
    for specialty, count in specialty_counts.items():
        percentage = (count / total_records) * 100
        conn.execute("""
            INSERT INTO processed_logs (run_timestamp, medical_specialty, record_count, percentage)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(run_timestamp, medical_specialty) DO UPDATE SET
                record_count = excluded.record_count,
                percentage = excluded.percentage
        """, (timestamp, specialty, int(count), float(percentage)))
    conn.commit()
    print("Table 'processed_logs' ready")
    print("SPECIALTY DISTRIBUTION:")
    for specialty, count in specialty_counts.items():
        percentage = (count / total_records) * 100
        print(f"  {specialty}: {count:,} records ({percentage:.2f}%)")
    total_db_rows = conn.execute("SELECT COUNT(*) FROM processed_logs").fetchone()[0]
    print("Database Summary:")
    print(f"  Records inserted/updated this run: {len(specialty_counts)}")
    print(f"  Timestamp: {timestamp}")
    print(f"  Total database rows: {total_db_rows}")
    conn.close()
    print("  Database persistence completed")


def load_processed_data(df: pd.DataFrame, transform_log: Dict,
                        csv_path: str = "data/processed/cleaned_symptoms.csv",
                        db_path: str = "data/processed/etl_logs.db") -> None:
    # Save to CSV
    save_to_csv(df, csv_path)
    # Save to database
    save_to_database(df, transform_log, db_path)
    print("  LOAD PHASE COMPLETED SUCCESSFULLY")

2 - Model Training & Evaluation

We have successfully completed our ETL pipeline and are ready to begin the next step: model training. Our data is officially ready; the load phase saved cleaned_symptoms.csv, which is the input for training the model. Create a new folder in the root of the project directory called models. Within this folder, create 2 files: train.py and evaluate.py.

A screenshot of what the project folder structure looks like during the Model Training phase.

2.1 - Train

What are we doing?

We are training a machine learning model to predict medical conditions from symptom descriptions.

Why are we doing this?

Main Purpose: Create a reusable trained model that can predict diseases from new symptom text without retraining.
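To see what 'reusable without retraining' means in practice, here is a minimal sketch of the save/load round trip with joblib, the same persistence mechanism the training script uses. The miniature pipeline, its texts, and its labels are invented for illustration:

```python
import os
import tempfile

import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

# Train a miniature pipeline on toy data (hypothetical labels)
texts = ["fever and sore throat", "lower back pain",
         "fever with chills", "stiff back muscles"]
labels = ["Infection", "Orthopedics", "Infection", "Orthopedics"]

pipeline = Pipeline([("tfidf", TfidfVectorizer()),
                     ("classifier", MultinomialNB())])
pipeline.fit(texts, labels)

# Persist to disk, then reload -- no retraining needed
path = os.path.join(tempfile.gettempdir(), "toy_model.joblib")
joblib.dump(pipeline, path)
reloaded = joblib.load(path)

# The pipeline bundles the vectorizer with the classifier,
# so raw text goes straight in
print(reloaded.predict(["high fever and throat pain"])[0])
```

Because the TF-IDF vectorizer is serialized inside the pipeline, the reloaded model accepts raw strings directly; this is exactly why the guide trains a Pipeline rather than a bare classifier.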

import os
import sys
import json
import pandas as pd
import numpy as np
import joblib
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from typing import Tuple, Dict

# Consistent split parameters (used by both train and evaluate)
TEST_SIZE = 0.2
RANDOM_STATE = 42

# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))


def load_cleaned_data(filepath: str = "data/processed/cleaned_symptoms.csv") -> Tuple[pd.DataFrame, Dict]:
    if not os.path.exists(filepath):
        raise FileNotFoundError(
            f"Cleaned data not found at {filepath}. "
            "Please run the ETL pipeline first (python3 src/main.py)"
        )
    print(f"Loading cleaned data from: {filepath}")
    df = pd.read_csv(filepath)
    metadata = {
        'total_records': len(df),
        'unique_specialties': df['label'].nunique(),
        'specialty_counts': df['label'].value_counts().to_dict(),
        'avg_text_length': df['text'].str.len().mean()
    }
    print(f"Loaded {metadata['total_records']:,} records across {metadata['unique_specialties']} specialties.")
    return df, metadata


def create_train_test_split(
    df: pd.DataFrame,
    test_size: float = TEST_SIZE,
    random_state: int = RANDOM_STATE
) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series, list]:
    X = df['text']
    y = df['label']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=test_size,
        random_state=random_state,
        stratify=y  # Maintain class distribution in both sets
    )
    # Save test indices for reproducible evaluation
    test_indices = X_test.index.tolist()
    print(f"Split: train={len(X_train):,}, test={len(X_test):,} (stratified, random_state={random_state})")
    return X_train, X_test, y_train, y_test, test_indices


def save_split_indices(
    test_indices: list,
    output_path: str = "artifacts/reports/test_indices.json",
    test_size: float = TEST_SIZE,
    random_state: int = RANDOM_STATE
) -> str:
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    indices_data = {
        "test_indices": test_indices,
        "meta": {
            "test_size": test_size,
            "random_state": random_state,
            "timestamp": datetime.now().isoformat()
        }
    }
    with open(output_path, "w") as f:
        json.dump(indices_data, f, indent=2)
    print(f"Saved test indices to: {output_path}")
    return output_path


def build_model_pipeline() -> Pipeline:
    # TF-IDF Vectorizer configuration
    tfidf = TfidfVectorizer(
        max_features=5000,
        stop_words='english',
        ngram_range=(1, 2),  # Unigrams and bigrams
        min_df=2,            # Ignore terms appearing in < 2 documents
        max_df=0.95,         # Ignore terms appearing in > 95% of documents
        sublinear_tf=True    # Use log scaling for term frequency
    )
    # Multinomial Naive Bayes classifier
    # alpha=1.0 (Laplace smoothing to handle zero probabilities)
    nb_classifier = MultinomialNB(alpha=1.0)
    # Create pipeline
    pipeline = Pipeline([
        ('tfidf', tfidf),
        ('classifier', nb_classifier)
    ])
    print("Pipeline: TF-IDF + MultinomialNB")
    return pipeline


def train_model(
    pipeline: Pipeline,
    X_train: pd.Series,
    y_train: pd.Series
) -> Tuple[Pipeline, Dict]:
    print("Training model...")
    start_time = datetime.now()
    # Fit the pipeline
    pipeline.fit(X_train, y_train)
    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()
    # Extract feature names and vocabulary size
    tfidf_vectorizer = pipeline.named_steps['tfidf']
    vocabulary_size = len(tfidf_vectorizer.vocabulary_)
    metadata = {
        'training_samples': len(X_train),
        'unique_classes': y_train.nunique(),
        'vocabulary_size': vocabulary_size,
        'training_duration_seconds': duration,
        'timestamp': datetime.now().isoformat()
    }
    print(f"Trained in {duration:.2f}s | samples={metadata['training_samples']:,} | vocab={metadata['vocabulary_size']:,}")
    return pipeline, metadata


def save_model(
    pipeline: Pipeline,
    output_path: str = "artifacts/medical_model.joblib"
) -> None:
    # Ensure output directory exists
    output_dir = os.path.dirname(output_path)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"Created directory: {output_dir}")
    # Save model using joblib
    print(f"Saving model to: {output_path}")
    joblib.dump(pipeline, output_path)
    # Verify save and get file size
    file_size_mb = os.path.getsize(output_path) / (1024 * 1024)
    print(f"Saved ({file_size_mb:.2f} MB)")


def run_training_pipeline(
    input_csv: str = "data/processed/cleaned_symptoms.csv",
    output_model: str = "artifacts/medical_model.joblib",
    indices_path: str = "artifacts/reports/test_indices.json",
    test_size: float = TEST_SIZE,
    random_state: int = RANDOM_STATE
) -> Dict:
    print("\n" + "="*70)
    print("MODEL TRAINING PIPELINE")
    print("="*70)
    print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("="*70 + "\n")
    try:
        # Step 1: Load data
        df, data_metadata = load_cleaned_data(input_csv)
        # Step 2: Create train/test split
        X_train, X_test, y_train, y_test, test_indices = create_train_test_split(
            df, test_size=test_size, random_state=random_state
        )
        # Step 3: Save test indices for reproducible evaluation
        save_split_indices(test_indices, indices_path, test_size, random_state)
        # Step 4: Build pipeline
        pipeline = build_model_pipeline()
        # Step 5: Train model
        trained_pipeline, train_metadata = train_model(pipeline, X_train, y_train)
        # Step 6: Save model
        save_model(trained_pipeline, output_model)
        # Compile summary
        summary = {
            'status': 'SUCCESS',
            'data': data_metadata,
            'training': train_metadata,
            'split': {
                'train_size': len(X_train),
                'test_size': len(X_test),
                'test_ratio': test_size,
                'random_state': random_state
            },
            'model_path': output_model,
            'indices_path': indices_path,
            'test_data': {'X_test': X_test, 'y_test': y_test, 'X_train': X_train, 'y_train': y_train}
        }
        print("\n" + "="*70)
        print("[OK] TRAINING COMPLETE")
        print("="*70)
        print(f"  Model saved: {output_model}")
        print(f"  Test indices: {indices_path}")
        print(f"  Train samples: {len(X_train):,}")
        print(f"  Test samples: {len(X_test):,}")
        print("="*70 + "\n")
        return summary
    except Exception as e:
        print(f"\n[ERROR] Training Error: {str(e)}")
        raise


if __name__ == "__main__":
    summary = run_training_pipeline()
    print(f"Summary: {summary['data']['unique_specialties']} classes | vocab={summary['training']['vocabulary_size']:,}")

2.2 - Evaluate

What are we doing?

We are evaluating the trained model's performance on the held-out test data (20% from the split).

We will:

  1. Load the trained model from disk
  2. Load test data using saved indices (ensures same samples from training)
  3. Generate predictions on test set
  4. Calculate performance metrics (accuracy, precision, recall, F1-score)
  5. Test hypothesis (did we achieve ≥85% accuracy target?)
  6. Generate per-disease classification report
  7. Create visualizations (confusion matrix, class distribution, top keywords)

Why are we doing this?

We want to prove the model works on unseen data; training metrics alone can be misleading (overfitting). We are also testing the hypothesis that the model will achieve at least 85% accuracy.
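As a quick refresher on these metrics, here is a minimal sketch computing them with scikit-learn on invented ground-truth and predicted labels:

```python
from sklearn.metrics import (accuracy_score, precision_score,
                             recall_score, f1_score)

# Invented ground-truth and predicted labels for two toy conditions
y_true = ["Flu", "Flu", "Migraine", "Migraine", "Flu"]
y_pred = ["Flu", "Migraine", "Migraine", "Migraine", "Flu"]

# accuracy: fraction of all predictions that are correct (4 of 5 here)
acc = accuracy_score(y_true, y_pred)
# weighted averages account for class imbalance, matching evaluate.py
prec = precision_score(y_true, y_pred, average='weighted', zero_division=0)
rec = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

print(f"Accuracy:  {acc:.2f}")   # 0.80
print(f"Precision: {prec:.2f}")  # 0.87
print(f"Recall:    {rec:.2f}")   # 0.80
print(f"F1-score:  {f1:.2f}")    # 0.80
```

Note how precision and recall diverge: "Migraine" was over-predicted, which hurts its precision but not its recall.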

import os
import sys
import json
import pandas as pd
import numpy as np
import joblib
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score
from typing import Dict, Tuple

# Add parent directory to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from visuals.plots import save_confusion_matrix, save_class_distribution, save_top_keywords

# Consistent split parameters (same as train.py)
TEST_SIZE = 0.2
RANDOM_STATE = 42


def load_model(model_path: str = "artifacts/medical_model.joblib"):
    if not os.path.exists(model_path):
        raise FileNotFoundError(
            f"Model not found at {model_path}. "
            "Please run training first (python3 src/models/train.py)"
        )
    print(f"Loading model from: {model_path}")
    pipeline = joblib.load(model_path)
    print("[OK] Model loaded successfully")
    print(f"  Pipeline steps: {list(pipeline.named_steps.keys())}")
    return pipeline


def load_data_and_split(
    csv_path: str = "data/processed/cleaned_symptoms.csv",
    indices_path: str = "artifacts/reports/test_indices.json",
    test_size: float = TEST_SIZE,
    random_state: int = RANDOM_STATE
) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
    print(f"Loading data from: {csv_path}")
    df = pd.read_csv(csv_path)
    X = df["text"]
    y = df["label"]
    # Try to use saved indices for exact reproducibility
    if os.path.exists(indices_path):
        print(f"Loading saved test indices from: {indices_path}")
        with open(indices_path) as f:
            indices_data = json.load(f)
        test_idx = indices_data["test_indices"]
        # Use saved indices
        test_mask = X.index.isin(test_idx)
        X_test = X[test_mask]
        y_test = y[test_mask]
        X_train = X[~test_mask]
        y_train = y[~test_mask]
        print(f"[OK] Using saved indices: train={len(X_train):,}, test={len(X_test):,}")
    else:
        # Recreate split with same parameters
        print(f"[WARN] No saved indices found, recreating split (random_state={random_state})")
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=test_size,
            random_state=random_state,
            stratify=y
        )
        print(f"[OK] Split recreated: train={len(X_train):,}, test={len(X_test):,}")
    return X_train, X_test, y_train, y_test


def generate_predictions(
    pipeline,
    X_test: pd.Series,
    y_test: pd.Series
) -> Tuple[np.ndarray, Dict]:
    # Predict
    y_pred = pipeline.predict(X_test)
    # Calculate overall accuracy plus weighted precision, recall, and F1-score
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted', zero_division=0)
    recall = recall_score(y_test, y_pred, average='weighted', zero_division=0)
    f1 = f1_score(y_test, y_pred, average='weighted', zero_division=0)
    metrics = {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'test_samples': len(X_test)
    }
    return y_pred, metrics


def evaluate_hypothesis(accuracy: float, target: float = 0.85) -> Dict:
    difference = accuracy - target
    percentage_diff = (difference / target) * 100
    hypothesis_met = accuracy >= target
    result = {
        'target_accuracy': target,
        'achieved_accuracy': accuracy,
        'difference': difference,
        'percentage_difference': percentage_diff,
        'hypothesis_met': hypothesis_met,
        'conclusion': 'ACCEPTED' if hypothesis_met else 'REJECTED'
    }
    return result


def generate_classification_report(
    y_test: pd.Series,
    y_pred: np.ndarray,
    output_dir: str = "artifacts/reports"
) -> pd.DataFrame:
    # Generate report
    report_dict = classification_report(y_test, y_pred, output_dict=True, zero_division=0)
    report_df = pd.DataFrame(report_dict).transpose()
    # Save to CSV
    os.makedirs(output_dir, exist_ok=True)
    report_path = os.path.join(output_dir, "classification_report.csv")
    report_df.to_csv(report_path)
    print(f"Classification report saved: {report_path}")
    return report_df


def evaluate_model(
    pipeline,
    X_train: pd.Series,
    X_test: pd.Series,
    y_train: pd.Series,
    y_test: pd.Series,
    target_accuracy: float = 0.85,
    vis_dir: str = "artifacts/visualizations",
    report_dir: str = "artifacts/reports"
) -> Dict:
    print("\nGenerating predictions and metrics...")
    # Generate predictions and metrics
    y_pred, metrics = generate_predictions(pipeline, X_test, y_test)
    # Hypothesis testing
    hypothesis_result = evaluate_hypothesis(metrics['accuracy'], target_accuracy)
    # Classification report
    report_df = generate_classification_report(y_test, y_pred, report_dir)
    # Generate all visualizations
    print("\nGenerating visualizations...")
    labels = sorted(y_test.unique())
    cm_path = save_confusion_matrix(
        y_true=y_test,
        y_pred=y_pred,
        labels=labels,
        output_path=os.path.join(vis_dir, "confusion_matrix.png")
    )
    print(f"  [OK] Confusion Matrix: {cm_path}")
    dist_path = save_class_distribution(
        y_train=y_train,
        y_test=y_test,
        output_path=os.path.join(vis_dir, "class_distribution.png")
    )
    print(f"  [OK] Class Distribution: {dist_path}")
    keywords_path = save_top_keywords(
        pipeline=pipeline,
        output_path=os.path.join(vis_dir, "top_keywords.png")
    )
    print(f"  [OK] Top Keywords: {keywords_path}")
    # Compile results
    results = {
        'metrics': metrics,
        'hypothesis_test': hypothesis_result,
        'classification_report': report_df,
        'visualizations': {
            'confusion_matrix': cm_path,
            'class_distribution': dist_path,
            'top_keywords': keywords_path
        }
    }
    return results


def run_evaluation_pipeline(
    model_path: str = "artifacts/medical_model.joblib",
    data_csv: str = "data/processed/cleaned_symptoms.csv",
    indices_path: str = "artifacts/reports/test_indices.json",
    vis_dir: str = "artifacts/visualizations",
    report_dir: str = "artifacts/reports",
    target_accuracy: float = 0.85
) -> Dict:
    print("\n" + "="*70)
    print("MODEL EVALUATION PIPELINE")
    print("="*70)
    print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("="*70 + "\n")
    try:
        # Step 1: Load model
        pipeline = load_model(model_path)
        # Step 2: Load data and recreate split
        X_train, X_test, y_train, y_test = load_data_and_split(
            csv_path=data_csv,
            indices_path=indices_path
        )
        # Step 3: Run full evaluation
        results = evaluate_model(
            pipeline=pipeline,
            X_train=X_train,
            X_test=X_test,
            y_train=y_train,
            y_test=y_test,
            target_accuracy=target_accuracy,
            vis_dir=vis_dir,
            report_dir=report_dir
        )
        # Print summary
        print("\n" + "="*70)
        print("[OK] EVALUATION COMPLETE")
        print("="*70)
        print("\nMODEL PERFORMANCE:")
        print(f"  Accuracy:  {results['metrics']['accuracy']*100:.2f}%")
        print(f"  Precision: {results['metrics']['precision']*100:.2f}%")
        print(f"  Recall:    {results['metrics']['recall']*100:.2f}%")
        print(f"  F1-Score:  {results['metrics']['f1_score']*100:.2f}%")
        print("\nHYPOTHESIS TEST:")
        print(f"  Target:   {results['hypothesis_test']['target_accuracy']*100:.0f}%")
        print(f"  Achieved: {results['hypothesis_test']['achieved_accuracy']*100:.2f}%")
        print(f"  Result:   {results['hypothesis_test']['conclusion']}")
        print("\nDELIVERABLES:")
        print(f"  [OK] Classification Report: {report_dir}/classification_report.csv")
        print(f"  [OK] Confusion Matrix: {vis_dir}/confusion_matrix.png")
        print(f"  [OK] Class Distribution: {vis_dir}/class_distribution.png")
        print(f"  [OK] Top Keywords: {vis_dir}/top_keywords.png")
        print("="*70 + "\n")
        return {'status': 'SUCCESS', 'results': results}
    except Exception as e:
        print(f"\n[ERROR] Evaluation Error: {str(e)}")
        raise


if __name__ == "__main__":
    run_evaluation_pipeline()

3 - Visualizations and UI

We have successfully trained & evaluated our model. Now let's create some visualizations using Matplotlib and a simple user interface to interact with our trained model.

3.1 - Visualizations Using Matplotlib

What are we doing?

This file creates 3 key visualizations for model evaluation and documentation.

Why are we doing this?

We are demonstrating how Matplotlib, Pandas, and Scikit-Learn all work together to create highly detailed visualizations.

import os
from typing import Sequence

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix


def save_confusion_matrix(y_true: pd.Series,
                          y_pred: np.ndarray,
                          labels: Sequence[str],
                          output_path: str) -> str:
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    plt.figure(figsize=(16, 14))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=labels,
        yticklabels=labels,
        cbar_kws={"label": "Number of Predictions"},
        linewidths=0.5,
        linecolor="gray",
    )
    plt.title("Confusion Matrix: Disease/Diagnosis Predictions", fontsize=16, fontweight="bold", pad=20)
    plt.xlabel("Predicted Disease/Diagnosis", fontsize=12, fontweight="bold")
    plt.ylabel("True Disease/Diagnosis", fontsize=12, fontweight="bold")
    plt.xticks(rotation=45, ha="right")
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.savefig(output_path, dpi=300, bbox_inches="tight")
    plt.close()
    return output_path


def save_class_distribution(y_train: pd.Series,
                            y_test: pd.Series,
                            output_path: str) -> str:
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    train_counts = y_train.value_counts().sort_index()
    test_counts = y_test.value_counts().sort_index()
    df_plot = pd.DataFrame({"Training Set": train_counts, "Test Set": test_counts})
    fig, ax = plt.subplots(figsize=(14, 8))
    df_plot.plot(kind="bar", ax=ax, width=0.8, color=["#3498db", "#e74c3c"])
    plt.title("Class Distribution: Training vs Test Set", fontsize=16, fontweight="bold", pad=20)
    plt.xlabel("Disease/Diagnosis", fontsize=12, fontweight="bold")
    plt.ylabel("Number of Samples", fontsize=12, fontweight="bold")
    plt.legend(title="Dataset", fontsize=10, title_fontsize=11)
    plt.xticks(rotation=45, ha="right")
    plt.grid(axis="y", alpha=0.3, linestyle="--")
    plt.tight_layout()
    plt.savefig(output_path, dpi=300, bbox_inches="tight")
    plt.close()
    return output_path


def save_top_keywords(pipeline,
                      output_path: str,
                      n_keywords: int = 10,
                      n_specialties: int = 6) -> str:
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    tfidf = pipeline.named_steps["tfidf"]
    classifier = pipeline.named_steps["classifier"]
    feature_names = np.array(tfidf.get_feature_names_out())
    class_labels = classifier.classes_
    feature_log_prob = classifier.feature_log_prob_
    n_specialties = min(n_specialties, len(class_labels))
    selected_indices = np.linspace(0, len(class_labels) - 1, n_specialties, dtype=int)
    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    axes = axes.ravel()
    for idx, class_idx in enumerate(selected_indices):
        specialty = class_labels[class_idx]
        top_indices = np.argsort(feature_log_prob[class_idx])[-n_keywords:][::-1]
        top_features = feature_names[top_indices]
        top_scores = feature_log_prob[class_idx][top_indices]
        top_scores_norm = (top_scores - top_scores.min()) / (top_scores.max() - top_scores.min() + 1e-10)
        axes[idx].barh(range(n_keywords), top_scores_norm, color="#2ecc71")
        axes[idx].set_yticks(range(n_keywords))
        axes[idx].set_yticklabels(top_features)
        axes[idx].invert_yaxis()
        axes[idx].set_xlabel("Relative Importance", fontsize=9)
        axes[idx].set_title(f"{specialty}", fontsize=11, fontweight="bold")
        axes[idx].grid(axis="x", alpha=0.3)
    plt.suptitle("Top 10 Predictive Keywords by Disease/Diagnosis", fontsize=16, fontweight="bold", y=0.995)
    plt.tight_layout()
    plt.savefig(output_path, dpi=300, bbox_inches="tight")
    plt.close()
    return output_path
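Before feeding counts into the heatmap, it helps to see exactly what `confusion_matrix` returns. A minimal sketch with toy labels (the two disease names here are illustrative, not from our dataset):

```python
from sklearn.metrics import confusion_matrix

# Toy ground truth and predictions (illustrative labels only)
y_true = ["Migraine", "Acne", "Migraine", "Acne"]
y_pred = ["Migraine", "Acne", "Acne", "Acne"]
labels = ["Acne", "Migraine"]

cm = confusion_matrix(y_true, y_pred, labels=labels)
# Rows are true labels, columns are predicted labels, in `labels` order:
# cm[1][0] counts true Migraine cases predicted as Acne
print(cm.tolist())  # → [[2, 0], [1, 1]]
```

Passing `labels=` explicitly, as `save_confusion_matrix` does, pins the row/column order so the axis tick labels on the heatmap line up with the matrix.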

3.2 - Building a UI

What are we doing?

Building a simple user interface to interact with our model.

Why are we doing this?

Testing how our trained model works in a live environment.

import os
import sys

import joblib
import streamlit as st

# Ensure project root is on sys.path so 'src' package resolves when run via Streamlit
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

from src.models.mapping import suggest_professional
from src.ui.utils import clean_text, to_english

MODEL_PATH = "artifacts/medical_model.joblib"
VIS_DIR = "artifacts/visualizations"


@st.cache_resource
def load_model():
    if not os.path.exists(MODEL_PATH):
        raise FileNotFoundError(f"Trained model not found at {MODEL_PATH}. Run training first.")
    return joblib.load(MODEL_PATH)


def main():
    st.set_page_config(page_title="Medical Triage Dashboard", layout="wide")
    st.title("Medical Triage Dashboard")
    # User safety disclaimer
    st.warning(
        "**DISCLAIMER:** This product is a Support System and prototype, "
        "not a replacement for professional medical judgment. "
        "Always consult with a qualified healthcare professional for medical advice, diagnosis, or treatment."
    )
    # Layout: left inputs, right visuals
    col_left, col_right = st.columns([1, 1])
    # Interactive query system: users input symptoms, system responds with real-time predictions
    with col_left:
        st.subheader("Enter your symptoms")
        user_text = st.text_area(
            "Describe your symptoms (any language):",
            height=160,
            placeholder="e.g., I have chest pain and shortness of breath...",
        )
        predict_btn = st.button("Suggest Specialist")
        # Prediction flow - placed directly below input for better UX
        if predict_btn:
            if not user_text.strip():
                st.warning("Please enter your symptoms.")
            else:
                # Translate to English if needed
                translated_text, lang = to_english(user_text.strip())
                cleaned = clean_text(translated_text)
                try:
                    pipeline = load_model()
                except Exception as e:
                    st.error(f"Error loading model: {e}")
                    return
                # Decision support: predict condition and recommend the appropriate specialist
                try:
                    pred_label = pipeline.predict([cleaned])[0]
                    proba = None
                    if hasattr(pipeline, "predict_proba"):
                        proba = max(pipeline.predict_proba([cleaned])[0])
                except Exception as e:
                    st.error(f"Prediction failed: {e}")
                    return
                specialist = suggest_professional(pred_label)
                st.subheader("Suggested Care")
                st.write(f"Detected Input Language: {lang.upper()}")
                st.write(f"Predicted Condition/Specialty: {pred_label}")
                if proba is not None:
                    st.write(f"Confidence: {proba * 100:.2f}%")
                st.success(f"Suggested Medical Professional: {specialist}")
                with st.expander("Processed Text (English)"):
                    st.code(cleaned)
    # Dashboard displays three visualization types:
    # 1. Confusion Matrix (heatmap) - model prediction accuracy
    # 2. Class Distribution (bar chart) - training vs test data balance
    # 3. Top Keywords (horizontal bar charts) - feature importance by specialty
    with col_right:
        st.subheader("Model Visualizations")
        cm_path = os.path.join(VIS_DIR, "confusion_matrix.png")
        dist_path = os.path.join(VIS_DIR, "class_distribution.png")
        keywords_path = os.path.join(VIS_DIR, "top_keywords.png")
        # Display visuals if available
        if os.path.exists(cm_path):
            st.image(cm_path, caption="Confusion Matrix", width="stretch")
        else:
            st.info("Confusion Matrix not found. Generate via evaluation pipeline.")
        if os.path.exists(dist_path):
            st.image(dist_path, caption="Class Distribution", width="stretch")
        else:
            st.info("Class Distribution not found. Generate via evaluation pipeline.")
        if os.path.exists(keywords_path):
            st.image(keywords_path, caption="Top Predictive Keywords", width="stretch")
        else:
            st.info("Top Predictive Keywords not found. Generate via evaluation pipeline.")


# Streamlit executes the script top to bottom on each rerun, so call main()
# at module level; without this call the page renders empty
main()
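The dashboard's `load_model()` relies on joblib restoring the exact pipeline that training serialized. A minimal round-trip sketch, using a toy two-document corpus and a temporary path rather than the real artifact:

```python
import os
import tempfile

import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

# Tiny stand-in for the real training pipeline (illustrative data only)
pipe = Pipeline([("tfidf", TfidfVectorizer()), ("classifier", MultinomialNB())])
pipe.fit(["chest pain and pressure", "itchy skin rash"], ["Hypertension", "Psoriasis"])

# Serialize, then restore - the same pattern load_model() uses in app.py
path = os.path.join(tempfile.mkdtemp(), "medical_model.joblib")
joblib.dump(pipe, path)
restored = joblib.load(path)
print(restored.predict(["chest pain"])[0])  # → Hypertension
```

Serializing the whole Pipeline (vectorizer plus classifier) is what lets the UI pass raw cleaned text straight to `pipeline.predict` without reconstructing the TF-IDF vocabulary.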

4 - Translation & Triaging

This is arguably the most important part of our application. The utilities below help patients by:

  1. Mapping the diseases to a specialist.
  2. Translating patient input from any language into English so that our model can interpret it.

4.1 - Mapping Diagnosis to Specialty

"""
Mapping utilities to suggest a medical professional from a predicted label.
"""
from typing import Dict

LABEL_TO_SPECIALIST: Dict[str, str] = {
    "Psoriasis": "Dermatologist",
    "Varicose Veins": "Vascular Surgeon",
    "peptic ulcer disease": "Gastroenterologist",
    "drug reaction": "Allergist/Immunologist",
    "allergy": "Allergist/Immunologist",
    "urinary tract infection": "Urologist",
    "Hypertension": "Cardiologist",
    "diabetes": "Endocrinologist",
    "Fungal infection": "Dermatologist",
    "Dengue": "Infectious Disease Specialist",
    "Impetigo": "Dermatologist",
    "Typhoid": "Infectious Disease Specialist",
    "Common Cold": "Primary Care Physician",
    "Cervical spondylosis": "Orthopedic Specialist",
    "Chicken pox": "Primary Care Physician",
    "Bronchial Asthma": "Pulmonologist",
    "gastroesophageal reflux disease": "Gastroenterologist",
    "Pneumonia": "Pulmonologist",
    "Migraine": "Neurologist",
    "Arthritis": "Rheumatologist",
    "Acne": "Dermatologist",
    "Malaria": "Infectious Disease Specialist",
    "Dimorphic Hemorrhoids": "Colorectal Surgeon",
    "Jaundice": "Hepatologist",
}
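The UI imports `suggest_professional` from this module, but only the label-to-specialist dictionary is shown above. A plausible minimal implementation is a dictionary lookup with a generalist fallback; note that the fallback value and the two-entry excerpt below are assumptions made so the sketch runs on its own:

```python
from typing import Dict

# Small excerpt of the full mapping so this sketch is self-contained
LABEL_TO_SPECIALIST: Dict[str, str] = {
    "Migraine": "Neurologist",
    "Acne": "Dermatologist",
}

def suggest_professional(label: str) -> str:
    # Dictionary lookup with a generalist fallback; defaulting to
    # "Primary Care Physician" is an assumption, not from the source module
    return LABEL_TO_SPECIALIST.get(label, "Primary Care Physician")

print(suggest_professional("Migraine"))       # → Neurologist
print(suggest_professional("unknown label"))  # → Primary Care Physician
```

Using `.get` with a default means an unmapped label degrades gracefully instead of raising a `KeyError` in the middle of a triage request.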

4.2 - Translation Utilities

import re
from typing import Tuple

from langdetect import detect, LangDetectException
from deep_translator import GoogleTranslator


def clean_text(text: str) -> str:
    """
    Mirror the training-time cleaning (lower, trim, strip special chars, collapse spaces).
    """
    text = (text or "").lower().strip()
    text = re.sub(r"[^a-z0-9\s.,!?'-]", "", text)
    text = re.sub(r"\s+", " ", text)
    return text


def to_english(text: str) -> Tuple[str, str]:
    """
    Detect language and translate to English if needed.
    Returns (processed_text, detected_language_code).
    """
    if not text:
        return "", "en"
    try:
        lang = detect(text)
    except LangDetectException:
        lang = "en"
    if lang != "en":
        try:
            translator = GoogleTranslator(source=lang, target="en")
            translated = translator.translate(text)
            return translated, lang
        except Exception:
            # Fallback: return original text if translation fails
            return text, lang
    return text, lang
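A quick check of what the cleaning step does to messy input (the function is restated here so the example runs on its own, without the translation dependencies):

```python
import re

def clean_text(text: str) -> str:
    # Restated from the utility above: lower, trim, strip special chars, collapse spaces
    text = (text or "").lower().strip()
    text = re.sub(r"[^a-z0-9\s.,!?'-]", "", text)
    text = re.sub(r"\s+", " ", text)
    return text

print(clean_text("Fever & chills,  Headache."))  # → fever chills, headache.
print(clean_text(None))                          # → (empty string)
```

Note that the character whitelist drops everything outside ASCII letters, digits, and basic punctuation, which is exactly why the app runs `to_english` before `clean_text`: cleaning first would destroy non-Latin input.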

5 - Running the Application

The following script goes into main.py.

Single Entry Point: One command (python3 src/main.py) runs everything.

Comprehensive Logging: Prints detailed progress, summaries, and final deliverables.

import os
import sys
from datetime import datetime

import joblib

# Ensure project root is on path for imports
sys.path.append(os.path.dirname(__file__))

from ETL.extract import extract_symptom_data
from ETL.transform import transform_symptom_data
from ETL.load import load_processed_data
from models.train import run_training_pipeline
from models.evaluate import evaluate_model


def run_etl_pipeline(
    input_path: str = "data/raw/Symptom2Disease.csv",
    output_csv: str = "data/processed/cleaned_symptoms.csv",
    output_db: str = "data/processed/etl_logs.db"
) -> dict:
    """
    Execute the ETL pipeline.

    Args:
        input_path: Path to raw input CSV file
        output_csv: Path to save cleaned CSV file
        output_db: Path to SQLite database for logs

    Returns:
        Dictionary containing pipeline execution summary
    """
    print("\n" + "=" * 70)
    print("PHASE 1: ETL PIPELINE")
    print(" Extract, Transform, Load")
    print("=" * 70)
    print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 70 + "\n")
    pipeline_start = datetime.now()
    try:
        # EXTRACT
        print("STEP 1: EXTRACT")
        df_raw, extract_summary = extract_symptom_data(input_path)

        # TRANSFORM
        print("\nSTEP 2: TRANSFORM")
        df_transformed, vectorizer, transform_log = transform_symptom_data(df_raw)

        # LOAD
        print("\nSTEP 3: LOAD")
        load_processed_data(df_transformed, transform_log, output_csv, output_db)

        # Calculate duration
        pipeline_end = datetime.now()
        duration = (pipeline_end - pipeline_start).total_seconds()

        # Generate summary
        summary = {
            'status': 'SUCCESS',
            'duration_seconds': duration,
            'input_file': input_path,
            'output_csv': output_csv,
            'output_db': output_db,
            'records_processed': {
                'initial': extract_summary['total_rows'],
                'final': transform_log['final_rows'],
                'removed': extract_summary['total_rows'] - transform_log['final_rows']
            },
            'transformations': {
                'nulls_removed': transform_log['null_rows_removed'],
                'duplicates_removed': transform_log['duplicate_rows_removed'],
                'translations': transform_log['translations_performed']
            },
            'feature_engineering': {
                'vocabulary_size': transform_log['vocabulary_size'],
                'unique_specialties': transform_log['unique_labels']
            }
        }

        # Print summary
        print("\n" + "=" * 70)
        print("[OK] ETL PIPELINE COMPLETE")
        print("=" * 70)
        print(f"Duration: {duration:.2f} seconds")
        print("\nData Processing:")
        print(f" Initial records: {summary['records_processed']['initial']:,}")
        print(f" Final records: {summary['records_processed']['final']:,}")
        print(f" Records removed: {summary['records_processed']['removed']:,}")
        print("\nTransformations Applied:")
        print(f" Null rows removed: {summary['transformations']['nulls_removed']:,}")
        print(f" Duplicate rows removed: {summary['transformations']['duplicates_removed']:,}")
        print(f" Translations performed: {summary['transformations']['translations']:,}")
        print("\nFeature Engineering:")
        print(f" TF-IDF vocabulary size: {summary['feature_engineering']['vocabulary_size']:,}")
        print(f" Medical specialties: {summary['feature_engineering']['unique_specialties']:,}")
        print("\nOutput Files:")
        print(f" Cleaned CSV: {output_csv}")
        print(f" ETL Logs DB: {output_db}")
        print("=" * 70 + "\n")
        return summary
    except Exception as e:
        print(f"\n[ERROR] ETL Pipeline Error: {str(e)}")
        raise


def run_complete_pipeline(
    # ETL parameters
    raw_data_path: str = "data/raw/Symptom2Disease.csv",
    cleaned_csv_path: str = "data/processed/cleaned_symptoms.csv",
    etl_db_path: str = "data/processed/etl_logs.db",
    # Training parameters
    model_path: str = "artifacts/medical_model.joblib",
    indices_path: str = "artifacts/reports/test_indices.json",
    # Evaluation parameters
    vis_dir: str = "artifacts/visualizations",
    report_dir: str = "artifacts/reports",
    target_accuracy: float = 0.85
) -> dict:
    """
    Execute the complete pipeline: ETL -> Train -> Evaluate.

    Args:
        raw_data_path: Path to raw input CSV
        cleaned_csv_path: Path for cleaned CSV output
        etl_db_path: Path for ETL logs database
        model_path: Path to save trained model
        indices_path: Path to save test indices
        vis_dir: Directory for visualizations
        report_dir: Directory for reports
        target_accuracy: Hypothesis testing target

    Returns:
        Dictionary with complete pipeline results
    """
    overall_start = datetime.now()
    print("\n" + "=" * 70)
    print("MEDICAL TRIAGE SYSTEM - COMPLETE PIPELINE")
    print(" WGU Capstone Project")
    print("=" * 70)
    print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 70 + "\n")
    try:
        # PHASE 1: ETL
        etl_summary = run_etl_pipeline(
            input_path=raw_data_path,
            output_csv=cleaned_csv_path,
            output_db=etl_db_path
        )

        # PHASE 2: TRAINING
        training_summary = run_training_pipeline(
            input_csv=cleaned_csv_path,
            output_model=model_path,
            indices_path=indices_path
        )

        # PHASE 3: EVALUATION (using in-memory data from training)
        print("\n" + "=" * 70)
        print("PHASE 3: EVALUATION PIPELINE")
        print(" Metrics & Visualizations")
        print("=" * 70 + "\n")
        # Use test data from training for evaluation
        pipeline = joblib.load(model_path)
        eval_results = evaluate_model(
            pipeline=pipeline,
            X_train=training_summary['test_data']['X_train'],
            X_test=training_summary['test_data']['X_test'],
            y_train=training_summary['test_data']['y_train'],
            y_test=training_summary['test_data']['y_test'],
            target_accuracy=target_accuracy,
            vis_dir=vis_dir,
            report_dir=report_dir
        )

        # FINAL SUMMARY
        overall_end = datetime.now()
        total_duration = (overall_end - overall_start).total_seconds()
        print("\n" + "=" * 70)
        print("COMPLETE PIPELINE FINISHED")
        print("=" * 70)
        print(f"Total Duration: {total_duration:.2f} seconds")
        print("\nMODEL PERFORMANCE:")
        print(f" Accuracy: {eval_results['metrics']['accuracy'] * 100:.2f}%")
        print(f" Precision: {eval_results['metrics']['precision'] * 100:.2f}%")
        print(f" Recall: {eval_results['metrics']['recall'] * 100:.2f}%")
        print(f" F1-Score: {eval_results['metrics']['f1_score'] * 100:.2f}%")
        print("\nHYPOTHESIS TEST:")
        print(f" Target: {eval_results['hypothesis_test']['target_accuracy'] * 100:.0f}%")
        print(f" Achieved: {eval_results['hypothesis_test']['achieved_accuracy'] * 100:.2f}%")
        print(f" Result: {eval_results['hypothesis_test']['conclusion']}")
        print("\nALL DELIVERABLES:")
        print(f" [OK] Cleaned Data: {cleaned_csv_path}")
        print(f" [OK] ETL Logs: {etl_db_path}")
        print(f" [OK] Trained Model: {model_path}")
        print(f" [OK] Test Indices: {indices_path}")
        print(f" [OK] Classification Report: {report_dir}/classification_report.csv")
        print(f" [OK] Visualizations: {vis_dir}/")
        print("\nNEXT STEP:")
        print(" Run: streamlit run src/ui/app.py")
        print("=" * 70)
        print(f"Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 70 + "\n")
        return {
            'status': 'SUCCESS',
            'duration_seconds': total_duration,
            'etl': etl_summary,
            'training': training_summary,
            'evaluation': eval_results
        }
    except Exception as e:
        print(f"\n[ERROR] Pipeline Error: {str(e)}")
        raise


if __name__ == "__main__":
    try:
        run_complete_pipeline()
        sys.exit(0)
    except Exception as e:
        print(f"\n[ERROR] Pipeline failed: {e}")
        sys.exit(1)

Final Project Structure:

A screenshot of what the project folder structure looks like during the project's completion

If the app is working without errors, you should see something like this in your terminal when you run main.py:

A screenshot of the terminal output from a successful run of main.py

Now run streamlit run src/ui/app.py in the terminal to view the visualizations and a demo of our app.

A screenshot of the Streamlit triage dashboard with the model visualizations