From 9601c5e44d129629f37b4085a1eaca19a3b831d8 Mon Sep 17 00:00:00 2001 From: bernard-ng Date: Wed, 13 Aug 2025 23:09:05 +0200 Subject: [PATCH] feat: enhance logging and memory management across modules --- app.py | 30 +- config/pipeline.yaml | 1 + core/config/annotation_config.py | 1 + core/config/data_config.py | 4 +- core/utils/data_loader.py | 76 ++++- core/utils/log_reader.py | 2 - core/utils/region_mapper.py | 10 +- engineer_ner_dataset.py | 72 ----- interface/dashboard.py | 3 +- interface/data_overview.py | 3 +- interface/data_processing.py | 49 +-- interface/experiments.py | 64 +++- interface/log_reader.py | 35 +-- interface/predictions.py | 31 +- interface/results_analysis.py | 8 +- main.py | 16 +- monitor.py | 27 +- ner.py | 88 ++++-- processing/batch/batch_processor.py | 85 ++++- processing/batch/memory_monitor.py | 25 ++ processing/monitoring/data_analyzer.py | 52 ---- processing/monitoring/pipeline_monitor.py | 8 +- processing/ner/formats/__init__.py | 35 ++- processing/ner/formats/connectors_format.py | 26 +- .../ner/formats/extended_surname_format.py | 22 +- processing/ner/formats/native_only_format.py | 22 +- processing/ner/formats/original_format.py | 22 +- .../ner/formats/position_flipped_format.py | 22 +- .../ner/formats/reduced_native_format.py | 24 +- processing/ner/ner_data_builder.py | 290 ++++++++---------- processing/ner/ner_engineering.py | 119 +++---- processing/ner/ner_name_model.py | 114 ++++--- processing/ner/ner_name_tagger.py | 38 ++- processing/steps/__init__.py | 13 +- processing/steps/data_splitting_step.py | 19 +- processing/steps/feature_extraction_step.py | 179 ++++++++--- processing/steps/llm_annotation_step.py | 9 +- processing/steps/ner_annotation_step.py | 18 +- research/experiment/experiment_runner.py | 8 +- research/model_trainer.py | 19 +- research/models/lightgbm_model.py | 16 +- research/models/logistic_regression_model.py | 4 +- research/models/random_forest_model.py | 2 +- research/models/svm_model.py | 2 +- research/models/xgboost_model.py | 18 +- research/neural_network_model.py | 10 +- research/traditional_model.py | 4 +- train.py | 32 +- 48 files changed, 1004 insertions(+), 773 deletions(-) delete mode 100644 engineer_ner_dataset.py create mode 100644 processing/batch/memory_monitor.py delete mode 100644 processing/monitoring/data_analyzer.py diff --git a/app.py b/app.py index 67ed4af..8dc8a87 100644 --- a/app.py +++ b/app.py @@ -1,7 +1,9 @@ #!.venv/bin/python3 +import argparse + import streamlit as st -from core.config import get_config +from core.config import get_config, setup_config, PipelineConfig from core.utils.data_loader import DataLoader from interface.configuration import Configuration from interface.dashboard import Dashboard @@ -23,17 +25,11 @@ st.set_page_config( ) -@st.cache_data -def load_config(): - """Load application configuration with unified setup""" - return get_config() - - class StreamlitApp: """Main Streamlit application class""" - def __init__(self): - self.config = load_config() + def __init__(self, config: PipelineConfig): + self.config = config self.data_loader = DataLoader(self.config) self.experiment_tracker = ExperimentTracker(self.config) self.experiment_runner = ExperimentRunner(self.config) @@ -44,7 +40,9 @@ class StreamlitApp: self.data_overview = DataOverview(self.config) self.data_processing = DataProcessing(self.config, self.pipeline_monitor) self.experiments = Experiments(self.config, self.experiment_tracker, self.experiment_runner) - self.results_analysis = ResultsAnalysis(self.config, self.experiment_tracker, self.experiment_runner) + self.results_analysis = ResultsAnalysis( + self.config, self.experiment_tracker, self.experiment_runner + ) self.predictions = Predictions(self.config, self.experiment_tracker, self.experiment_runner) self.configuration = Configuration(self.config) @@ -86,8 +84,16 @@ class StreamlitApp: def main(): - """Main application entry point""" - app = StreamlitApp() + parser = argparse.ArgumentParser( + description="DRC NERS Platform", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("--config", type=str, help="Path to configuration file") + parser.add_argument("--env", type=str, default="development", help="Environment name") + args = parser.parse_args() + + config = setup_config(args.config, env=args.env) + app = StreamlitApp(config) app.run() diff --git a/config/pipeline.yaml b/config/pipeline.yaml index 79e2fa8..babc9be 100644 --- a/config/pipeline.yaml +++ b/config/pipeline.yaml @@ -64,6 +64,7 @@ data: ner_spacy: "names_ner.spacy" # Output file for NER annotated data using spaCy format split_evaluation: false # Should the dataset be split into training and evaluation sets ? split_by_gender: true # Should the dataset be split by gender ? + split_by_province: true # Should the dataset be split by province ? split_ner_data: true # Should the NER data be extracted and saved? evaluation_fraction: 0.2 # Fraction of data to use for evaluation random_seed: 42 # Random seed for reproducibility diff --git a/core/config/annotation_config.py b/core/config/annotation_config.py index 59c1c21..4100562 100644 --- a/core/config/annotation_config.py +++ b/core/config/annotation_config.py @@ -1,5 +1,6 @@ from pydantic import BaseModel + class NERConfig(BaseModel): """NER annotation configuration""" diff --git a/core/config/data_config.py b/core/config/data_config.py index cb5c150..bedfdda 100644 --- a/core/config/data_config.py +++ b/core/config/data_config.py @@ -12,13 +12,15 @@ class DataConfig(BaseModel): default_factory=lambda: { "featured": "names_featured.csv", "evaluation": "names_evaluation.csv", + "engineered": "names_engineered.csv", "males": "names_males.csv", "females": "names_females.csv", "ner_data": "names_ner.json", - "ner_spacy": "names_ner.spacy" + "ner_spacy": "names_ner.spacy", } ) split_evaluation: bool = False + split_by_province: bool = True split_by_gender: bool = True split_ner_data: bool = True evaluation_fraction: float = 0.2 diff --git a/core/utils/data_loader.py b/core/utils/data_loader.py index da3eb59..390ba60 100644 --- a/core/utils/data_loader.py +++ b/core/utils/data_loader.py @@ -1,17 +1,41 @@ +import gc import logging from pathlib import Path -from typing import Optional, Union, Iterator +from typing import Optional, Union, Iterator, Dict import pandas as pd from core.config.pipeline_config import PipelineConfig +OPTIMIZED_DTYPES = { + # Numeric columns with appropriate bit-width + "year": "Int16", # Years fit in 16-bit integer + "words": "Int8", # Word counts typically < 128 + "length": "Int16", # Name lengths fit in 16-bit + "annotated": "Int8", # Binary flag (0/1) + "ner_tagged": "Int8", # Binary flag (0/1) + # Categorical columns (memory efficient for repeated values) + "sex": "category", + "province": "category", + "region": "category", + "identified_category": "category", + "transformation_type": "category", + # String columns with proper string dtype + "name": "string", + "probable_native": "string", + "probable_surname": "string", + "identified_name": "string", + "identified_surname": "string", + "ner_entities": "string", +} + class DataLoader: """Reusable data loading utilities""" - def __init__(self, config: PipelineConfig): + def __init__(self, config: PipelineConfig, custom_dtypes: Optional[Dict] = None): self.config = config + self.dtypes = {**OPTIMIZED_DTYPES, **(custom_dtypes or {})} def load_csv_chunked( self, filepath: Union[str, Path], chunk_size: Optional[int] = None @@ -19,19 +43,23 @@ class DataLoader: """Load CSV file in chunks for memory efficiency""" chunk_size = chunk_size or self.config.processing.chunk_size encodings = self.config.processing.encoding_options - filepath = Path(filepath) for encoding in encodings: try: - logging.info(f"Attempting to read {filepath} with encoding: {encoding}") + logging.info(f"Reading {filepath} with encoding: {encoding}") + # Read with optimal dtypes chunk_iter = pd.read_csv( - filepath, encoding=encoding, chunksize=chunk_size, on_bad_lines="skip" + filepath, + encoding=encoding, + chunksize=chunk_size, + on_bad_lines="skip", + dtype=self.dtypes, ) for i, chunk in enumerate(chunk_iter): - logging.debug(f"Processing chunk {i+1}") + logging.debug(f"Processing optimized chunk {i + 1}") yield chunk logging.info(f"Successfully read {filepath} with encoding: {encoding}") @@ -44,12 +72,20 @@ class DataLoader: raise ValueError(f"Unable to decode {filepath} with any encoding: {encodings}") def load_csv_complete(self, filepath: Union[str, Path]) -> pd.DataFrame: - """Load complete CSV file into memory with size limiting and balancing""" - chunks = list(self.load_csv_chunked(filepath)) + """Load complete CSV with memory optimization""" + chunks = [] + for chunk in self.load_csv_chunked(filepath): + chunks.append(chunk) + if not chunks: return pd.DataFrame() - df = pd.concat(chunks, ignore_index=True) + logging.info(f"Concatenating {len(chunks)} optimized chunks") + df = pd.concat(chunks, ignore_index=True, copy=False) + + # Cleanup chunks from memory + del chunks + gc.collect() # Apply dataset size limiting if configured if self.config.data.max_dataset_size is not None: @@ -87,27 +123,35 @@ class DataLoader: balanced_samples = [] for i, sex in enumerate(sex_values): - sex_df = df[df["sex"] == sex] + # Use boolean indexing instead of creating temporary DataFrames + sex_mask = df["sex"] == sex + sex_indices = df[sex_mask].index # Distribute remaining samples to first categories current_samples = samples_per_sex + (1 if i < remaining_samples else 0) - current_samples = min(current_samples, len(sex_df)) + current_samples = min(current_samples, len(sex_indices)) if current_samples > 0: - sample = sex_df.sample(n=current_samples, random_state=self.config.data.random_seed + i) - balanced_samples.append(sample) + # Sample indices instead of DataFrame + sampled_indices = pd.Series(sex_indices).sample( + n=current_samples, random_state=self.config.data.random_seed + i + ) + balanced_samples.extend(sampled_indices.tolist()) logging.info(f"Sampled {current_samples} records for sex '{sex}'") if not balanced_samples: logging.warning("No balanced samples could be created, using random sampling") return df.sample(n=max_size, random_state=self.config.data.random_seed) - result = pd.concat(balanced_samples, ignore_index=True) + # Create result using iloc with indices (no copying until final step) + result = df.iloc[balanced_samples].copy() # Shuffle the final result - result = result.sample(frac=1, random_state=self.config.data.random_seed).reset_index(drop=True) + result = result.sample(frac=1, random_state=self.config.data.random_seed).reset_index( + drop=True + ) - logging.info(f"Created balanced dataset with {len(result)} records from {len(df)} total records") + logging.info(f"Created balanced dataset with {len(result)} records from {len(df)} total") return result @classmethod diff --git a/core/utils/log_reader.py b/core/utils/log_reader.py index b28b04f..8b13789 100644 --- a/core/utils/log_reader.py +++ b/core/utils/log_reader.py @@ -1,3 +1 @@ - - diff --git a/core/utils/region_mapper.py b/core/utils/region_mapper.py index 3dc5eb2..1af77a1 100644 --- a/core/utils/region_mapper.py +++ b/core/utils/region_mapper.py @@ -9,14 +9,9 @@ class RegionMapper: def __init__(self, mapping: Optional[Dict] = None): self.mapping = mapping or REGION_MAPPING - def map_region_to_province(self, region: str) -> str: - """Map a region to its province""" - region_lower = str(region).lower().strip() - return self.mapping.get(region_lower, ("AUTRES", "AUTRES"))[1].lower() - - def map_regions_vectorized(self, regions: pd.Series) -> pd.Series: + def map(self, series: pd.Series) -> pd.Series: """Vectorized region to province mapping""" - return regions.str.lower().map( + return series.str.lower().map( lambda r: self.mapping.get(r, ("AUTRES", "AUTRES"))[1].lower() ) @@ -34,6 +29,7 @@ class RegionMapper: "sud-kivu", "kasai-occidental", "kasai-oriental", + "autres", ] diff --git a/engineer_ner_dataset.py b/engineer_ner_dataset.py deleted file mode 100644 index ef63860..0000000 --- a/engineer_ner_dataset.py +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/env python3 -""" -NER Dataset Feature Engineering Script -Processes the names_featured.csv dataset to create position-independent variations -""" - -import argparse -import os - -from processing.ner.ner_engineering import NEREngineering - - -def main(): - parser = argparse.ArgumentParser(description='Engineer NER dataset for position-independent learning') - parser.add_argument('--input', default='data/dataset/names_featured.csv', help='Input CSV file path') - parser.add_argument('--output', default='data/dataset/names_featured_engineered.csv', help='Output CSV file path') - parser.add_argument('--seed', type=int, default=42, help='Random seed for reproducibility') - - args = parser.parse_args() - - print("=== NER Dataset Feature Engineering ===") - print(f"Input file: {args.input}") - print(f"Output file: {args.output}") - print(f"Random seed: {args.seed}") - - # Check if input file exists - if not os.path.exists(args.input): - print(f"Error: Input file {args.input} not found!") - return - - # Initialize engineering class - engineering = NEREngineering() - - try: - # Load data with progress indication - print("\n1. Loading NER-tagged data...") - data = engineering.load_ner_data(args.input) - print(f" Dataset size: {len(data):,} rows") - - # Show sample of original data - print("\n2. Sample original data:") - for i, row in data.head(3).iterrows(): - print(f" {row['name']} -> Native: '{row['probable_native']}', Surname: '{row['probable_surname']}'") - - # Apply transformations - print("\n3. Applying feature engineering transformations...") - engineered_data = engineering.engineer_dataset(data, random_seed=args.seed) - - # Save results - print(f"\n4. Saving engineered dataset to {args.output}...") - engineering.save_engineered_dataset(engineered_data, args.output) - - # Show statistics - print(f"\n=== RESULTS SUMMARY ===") - print(f"Original dataset: {len(data):,} rows") - print(f"Engineered dataset: {len(engineered_data):,} rows") - print(f"Transformation distribution:") - counts = engineered_data['transformation_type'].value_counts().sort_index() - for trans_type, count in counts.items(): - percentage = (count / len(engineered_data)) * 100 - print(f" {trans_type}: {count:,} rows ({percentage:.1f}%)") - - print(f"\nDataset successfully engineered and saved!") - - except Exception as e: - print(f"Error during processing: {str(e)}") - import traceback - traceback.print_exc() - - -if __name__ == "__main__": - main() diff --git a/interface/dashboard.py b/interface/dashboard.py index a2fb18e..aa02fa9 100644 --- a/interface/dashboard.py +++ b/interface/dashboard.py @@ -2,11 +2,12 @@ import pandas as pd import streamlit as st from core.utils import get_data_file_path +from core.utils.data_loader import OPTIMIZED_DTYPES def load_dataset(file_path: str) -> pd.DataFrame: try: - return pd.read_csv(file_path) + return pd.read_csv(file_path, dtype=OPTIMIZED_DTYPES) except Exception as e: st.error(f"Error loading dataset: {e}") return pd.DataFrame() diff --git a/interface/data_overview.py b/interface/data_overview.py index 9e39089..912afc3 100644 --- a/interface/data_overview.py +++ b/interface/data_overview.py @@ -5,11 +5,12 @@ import plotly.express as px import streamlit as st from core.utils import get_data_file_path +from core.utils.data_loader import OPTIMIZED_DTYPES def load_dataset(file_path: str) -> pd.DataFrame: try: - return pd.read_csv(file_path) + return pd.read_csv(file_path, dtype=OPTIMIZED_DTYPES) except Exception as e: st.error(f"Error loading dataset: {e}") return pd.DataFrame() diff --git a/interface/data_processing.py b/interface/data_processing.py index 8b586da..2ff0482 100644 --- a/interface/data_processing.py +++ b/interface/data_processing.py @@ -2,12 +2,13 @@ import pandas as pd import plotly.express as px import streamlit as st +from core.utils.data_loader import OPTIMIZED_DTYPES from interface.log_reader import LogReader def load_dataset(file_path: str) -> pd.DataFrame: try: - return pd.read_csv(file_path) + return pd.read_csv(file_path, dtype=OPTIMIZED_DTYPES) except Exception as e: st.error(f"Error loading dataset: {e}") return pd.DataFrame() @@ -56,16 +57,12 @@ class DataProcessing: log_level_filter = st.selectbox( "Filter by Level", ["All", "INFO", "WARNING", "ERROR", "DEBUG", "CRITICAL"], - key="log_level_filter" + key="log_level_filter", ) with col2: num_entries = st.number_input( - "Number of entries", - min_value=5, - max_value=50, - value=10, - key="num_log_entries" + "Number of entries", min_value=5, max_value=50, value=10, key="num_log_entries" ) # Get log entries based on filter @@ -77,13 +74,21 @@ class DataProcessing: if log_entries: for entry in log_entries: if entry.level == "ERROR": - st.error(f"[{entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {entry.level}: {entry.message}") + st.error( + f"[{entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {entry.level}: {entry.message}" + ) elif entry.level == "WARNING": - st.warning(f"[{entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {entry.level}: {entry.message}") + st.warning( + f"[{entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {entry.level}: {entry.message}" + ) elif entry.level == "INFO": - st.info(f"[{entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {entry.level}: {entry.message}") + st.info( + f"[{entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {entry.level}: {entry.message}" + ) else: - st.text(f"[{entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {entry.level}: {entry.message}") + st.text( + f"[{entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {entry.level}: {entry.message}" + ) # Show log statistics st.subheader("Log Statistics") @@ -93,16 +98,16 @@ class DataProcessing: col1, col2, col3, col4 = st.columns(4) with col1: - st.metric("Total Lines", log_stats.get('total_lines', 0)) + st.metric("Total Lines", log_stats.get("total_lines", 0)) with col2: - st.metric("INFO", log_stats.get('INFO', 0)) + st.metric("INFO", log_stats.get("INFO", 0)) with col3: - st.metric("WARNING", log_stats.get('WARNING', 0)) + st.metric("WARNING", log_stats.get("WARNING", 0)) with col4: - st.metric("ERROR", log_stats.get('ERROR', 0)) + st.metric("ERROR", log_stats.get("ERROR", 0)) # Log level distribution chart - levels = ['INFO', 'WARNING', 'ERROR', 'DEBUG', 'CRITICAL'] + levels = ["INFO", "WARNING", "ERROR", "DEBUG", "CRITICAL"] counts = [log_stats.get(level, 0) for level in levels] if sum(counts) > 0: @@ -112,12 +117,12 @@ class DataProcessing: title="Log Entries by Level", color=levels, color_discrete_map={ - 'INFO': 'blue', - 'WARNING': 'orange', - 'ERROR': 'red', - 'DEBUG': 'gray', - 'CRITICAL': 'darkred' - } + "INFO": "blue", + "WARNING": "orange", + "ERROR": "red", + "DEBUG": "gray", + "CRITICAL": "darkred", + }, ) st.plotly_chart(fig, use_container_width=True) else: diff --git a/interface/experiments.py b/interface/experiments.py index 25fbd96..aa519ec 100644 --- a/interface/experiments.py +++ b/interface/experiments.py @@ -14,7 +14,9 @@ from research.model_registry import list_available_models class Experiments: """Handles experiment management interface""" - def __init__(self, config, experiment_tracker: ExperimentTracker, experiment_runner: ExperimentRunner): + def __init__( + self, config, experiment_tracker: ExperimentTracker, experiment_runner: ExperimentRunner + ): self.config = config self.experiment_tracker = experiment_tracker self.experiment_runner = experiment_runner @@ -41,13 +43,19 @@ class Experiments: col1, col2 = st.columns(2) with col1: - exp_name = st.text_input("Experiment Name", placeholder="e.g., native_name_gender_prediction") - description = st.text_area("Description", placeholder="Brief description of the experiment") + exp_name = st.text_input( + "Experiment Name", placeholder="e.g., native_name_gender_prediction" + ) + description = st.text_area( + "Description", placeholder="Brief description of the experiment" + ) model_type = st.selectbox("Model Type", list_available_models()) # Feature selection feature_options = [f.value for f in FeatureType] - selected_features = st.multiselect("Features to Use", feature_options, default=["full_name"]) + selected_features = st.multiselect( + "Features to Use", feature_options, default=["full_name"] + ) with col2: # Model parameters @@ -74,7 +82,9 @@ class Experiments: test_size = st.slider("Test Set Size", 0.1, 0.5, 0.2) cv_folds = st.number_input("Cross-Validation Folds", 3, 10, 5) - tags = st.text_input("Tags (comma-separated)", placeholder="e.g., baseline, feature_study") + tags = st.text_input( + "Tags (comma-separated)", placeholder="e.g., baseline, feature_study" + ) # Advanced options with st.expander("Advanced Options"): @@ -92,14 +102,33 @@ class Experiments: if submitted: self._handle_experiment_submission( - exp_name, description, model_type, selected_features, model_params, - test_size, cv_folds, tags, filter_province, min_words, max_words + exp_name, + description, + model_type, + selected_features, + model_params, + test_size, + cv_folds, + tags, + filter_province, + min_words, + max_words, ) - def _handle_experiment_submission(self, exp_name: str, description: str, model_type: str, - selected_features: List[str], model_params: Dict[str, Any], - test_size: float, cv_folds: int, tags: str, - filter_province: str, min_words: int, max_words: int): + def _handle_experiment_submission( + self, + exp_name: str, + description: str, + model_type: str, + selected_features: List[str], + model_params: Dict[str, Any], + test_size: float, + cv_folds: int, + tags: str, + filter_province: str, + min_words: int, + max_words: int, + ): """Handle experiment form submission""" if not exp_name: st.error("Please provide an experiment name") @@ -183,7 +212,7 @@ class Experiments: # Display experiments for i, exp in enumerate(experiments): with st.expander( - f"{exp.config.name} - {exp.status.value} - {exp.start_time.strftime('%Y-%m-%d %H:%M')}" + f"{exp.config.name} - {exp.status.value} - {exp.start_time.strftime('%Y-%m-%d %H:%M')}" ): self._display_experiment_details(exp, i) @@ -268,8 +297,15 @@ class Experiments: base_name, model_types, ngram_ranges, feature_combinations, test_sizes, tags ) - def run_batch_experiments(self, base_name: str, model_types: List[str], ngram_ranges: str, - feature_combinations: List[str], test_sizes: str, tags: str): + def run_batch_experiments( + self, + base_name: str, + model_types: List[str], + ngram_ranges: str, + feature_combinations: List[str], + test_sizes: str, + tags: str, + ): """Run batch experiments with parameter combinations""" with st.spinner("Running batch experiments..."): try: diff --git a/interface/log_reader.py b/interface/log_reader.py index b11bbb2..3305316 100644 --- a/interface/log_reader.py +++ b/interface/log_reader.py @@ -8,6 +8,7 @@ from typing import List, Dict, Optional @dataclass class LogEntry: """Represents a single log entry.""" + timestamp: datetime logger: str level: str @@ -23,7 +24,7 @@ class LogReader: self.log_file_path = Path(log_file_path) # Pattern to match Python logging format: timestamp - logger - level - message self.log_pattern = re.compile( - r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - (.+?) - (\w+) - (.+)' + r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - (.+?) - (\w+) - (.+)" ) def read_last_entries(self, count: int = 10) -> List[LogEntry]: @@ -32,12 +33,12 @@ class LogReader: return [] try: - with open(self.log_file_path, 'r', encoding='utf-8') as file: + with open(self.log_file_path, "r", encoding="utf-8") as file: lines = file.readlines() # Parse log entries from the end entries = [] - for line in reversed(lines[-count * 2:]): # Read more lines in case some don't match + for line in reversed(lines[-count * 2 :]): # Read more lines in case some don't match entry = self._parse_log_line(line.strip()) if entry: entries.append(entry) @@ -57,7 +58,7 @@ class LogReader: return [] try: - with open(self.log_file_path, 'r', encoding='utf-8') as file: + with open(self.log_file_path, "r", encoding="utf-8") as file: lines = file.readlines() entries = [] @@ -80,7 +81,7 @@ class LogReader: return [] try: - with open(self.log_file_path, 'r', encoding='utf-8') as file: + with open(self.log_file_path, "r", encoding="utf-8") as file: lines = file.readlines() entries = [] @@ -107,16 +108,16 @@ class LogReader: return {} try: - with open(self.log_file_path, 'r', encoding='utf-8') as file: + with open(self.log_file_path, "r", encoding="utf-8") as file: lines = file.readlines() stats = { - 'total_lines': len(lines), - 'INFO': 0, - 'WARNING': 0, - 'ERROR': 0, - 'DEBUG': 0, - 'CRITICAL': 0 + "total_lines": len(lines), + "INFO": 0, + "WARNING": 0, + "ERROR": 0, + "DEBUG": 0, + "CRITICAL": 0, } for line in lines: @@ -143,14 +144,10 @@ class LogReader: try: timestamp_str, logger, level, message = match.groups() - timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S,%f') + timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S,%f") return LogEntry( - timestamp=timestamp, - logger=logger, - level=level, - message=message, - raw_line=line + timestamp=timestamp, logger=logger, level=level, message=message, raw_line=line ) except ValueError: return None @@ -168,7 +165,7 @@ class MultiLogReader: if not self.log_directory.exists(): return [] - return list(self.log_directory.glob('*.log')) + return list(self.log_directory.glob("*.log")) def read_from_all_files(self, count: int = 10) -> List[LogEntry]: """Read entries from all log files and merge them chronologically.""" diff --git a/interface/predictions.py b/interface/predictions.py index 76b8707..c8154cb 100644 --- a/interface/predictions.py +++ b/interface/predictions.py @@ -9,6 +9,7 @@ import plotly.express as px import streamlit as st from core.utils import get_data_file_path +from core.utils.data_loader import OPTIMIZED_DTYPES from research.experiment.experiment_runner import ExperimentRunner from research.experiment.experiment_tracker import ExperimentTracker @@ -16,7 +17,9 @@ from research.experiment.experiment_tracker import ExperimentTracker class Predictions: """Handles prediction interface""" - def __init__(self, config, experiment_tracker: ExperimentTracker, experiment_runner: ExperimentRunner): + def __init__( + self, config, experiment_tracker: ExperimentTracker, experiment_runner: ExperimentRunner + ): self.config = config self.experiment_tracker = experiment_tracker self.experiment_runner = experiment_runner @@ -86,7 +89,9 @@ class Predictions: confidence = self._get_prediction_confidence(model, input_df) # Display results - self._display_single_prediction_results(prediction, confidence, experiment, name_input) + self._display_single_prediction_results( + prediction, confidence, experiment, name_input + ) except Exception as e: st.error(f"Error making prediction: {e}") @@ -114,8 +119,9 @@ class Predictions: except: return None - def _display_single_prediction_results(self, prediction: str, confidence: Optional[float], - experiment, name_input: str): + def _display_single_prediction_results( + self, prediction: str, confidence: Optional[float], experiment, name_input: str + ): """Display single prediction results""" col1, col2 = st.columns(2) @@ -129,9 +135,7 @@ class Predictions: # Additional info st.info(f"Model used: {experiment.config.name}") - st.info( - f"Features used: {', '.join([f.value for f in experiment.config.features])}" - ) + st.info(f"Features used: {', '.join([f.value for f in experiment.config.features])}") def show_batch_prediction(self, experiment): """Show batch prediction interface""" @@ -141,7 +145,7 @@ class Predictions: if uploaded_file is not None: try: - df = pd.read_csv(uploaded_file) + df = pd.read_csv(uploaded_file, dtype=OPTIMIZED_DTYPES) st.write("**Uploaded Data Preview:**") st.dataframe(df.head(), use_container_width=True) @@ -296,13 +300,14 @@ class Predictions: def _load_dataset(self, file_path: str) -> pd.DataFrame: """Load dataset with error handling""" try: - return pd.read_csv(file_path) + return pd.read_csv(file_path, dtype=OPTIMIZED_DTYPES) except Exception as e: st.error(f"Error loading dataset: {e}") return pd.DataFrame() - def _run_dataset_prediction(self, df: pd.DataFrame, experiment, sample_size: int, - compare_with_actual: bool): + def _run_dataset_prediction( + self, df: pd.DataFrame, experiment, sample_size: int, compare_with_actual: bool + ): """Run dataset prediction and display results""" with st.spinner("Running predictions..."): # Sample data if requested @@ -353,7 +358,9 @@ class Predictions: with col2: st.write("**Sample Incorrect Predictions**") - incorrect_sample = df_sample[~correct_mask][["name", "sex", "predicted_gender"]].head(10) + incorrect_sample = df_sample[~correct_mask][["name", "sex", "predicted_gender"]].head( + 10 + ) st.dataframe(incorrect_sample, use_container_width=True) def _display_dataset_predictions(self, df_sample: pd.DataFrame): diff --git a/interface/results_analysis.py b/interface/results_analysis.py index 4f39823..22123f9 100644 --- a/interface/results_analysis.py +++ b/interface/results_analysis.py @@ -13,7 +13,9 @@ from research.experiment.experiment_tracker import ExperimentTracker class ResultsAnalysis: """Handles experiment results and analysis interface""" - def __init__(self, config, experiment_tracker: ExperimentTracker, experiment_runner: ExperimentRunner): + def __init__( + self, config, experiment_tracker: ExperimentTracker, experiment_runner: ExperimentRunner + ): self.config = config self.experiment_tracker = experiment_tracker self.experiment_runner = experiment_runner @@ -21,7 +23,9 @@ class ResultsAnalysis: def index(self): """Main results analysis page""" st.header("Results & Analysis") - tab1, tab2, tab3 = st.tabs(["Experiment Comparison", "Performance Analysis", "Model Analysis"]) + tab1, tab2, tab3 = st.tabs( + ["Experiment Comparison", "Performance Analysis", "Model Analysis"] + ) with tab1: self.show_experiment_comparison() diff --git a/main.py b/main.py index 4e13e0d..bbc3a18 100755 --- a/main.py +++ b/main.py @@ -8,13 +8,11 @@ from core.config import setup_config from core.utils import get_data_file_path from core.utils.data_loader import DataLoader from processing.batch.batch_config import BatchConfig -from processing.ner.ner_data_builder import NERDataBuilder from processing.pipeline import Pipeline from processing.steps.data_cleaning_step import DataCleaningStep from processing.steps.data_splitting_step import DataSplittingStep from processing.steps.feature_extraction_step import FeatureExtractionStep from processing.steps.llm_annotation_step import LLMAnnotationStep -from processing.steps.ner_annotation_step import NERAnnotationStep def create_pipeline(config) -> Pipeline: @@ -31,9 +29,8 @@ def create_pipeline(config) -> Pipeline: steps = [ DataCleaningStep(config), FeatureExtractionStep(config), - NERAnnotationStep(config), + # NERAnnotationStep(config), LLMAnnotationStep(config), - DataSplittingStep(config), ] for stage in config.stages: @@ -56,6 +53,7 @@ def run_pipeline(config) -> int: return 1 data_loader = DataLoader(config) + data_splitter = DataSplittingStep(config) logging.info(f"Loading data from {input_file_path}") df = data_loader.load_csv_complete(input_file_path) logging.info(f"Loaded {len(df)} rows, {len(df.columns)} columns") @@ -64,13 +62,7 @@ def run_pipeline(config) -> int: pipeline = create_pipeline(config) logging.info("Starting pipeline execution") - result_df = pipeline.run(df) - - # Save results using the splitting step - splitting_step = pipeline.steps[-1] - if isinstance(splitting_step, DataSplittingStep): - splitting_step.save_splits(result_df) - NERDataBuilder(config).build(result_df) + data_splitter.split(pipeline.run(df)) # Show completion statistics progress = pipeline.get_progress() @@ -94,7 +86,7 @@ def run_pipeline(config) -> int: def main(): """Main entry point with unified configuration loading""" parser = argparse.ArgumentParser( - description="DRC Names Processing Pipeline", + description="DRC NERS Processing Pipeline", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--config", type=str, help="Path to configuration file") diff --git a/monitor.py b/monitor.py index d76a9f3..1d435b5 100755 --- a/monitor.py +++ b/monitor.py @@ -9,39 +9,40 @@ from processing.monitoring.pipeline_monitor import PipelineMonitor def main(): - choices = ["data_cleaning", "feature_extraction", "ner_annotation", "llm_annotation", "data_splitting"] + choices = [ + "data_cleaning", + "feature_extraction", + "ner_annotation", + "llm_annotation", + "data_splitting", + ] - parser = argparse.ArgumentParser(description="Monitor and manage the DRC names processing pipeline") + parser = argparse.ArgumentParser(description="DRC NERS Processing Monitoring") parser.add_argument("--config", type=Path, help="Path to configuration file") parser.add_argument("--env", type=str, default="development", help="Environment") subparsers = parser.add_subparsers(dest="command", help="Available commands") - # Status command - subparsers.add_parser("status", help="Show pipeline status") - # Clean command clean_parser = subparsers.add_parser("clean", help="Clean checkpoint files") - clean_parser.add_argument("--step", type=str, choices=choices, help="Specific step (default: all)") - clean_parser.add_argument("--keep-last", type=int, default=1, help="Checkpoints to keep (default: 1)") + clean_parser.add_argument("--step", type=str, choices=choices, help="default: all") + clean_parser.add_argument("--keep-last", type=int, default=1, help="(default: 1)") clean_parser.add_argument("--force", action="store_true", help="Clean without confirmation") # Reset command reset_parser = subparsers.add_parser("reset", help="Reset pipeline step") - reset_parser.add_argument("--step", type=str, choices=choices, help="Specific step (default: all)") + reset_parser.add_argument("--step", type=str, choices=choices, help="(default: all)") reset_parser.add_argument("--all", action="store_true", help="Reset all steps") reset_parser.add_argument("--force", action="store_true", help="Reset without confirmation") args = parser.parse_args() - if not args.command: - parser.print_help() - return 1 - try: setup_config(config_path=args.config, env=args.env) monitor = PipelineMonitor() - if args.command == "status": + if not args.command: + parser.print_help() monitor.print_status(detailed=True) + return 1 elif args.command == "clean": checkpoint_info = monitor.count_checkpoint_files() diff --git a/ner.py b/ner.py index d2a8834..09dfcdd 100755 --- a/ner.py +++ b/ner.py @@ -2,51 +2,89 @@ import argparse import logging import sys +import os +import traceback from pathlib import Path -from core.config import setup_config +from core.config import setup_config, PipelineConfig from processing.ner.ner_data_builder import NERDataBuilder +from processing.ner.ner_engineering import NEREngineering from processing.ner.ner_name_model import NERNameModel -def train(config_path=None, env="development"): +def feature(config: PipelineConfig): + """Apply feature engineering to create position-independent NER dataset.""" + NEREngineering(config).compute() + + +def build(config: PipelineConfig): + """Build NER dataset using NERDataBuilder.""" + NERDataBuilder(config).build() + + +def train(config: PipelineConfig): """Train the NER model.""" - try: - config = setup_config(config_path=config_path, env=env) - trainer = NERNameModel(config) - builder = NERDataBuilder(config) + trainer = NERNameModel(config) - data_path = Path(config.paths.data_dir) / config.data.output_files["ner_data"] - if not data_path.exists(): - builder.build() + data_path = Path(config.paths.data_dir) / config.data.output_files["ner_data"] + if not data_path.exists(): + logging.info("NER data not found. Building dataset first...") + build(config) - trainer.create_blank_model("fr") - data = trainer.load_data(str(data_path)) + trainer.create_blank_model("fr") + data = trainer.load_data(str(data_path)) - split_idx = int(len(data) * 0.8) - train_data, eval_data = data[:split_idx], data[split_idx:] + split_idx = int(len(data) * 0.9) + train_data, eval_data = data[:split_idx], data[split_idx:] - logging.info(f"Training with {len(train_data)} examples, evaluating on {len(eval_data)}") - trainer.train(train_data, epochs=1, batch_size=config.processing.batch_size, dropout_rate=0.3) - trainer.evaluate(eval_data) + logging.info(f"Training with {len(train_data)} examples, evaluating on {len(eval_data)}") + trainer.train( + data=train_data, epochs=1, batch_size=config.processing.batch_size, dropout_rate=0.3 + ) + trainer.evaluate(eval_data) - model_path = trainer.save() - logging.info(f"Model saved to: {model_path}") - return 0 + model_path = trainer.save() + logging.info(f"Model saved to: {model_path}") - except Exception as e: - logging.error(f"NER Training failed: {e}", exc_info=True) - return 1 + +def run_pipeline(config: PipelineConfig, reset: bool = False): + # Step 1: Feature engineering + if not reset and os.path.exists(config.paths.data_dir / config.data.output_files["engineered"]): + logging.info("Step 1: Feature engineering already done.") + else: + logging.info("Step 1: Running feature engineering") + feature(config) + + # Step 2: Build dataset + if not reset and os.path.exists(config.paths.data_dir / config.data.output_files["ner_data"]): + logging.info("Step 2: NER dataset already built.") + else: + logging.info("Step 2: Building NER dataset") + build(config) + + # Step 3: Train model + logging.info("Step 3: Training NER Model") + train(config) + + return 0 def main(): - parser = argparse.ArgumentParser(description="Train NER model for DRC names") + parser = argparse.ArgumentParser(description="NER model management for DRC names") parser.add_argument("--config", type=str, help="Path to configuration file") parser.add_argument("--env", type=str, default="development", help="Environment name") + parser.add_argument("--reset", action="store_true", help="Reset all steps") args = parser.parse_args() - sys.exit(train(config_path=args.config, env=args.env)) + try: + config = setup_config(config_path=args.config, env=args.env) + return run_pipeline(config, args.reset) + + except Exception as e: + print(f"Pipeline failed: {e}") + traceback.print_exc() + return 1 if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/processing/batch/batch_processor.py b/processing/batch/batch_processor.py index 92cb103..e6773ed 100644 --- a/processing/batch/batch_processor.py +++ b/processing/batch/batch_processor.py @@ -5,6 +5,7 @@ from typing import Iterator import pandas as pd from processing.batch.batch_config import BatchConfig +from processing.batch.memory_monitor import MemoryMonitor from processing.steps import PipelineStep @@ -13,28 +14,36 @@ class BatchProcessor: def __init__(self, config: BatchConfig): self.config = config + self.memory_monitor = MemoryMonitor() def create_batches(self, df: pd.DataFrame) -> Iterator[tuple[pd.DataFrame, int]]: - """Create batches from DataFrame""" + """Create batches from DataFrame without unnecessary copies""" total_rows = len(df) batch_size = self.config.batch_size for i in range(0, total_rows, batch_size): - batch = df.iloc[i : i + batch_size].copy() + batch = df.iloc[i : i + batch_size] batch_id = i // batch_size yield batch, batch_id def process_sequential(self, step: PipelineStep, df: pd.DataFrame) -> pd.DataFrame: - """Process batches sequentially""" + """Memory-optimized sequential processing""" results = [] + memory_threshold_mb = 1000 # Clean memory when usage exceeds 1 GB - for batch, batch_id in self.create_batches(df): + for batch_num, (batch, batch_id) in enumerate(self.create_batches(df)): if step.batch_exists(batch_id): logging.info(f"Batch {batch_id} already processed, loading from checkpoint") processed_batch = step.load_batch(batch_id) else: try: - processed_batch = step.process_batch(batch, batch_id) + # Only copy if the processing step requires mutation + if step.requires_batch_mutation: + batch_copy = batch.copy() + processed_batch = step.process_batch(batch_copy, batch_id) + else: + processed_batch = step.process_batch(batch, batch_id) + step.save_batch(processed_batch, batch_id) step.state.processed_batches += 1 except Exception as e: @@ -44,14 +53,32 @@ class BatchProcessor: results.append(processed_batch) + # Memory management + if batch_num % self.config.checkpoint_interval == 0: + current_memory = self.memory_monitor.get_memory_usage_mb() + if current_memory > memory_threshold_mb: + logging.info(f"Memory cleanup triggered at {current_memory:.1f} MB") + self.memory_monitor.cleanup_memory() + # Save state periodically if batch_id % self.config.checkpoint_interval == 0: step.save_state() - return pd.concat(results, ignore_index=True) if results else pd.DataFrame() + # Final memory cleanup before concatenation + self.memory_monitor.cleanup_memory() + self.memory_monitor.log_memory_usage("before_concat") + + result = self._safe_concat(results) if results else pd.DataFrame() + + # Final cleanup + del results + self.memory_monitor.cleanup_memory() + self.memory_monitor.log_memory_usage("sequential_complete") + + return result def process_concurrent(self, step: PipelineStep, df: pd.DataFrame) -> pd.DataFrame: - """Process batches concurrently""" + """Memory-optimized concurrent processing""" executor_class = ( ProcessPoolExecutor if self.config.use_multiprocessing else ThreadPoolExecutor ) @@ -65,7 +92,9 @@ class BatchProcessor: logging.info(f"Batch {batch_id} already processed, loading from checkpoint") results[batch_id] = step.load_batch(batch_id) else: - future = executor.submit(step.process_batch, batch, batch_id) + # Only copy if necessary for concurrent processing + batch_copy = batch.copy() if step.requires_batch_mutation else batch + future = executor.submit(step.process_batch, batch_copy, batch_id) future_to_batch[future] = (batch_id, batch) # Collect results as they complete @@ -81,13 +110,24 @@ class BatchProcessor: logging.error(f"Failed to process batch {batch_id}: {e}") step.state.failed_batches.append(batch_id) - # Reassemble results in order + # Memory-efficient reassembly ordered_results = [] for batch_id in sorted(results.keys()): ordered_results.append(results[batch_id]) step.save_state() - return pd.concat(ordered_results, ignore_index=True) if ordered_results else pd.DataFrame() + + # Cleanup before concat + del results + self.memory_monitor.cleanup_memory() + + result = self._safe_concat(ordered_results) if ordered_results else pd.DataFrame() + + # Final cleanup + del ordered_results + self.memory_monitor.cleanup_memory() + + return result def process(self, step: PipelineStep, df: pd.DataFrame) -> pd.DataFrame: """Process data using the configured strategy""" @@ -95,8 +135,29 @@ class BatchProcessor: step.load_state() logging.info(f"Starting {step.name} with {step.state.total_batches} batches") + self.memory_monitor.log_memory_usage("process_start") if self.config.max_workers == 1: - return self.process_sequential(step, df) + result = self.process_sequential(step, df) else: - return self.process_concurrent(step, df) + result = self.process_concurrent(step, df) + + self.memory_monitor.log_memory_usage("process_complete") + return result + + def _safe_concat(self, dfs: list) -> pd.DataFrame: + """Memory-safe concatenation with monitoring""" + if not dfs: + return pd.DataFrame() + + memory = self.memory_monitor.get_memory_usage_mb() + logging.info(f"Starting concat of {len(dfs)} DataFrames at {memory:.1f} MB") + + # Use copy=False to avoid unnecessary copying during concat + result = pd.concat(dfs, ignore_index=True, copy=False) + + # Monitor memory after concat + memory = self.memory_monitor.get_memory_usage_mb() + logging.info(f"Concat complete. Memory: {memory:.1f} MB") + + return result diff --git a/processing/batch/memory_monitor.py b/processing/batch/memory_monitor.py new file mode 100644 index 0000000..ba57a38 --- /dev/null +++ b/processing/batch/memory_monitor.py @@ -0,0 +1,25 @@ +import gc +import logging + +import psutil + + +class MemoryMonitor: + """Monitor and manage memory usage during batch processing""" + + @staticmethod + def get_memory_usage_mb() -> float: + """Get current memory usage in MB""" + process = psutil.Process() + return process.memory_info().rss / 1024 / 1024 + + @staticmethod + def cleanup_memory(): + """Force garbage collection""" + gc.collect() + + @staticmethod + def log_memory_usage(step_name: str): + """Log current memory usage""" + memory_mb = MemoryMonitor.get_memory_usage_mb() + logging.info(f"Memory usage after {step_name}: {memory_mb:.1f} MB") diff --git a/processing/monitoring/data_analyzer.py b/processing/monitoring/data_analyzer.py deleted file mode 100644 index f3c22aa..0000000 --- a/processing/monitoring/data_analyzer.py +++ /dev/null @@ -1,52 +0,0 @@ -import logging -from typing import Dict - -import pandas as pd - - -class DatasetAnalyzer: - """Analyze dataset statistics and quality""" - - def __init__(self, filepath: str): - self.filepath = filepath - self.df = None - - def load_data(self) -> bool: - """Load dataset for analysis""" - try: - self.df = pd.read_csv(self.filepath) - return True - except Exception as e: - logging.error(f"Failed to load {self.filepath}: {e}") - return False - - def analyze_completion(self) -> Dict: - """Analyze annotation completion status""" - if self.df is None: - return {} - - total_rows = len(self.df) - - # Check annotation status - if "annotated" in self.df.columns: - annotated_count = (self.df["annotated"] == 1).sum() - unannotated_count = (self.df["annotated"] == 0).sum() - else: - annotated_count = 0 - unannotated_count = total_rows - - # Analyze name completeness - complete_names = 0 - if "identified_name" in self.df.columns and "identified_surname" in self.df.columns: - complete_names = ( - (self.df["identified_name"].notna()) & (self.df["identified_surname"].notna()) - ).sum() - - return { - "total_rows": total_rows, - "annotated_rows": annotated_count, - "unannotated_rows": unannotated_count, - "annotation_percentage": (annotated_count / total_rows * 100) if total_rows > 0 else 0, - "complete_names": complete_names, - "completeness_percentage": (complete_names / total_rows * 100) if total_rows > 0 else 0, - } diff --git a/processing/monitoring/pipeline_monitor.py b/processing/monitoring/pipeline_monitor.py index 92a7488..12cd3b6 100644 --- a/processing/monitoring/pipeline_monitor.py +++ b/processing/monitoring/pipeline_monitor.py @@ -19,7 +19,13 @@ class PipelineMonitor: self.paths = paths self.checkpoint_dir = paths.checkpoints_dir - self.steps = ["data_cleaning", "feature_extraction", "ner_annotation", "llm_annotation", "data_splitting"] + self.steps = [ + "data_cleaning", + "feature_extraction", + "ner_annotation", + "llm_annotation", + "data_splitting", + ] def get_step_status(self, step_name: str) -> Dict: """Get status of a specific pipeline step""" diff --git a/processing/ner/formats/__init__.py b/processing/ner/formats/__init__.py index 03978f6..d2a39e8 100644 --- a/processing/ner/formats/__init__.py +++ b/processing/ner/formats/__init__.py @@ -13,10 +13,17 @@ class BaseNameFormatter(ABC): """ def __init__(self, connectors: List[str] = None, additional_surnames: List[str] = None): - self.connectors = connectors or ['wa', 'ya', 'ka', 'ba'] + self.connectors = connectors or ["wa", "ya", "ka", "ba"] self.additional_surnames = additional_surnames or [ - 'jean', 'paul', 'marie', 'joseph', 'pierre', 'claude', - 'andre', 'michel', 'robert' + "jean", + "paul", + "marie", + "joseph", + "pierre", + "claude", + "andre", + "michel", + "robert", ] @classmethod @@ -26,7 +33,9 @@ class BaseNameFormatter(ABC): return [] return native_str.strip().split() - def create_ner_tags(self, text: str, native_parts: List[str], surname: str) -> List[Tuple[int, int, str]]: + def create_ner_tags( + self, text: str, native_parts: List[str], surname: str + ) -> List[Tuple[int, int, str]]: """Create NER entity tags for transformed text""" entities = [] current_pos = 0 @@ -38,15 +47,15 @@ class BaseNameFormatter(ABC): # Determine tag based on word content if word in native_parts or any(connector in word for connector in self.connectors): - tag = 'NATIVE' + tag = "NATIVE" elif word == surname or word in self.additional_surnames: - tag = 'SURNAME' + tag = "SURNAME" else: # Check if it's a compound native word or new surname if any(part in word for part in native_parts): - tag = 'NATIVE' + tag = "NATIVE" else: - tag = 'SURNAME' + tag = "SURNAME" entities.append((start_pos, end_pos, tag)) current_pos = end_pos + 1 # +1 for space @@ -54,15 +63,17 @@ class BaseNameFormatter(ABC): return entities @classmethod - def compute_derived_attributes(cls, name: str) -> Dict: + def compute_numeric_features(cls, name: str) -> Dict: """Compute all derived attributes for the transformed name""" words_count = len(name.split()) if name else 0 length = len(name) if name else 0 return { - 'words': words_count, - 'length': length, - 'identified_category': NameCategory.SIMPLE if words_count == 3 else NameCategory.COMPOSE, + "words": words_count, + "length": length, + "identified_category": ( + NameCategory.SIMPLE.value if words_count == 3 else NameCategory.COMPOSE.value + ), } @abstractmethod diff --git a/processing/ner/formats/connectors_format.py b/processing/ner/formats/connectors_format.py index 9996378..66a6795 100644 --- a/processing/ner/formats/connectors_format.py +++ b/processing/ner/formats/connectors_format.py @@ -8,8 +8,8 @@ from processing.ner.formats import BaseNameFormatter class ConnectorFormatter(BaseNameFormatter): def transform(self, row: pd.Series) -> Dict: - native_parts = self.parse_native_components(row['probable_native']) - surname = row['probable_surname'] if pd.notna(row['probable_surname']) else '' + native_parts = self.parse_native_components(row["probable_native"]) + surname = row["probable_surname"] if pd.notna(row["probable_surname"]) else "" connector = random.choice(self.connectors) # Connect native parts with a random connector @@ -17,20 +17,22 @@ class ConnectorFormatter(BaseNameFormatter): connected_native = f" {connector} ".join(native_parts) full_name = f"{connected_native} {surname}".strip() else: - connected_native = f"{row['probable_native']} {connector} {row['probable_native']}".strip() + connected_native = ( + f"{row['probable_native']} {connector} {row['probable_native']}".strip() + ) full_name = f"{connected_native} {surname}".strip() return { - 'name': full_name, - 'probable_native': connected_native, - 'identify_name': connected_native, - 'probable_surname': surname, - 'identify_surname': surname, - 'ner_entities': str(self.create_ner_tags(full_name, native_parts, surname)), - 'transformation_type': self.transformation_type, - **self.compute_derived_attributes(full_name) + "name": full_name, + "probable_native": connected_native, + "identified_name": connected_native, + "probable_surname": surname, + "identified_surname": surname, + "ner_entities": str(self.create_ner_tags(full_name, native_parts, surname)), + "transformation_type": self.transformation_type, + **self.compute_numeric_features(full_name), } @property def transformation_type(self) -> str: - return 'connector_added' + return "connector_added" diff --git a/processing/ner/formats/extended_surname_format.py b/processing/ner/formats/extended_surname_format.py index ab3c2c2..cc94388 100644 --- a/processing/ner/formats/extended_surname_format.py +++ b/processing/ner/formats/extended_surname_format.py @@ -8,8 +8,8 @@ from processing.ner.formats import BaseNameFormatter class ExtendedSurnameFormatter(BaseNameFormatter): def transform(self, row: pd.Series) -> Dict: - native_parts = self.parse_native_components(row['probable_native']) - original_surname = row['probable_surname'] if pd.notna(row['probable_surname']) else '' + native_parts = self.parse_native_components(row["probable_native"]) + original_surname = row["probable_surname"] if pd.notna(row["probable_surname"]) else "" # Add random additional surname additional_surname = random.choice(self.additional_surnames) @@ -17,16 +17,16 @@ class ExtendedSurnameFormatter(BaseNameFormatter): full_name = f"{row['probable_native']} {combined_surname}".strip() return { - 'name': full_name, - 'probable_native': row['probable_native'], - 'identify_name': row['probable_native'], - 'probable_surname': combined_surname, - 'identity_surname': combined_surname, - 'ner_entities': str(self.create_ner_tags(full_name, native_parts, combined_surname)), - 'transformation_type': self.transformation_type, - **self.compute_derived_attributes(full_name) + "name": full_name, + "probable_native": row["probable_native"], + "identified_name": row["probable_native"], + "probable_surname": combined_surname, + "identified_surname": combined_surname, + "ner_entities": str(self.create_ner_tags(full_name, native_parts, combined_surname)), + "transformation_type": self.transformation_type, + **self.compute_numeric_features(full_name), } @property def transformation_type(self) -> str: - return 'extended_surname' + return "extended_surname" diff --git a/processing/ner/formats/native_only_format.py b/processing/ner/formats/native_only_format.py index 482928e..8e98163 100644 --- a/processing/ner/formats/native_only_format.py +++ b/processing/ner/formats/native_only_format.py @@ -7,22 +7,22 @@ from processing.ner.formats import BaseNameFormatter class NativeOnlyFormatter(BaseNameFormatter): def transform(self, row: pd.Series) -> Dict: - native_parts = self.parse_native_components(row['probable_native']) + native_parts = self.parse_native_components(row["probable_native"]) # Only native components - full_name = row['probable_native'] + full_name = row["probable_native"] return { - 'name': full_name, - 'probable_native': row['probable_native'], - 'identify_name': row['probable_native'], - 'probable_surname': '', - 'identify_surname': '', - 'ner_entities': str(self.create_ner_tags(full_name, native_parts, '')), - 'transformation_type': self.transformation_type, - **self.compute_derived_attributes(full_name) + "name": full_name, + "probable_native": row["probable_native"], + "identified_name": row["probable_native"], + "probable_surname": "", + "identified_surname": "", + "ner_entities": str(self.create_ner_tags(full_name, native_parts, "")), + "transformation_type": self.transformation_type, + **self.compute_numeric_features(full_name), } @property def transformation_type(self) -> str: - return 'native_only' + return "native_only" diff --git a/processing/ner/formats/original_format.py b/processing/ner/formats/original_format.py index 11d77f8..ffcced9 100644 --- a/processing/ner/formats/original_format.py +++ b/processing/ner/formats/original_format.py @@ -7,23 +7,23 @@ from processing.ner.formats import BaseNameFormatter class OriginalFormatter(BaseNameFormatter): def transform(self, row: pd.Series) -> Dict: - native_parts = self.parse_native_components(row['probable_native']) - surname = row['probable_surname'] if pd.notna(row['probable_surname']) else '' + native_parts = self.parse_native_components(row["probable_native"]) + surname = row["probable_surname"] if pd.notna(row["probable_surname"]) else "" # Keep original order: native components + surname full_name = f"{row['probable_native']} {surname}".strip() return { - 'name': full_name, - 'probable_native': row['probable_native'], - 'identify_name': row['probable_native'], - 'probable_surname': surname, - 'identify_surname': surname, - 'ner_entities': str(self.create_ner_tags(full_name, native_parts, surname)), - 'transformation_type': self.transformation_type, - **self.compute_derived_attributes(full_name) + "name": full_name, + "probable_native": row["probable_native"], + "identified_name": row["probable_native"], + "probable_surname": surname, + "identified_surname": surname, + "ner_entities": str(self.create_ner_tags(full_name, native_parts, surname)), + "transformation_type": self.transformation_type, + **self.compute_numeric_features(full_name), } @property def transformation_type(self) -> str: - return 'original' + return "original" diff --git a/processing/ner/formats/position_flipped_format.py b/processing/ner/formats/position_flipped_format.py index 04cfad6..9702640 100644 --- a/processing/ner/formats/position_flipped_format.py +++ b/processing/ner/formats/position_flipped_format.py @@ -7,23 +7,23 @@ from processing.ner.formats import BaseNameFormatter class PositionFlippedFormatter(BaseNameFormatter): def transform(self, row: pd.Series) -> Dict: - native_parts = self.parse_native_components(row['probable_native']) - surname = row['probable_surname'] if pd.notna(row['probable_surname']) else '' + native_parts = self.parse_native_components(row["probable_native"]) + surname = row["probable_surname"] if pd.notna(row["probable_surname"]) else "" # Flip order: surname + native components full_name = f"{surname} {row['probable_native']}".strip() return { - 'name': full_name, - 'probable_native': row['probable_native'], - 'identify_name': row['probable_native'], - 'probable_surname': surname, - 'identify_surname': surname, - 'ner_entities': str(self.create_ner_tags(full_name, native_parts, surname)), - 'transformation_type': self.transformation_type, - **self.compute_derived_attributes(full_name) + "name": full_name, + "probable_native": row["probable_native"], + "identified_name": row["probable_native"], + "probable_surname": surname, + "identified_surname": surname, + "ner_entities": str(self.create_ner_tags(full_name, native_parts, surname)), + "transformation_type": self.transformation_type, + **self.compute_numeric_features(full_name), } @property def transformation_type(self) -> str: - return 'position_flipped' + return "position_flipped" diff --git a/processing/ner/formats/reduced_native_format.py b/processing/ner/formats/reduced_native_format.py index 188cfef..2f992f1 100644 --- a/processing/ner/formats/reduced_native_format.py +++ b/processing/ner/formats/reduced_native_format.py @@ -7,24 +7,24 @@ from processing.ner.formats import BaseNameFormatter class ReducedNativeFormatter(BaseNameFormatter): def transform(self, row: pd.Series) -> Dict: - native_parts = self.parse_native_components(row['probable_native']) - surname = row['probable_surname'] if pd.notna(row['probable_surname']) else '' + native_parts = self.parse_native_components(row["probable_native"]) + surname = row["probable_surname"] if pd.notna(row["probable_surname"]) else "" # Keep only first native component + surname - reduced_native = native_parts[0] if len(native_parts) > 1 else row['probable_native'] + reduced_native = native_parts[0] if len(native_parts) > 1 else row["probable_native"] full_name = f"{reduced_native} {surname}".strip() return { - 'name': full_name, - 'probable_native': reduced_native, - 'identify_name': reduced_native, - 'probable_surname': surname, - 'identify_surname': surname, - 'ner_entities': str(self.create_ner_tags(full_name, [reduced_native], surname)), - 'transformation_type': self.transformation_type, - **self.compute_derived_attributes(full_name) + "name": full_name, + "probable_native": reduced_native, + "identified_name": reduced_native, + "probable_surname": surname, + "identified_surname": surname, + "ner_entities": str(self.create_ner_tags(full_name, [reduced_native], surname)), + "transformation_type": self.transformation_type, + **self.compute_numeric_features(full_name), } @property def transformation_type(self) -> str: - return 'reduced_native' + return "reduced_native" diff --git a/processing/ner/ner_data_builder.py b/processing/ner/ner_data_builder.py index 5ce23b3..86022bb 100644 --- a/processing/ner/ner_data_builder.py +++ b/processing/ner/ner_data_builder.py @@ -10,189 +10,143 @@ from spacy.util import filter_spans from core.config import PipelineConfig from core.utils import get_data_file_path +from core.utils.data_loader import DataLoader class NERDataBuilder: def __init__(self, config: PipelineConfig): self.config = config + self.data_loader = DataLoader(config) - @classmethod - def parse_entities(cls, entities_str): - """Parse entity string (tuple format or JSON) into spaCy-style tuples.""" - if not entities_str or entities_str in ["[]", "", "nan"]: - return [] + @staticmethod + def _parse_entities(series: pd.Series) -> pd.Series: + """Vectorized parse of entity strings.""" - entities_str = str(entities_str).strip() - - # Handle different formats - try: - # Try to parse as Python literal (tuples or lists) - if entities_str.startswith("[(") and entities_str.endswith(")]"): - # Standard tuple format: [(0, 6, 'NATIVE'), ...] - return ast.literal_eval(entities_str) - elif entities_str.startswith("[[") and entities_str.endswith("]]"): - # Nested list format: [[0, 6, 'NATIVE'], ...] - nested_list = ast.literal_eval(entities_str) - return [(start, end, label) for start, end, label in nested_list] - elif entities_str.startswith("[{") and entities_str.endswith("}]"): - # JSON format: [{"start": 0, "end": 6, "label": "NATIVE"}, ...] - json_entities = json.loads(entities_str) - return [(e["start"], e["end"], e["label"]) for e in json_entities] - else: - # Try general ast.literal_eval for other formats - parsed = ast.literal_eval(entities_str) - if isinstance(parsed, list): - # Convert any list format to tuples - result = [] - for item in parsed: - if isinstance(item, (list, tuple)) and len(item) == 3: - result.append((item[0], item[1], item[2])) - return result - - except (ValueError, SyntaxError, json.JSONDecodeError) as e: - logging.warning(f"Failed to parse entities: {entities_str} ({e})") - return [] - - logging.warning(f"Unknown entity format: {entities_str}") - return [] - - @classmethod - def validate_entities(cls, entities, text): - """Validate and sort entity tuples, removing overlaps and invalid spans.""" - if not entities or not text: - return [] - - text = str(text).strip() - if not text: - return [] - - # Filter out invalid entities - valid_entities = [] - for entity in entities: - if not isinstance(entity, (list, tuple)) or len(entity) != 3: - logging.warning(f"Invalid entity format: {entity}") - continue - - start, end, label = entity - - # Ensure start/end are integers + def _parse(entities_str): + if not entities_str or entities_str in ["[]", "", "nan"]: + return [] + entities_str = str(entities_str).strip() try: - start = int(start) - end = int(end) - except (ValueError, TypeError): - logging.warning(f"Invalid start/end positions: {entity}") - continue + if entities_str.startswith("[(") and entities_str.endswith(")]"): + return ast.literal_eval(entities_str) + elif entities_str.startswith("[[") and entities_str.endswith("]]"): + return [tuple(e) for e in ast.literal_eval(entities_str)] + elif entities_str.startswith("[{") and entities_str.endswith("}]"): + return [(e["start"], e["end"], e["label"]) for e in json.loads(entities_str)] + else: + parsed = ast.literal_eval(entities_str) + return [ + tuple(e) for e in parsed if isinstance(e, (list, tuple)) and len(e) == 3 + ] + except (ValueError, SyntaxError, json.JSONDecodeError): + return [] - # Ensure label is string - if not isinstance(label, str): - logging.warning(f"Invalid label type: {entity}") - continue + return series.map(_parse) - # Check bounds - if not (0 <= start < end <= len(text)): - logging.warning(f"Entity span out of bounds: {entity} for text '{text}' (length {len(text)})") - continue + @staticmethod + def _validate_entities(texts: pd.Series, entities_series: pd.Series) -> pd.Series: + """Vectorized entity validation.""" - # Check that span contains actual text - span_text = text[start:end].strip() - if not span_text: - logging.warning(f"Empty span: {entity} in text '{text}'") - continue - - valid_entities.append((start, end, label)) - - if not valid_entities: - return [] - - # Sort by start position - valid_entities.sort(key=lambda x: (x[0], x[1])) - - # Remove overlapping entities (keep the first one) - filtered = [] - for start, end, label in valid_entities: - # Check for overlap with already added entities - has_overlap = False - for e_start, e_end, _ in filtered: - if not (end <= e_start or start >= e_end): - has_overlap = True - logging.warning( - f"Removing overlapping entity ({start}, {end}, '{label}') " - f"conflicts with ({e_start}, {e_end}) in '{text}'" - ) - break - - if not has_overlap: - filtered.append((start, end, label)) - - return filtered - - @classmethod - def create_doc(cls, text, entities, nlp): - """Create a spaCy Doc object with entities added.""" - doc = nlp(text) - ents = [] - - for start, end, label in entities: - span = doc.char_span(start, end, label=label, alignment_mode="contract") \ - or doc.char_span(start, end, label=label, alignment_mode="strict") - if span: - ents.append(span) - else: - logging.warning(f"Could not create span ({start}, {end}, '{label}') in '{text}'") - - doc.ents = filter_spans(ents) if ents else [] - return doc - - def build(self, data: pd.DataFrame = None) -> int: - """Build the dataset for NER training.""" - logging.info("Building dataset for NER training") - try: - df = pd.read_csv(get_data_file_path("names_featured.csv", self.config)) \ - if data is None \ - else data - - ner_df = df[df["ner_tagged"] == 1].copy() - if ner_df.empty: - logging.error("No NER tagged data found in the CSV") - return 1 - - logging.info(f"Found {len(ner_df)} NER tagged entries") - nlp = spacy.blank("fr") - doc_bin, training_data = DocBin(), [] - processed_count, skipped_count = 0, 0 - - for _, row in ner_df.iterrows(): - text = str(row.get("name", "")).strip() - if not text: + def _validate(text, entities): + if not entities or not text: + return [] + text = str(text).strip() + valid = [] + for ent in entities: + if not isinstance(ent, (list, tuple)) or len(ent) != 3: continue - - entities = self.parse_entities(row.get("ner_entities", "[]")) - entities = self.validate_entities(entities, text) - - training_data.append((text, {"entities": entities})) + start, end, label = ent try: - doc_bin.add(self.create_doc(text, entities, nlp)) - processed_count += 1 - except Exception as e: - logging.error(f"Error processing '{text}': {e}") - skipped_count += 1 + start, end = int(start), int(end) + except (ValueError, TypeError): + continue + if not isinstance(label, str): + continue + if not (0 <= start < end <= len(text)): + continue + if not text[start:end].strip(): + continue + valid.append((start, end, label)) + if not valid: + return [] + valid.sort(key=lambda x: (x[0], x[1])) + # remove overlaps + filtered, last_end = [], -1 + for s, e, l in valid: + if s >= last_end: + filtered.append((s, e, l)) + last_end = e + return filtered - if not training_data: - logging.error("No valid training examples generated") - return 1 + return pd.Series(map(_validate, texts, entities_series), index=texts.index) - json_path = Path(self.config.paths.data_dir) / self.config.data.output_files["ner_data"] - spacy_path = Path(self.config.paths.data_dir) / self.config.data.output_files["ner_spacy"] + @staticmethod + def _create_docs(nlp, texts, entities): + """Batch create spaCy Docs.""" + docs = [] + for text, ents in zip(texts, entities): + doc = nlp(text) + spans = [] + for start, end, label in ents: + span = doc.char_span( + start, end, label=label, alignment_mode="contract" + ) or doc.char_span(start, end, label=label, alignment_mode="strict") + if span: + spans.append(span) + doc.ents = filter_spans(spans) + docs.append(doc) + return docs - with open(json_path, "w", encoding="utf-8") as f: - json.dump(training_data, f, ensure_ascii=False, indent=None) - doc_bin.to_disk(spacy_path) + def build(self) -> int: + input_filepath = get_data_file_path( + self.config.data.output_files["engineered"], self.config + ) + df = self.data_loader.load_csv_complete(input_filepath) + df = df[["name", "ner_tagged", "ner_entities"]] - logging.info(f"Processed: {processed_count}, Skipped: {skipped_count}") - logging.info(f"Saved NER data in json format to {json_path}") - logging.info(f"Saved NER data in spaCy format to {spacy_path}") - return 0 - - except Exception as e: - logging.error(f"Failed to build NER dataset: {e}", exc_info=True) + # Filter early + ner_df = df.loc[df["ner_tagged"] == 1, ["name", "ner_entities"]] + if ner_df.empty: + logging.error("No NER tagged data found") return 1 + + total_rows = len(df) + del df # No need to keep in memory + + logging.info(f"Found {len(ner_df)} NER tagged entries") + nlp = spacy.blank("fr") + + # Vectorized parsing + validation + parsed_entities = self._parse_entities(ner_df["ner_entities"]) + validated_entities = self._validate_entities(ner_df["name"], parsed_entities) + + # Drop rows with no valid entities + mask = validated_entities.map(bool) + ner_df = ner_df.loc[mask] + validated_entities = validated_entities.loc[mask] + + if ner_df.empty: + logging.error("No valid training examples after validation") + return 1 + + # Prepare training data + training_data = list( + zip(ner_df["name"].tolist(), [{"entities": ents} for ents in validated_entities]) + ) + + # Create spaCy DocBin in batch + docs = self._create_docs(nlp, ner_df["name"].tolist(), validated_entities.tolist()) + doc_bin = DocBin(docs=docs) + + # Save + json_path = Path(self.config.paths.data_dir) / self.config.data.output_files["ner_data"] + spacy_path = Path(self.config.paths.data_dir) / self.config.data.output_files["ner_spacy"] + + with open(json_path, "w", encoding="utf-8") as f: + json.dump(training_data, f, ensure_ascii=False, separators=(",", ":")) + doc_bin.to_disk(spacy_path) + + logging.info(f"Processed: {len(training_data)}, Skipped: {total_rows - len(training_data)}") + logging.info(f"Saved NER JSON to {json_path}") + logging.info(f"Saved NER spacy to {spacy_path}") + return 0 diff --git a/processing/ner/ner_engineering.py b/processing/ner/ner_engineering.py index 422b86b..398ab23 100644 --- a/processing/ner/ner_engineering.py +++ b/processing/ner/ner_engineering.py @@ -1,9 +1,14 @@ import random from typing import List +import logging import numpy as np import pandas as pd +from tqdm import tqdm +from core.config import PipelineConfig +from core.utils import get_data_file_path +from core.utils.data_loader import OPTIMIZED_DTYPES, DataLoader from processing.ner.formats.connectors_format import ConnectorFormatter from processing.ner.formats.extended_surname_format import ExtendedSurnameFormatter from processing.ner.formats.native_only_format import NativeOnlyFormatter @@ -18,50 +23,64 @@ class NEREngineering: and encourage sequence characteristic learning. """ - def __init__(self, connectors: List[str] = None, additional_surnames: List[str] = None): - self.connectors = connectors or ['wa', 'ya', 'ka', 'ba', 'la'] - self.additional_surnames = additional_surnames or [ - 'jean', 'paul', 'marie', 'joseph', 'pierre', 'claude', - 'andre', 'michel', 'robert' + def __init__(self, config: PipelineConfig): + self.config = config + self.data_loader = DataLoader(config) + self.connectors = ["wa", "ya", "ka", "ba", "la"] + self.additional_surnames = [ + "jean", + "paul", + "marie", + "joseph", + "pierre", + "claude", + "andre", + "michel", + "robert", ] + random.seed(self.config.data.random_seed) + np.random.seed(self.config.data.random_seed) + # Initialize format classes self.formatters = { - 'original': OriginalFormatter(self.connectors, self.additional_surnames), - 'native_only': NativeOnlyFormatter(self.connectors, self.additional_surnames), - 'position_flipped': PositionFlippedFormatter(self.connectors, self.additional_surnames), - 'reduced_native': ReducedNativeFormatter(self.connectors, self.additional_surnames), - 'connector_added': ConnectorFormatter(self.connectors, self.additional_surnames), - 'extended_surname': ExtendedSurnameFormatter(self.connectors, self.additional_surnames) + "original": OriginalFormatter(self.connectors, self.additional_surnames), + "native_only": NativeOnlyFormatter(self.connectors, self.additional_surnames), + "position_flipped": PositionFlippedFormatter(self.connectors, self.additional_surnames), + "reduced_native": ReducedNativeFormatter(self.connectors, self.additional_surnames), + "connector_added": ConnectorFormatter(self.connectors, self.additional_surnames), + "extended_surname": ExtendedSurnameFormatter(self.connectors, self.additional_surnames), } - @classmethod - def load_ner_data(cls, filepath: str) -> pd.DataFrame: + def load_data(self) -> pd.DataFrame: """Load and filter NER-tagged data from CSV file""" - df = pd.read_csv(filepath) + + filepath = get_data_file_path(self.config.data.output_files["featured"], self.config) + df = self.data_loader.load_csv_complete(filepath) # Filter only NER-tagged rows - ner_data = df[df['ner_tagged'] == 1].copy() - print(f"Loaded {len(ner_data)} NER-tagged records from {len(df)} total records") + ner_data = df[df["ner_tagged"] == 1].copy() + logging.info(f"Loaded {len(ner_data)} NER-tagged records from {len(df)} total records") return ner_data - def engineer_dataset(self, df: pd.DataFrame, random_seed: int = 42) -> pd.DataFrame: - """ - Apply feature engineering transformations according to the specified rules: - - First 25%: original format - - Second 25%: remove surname - - Third 25%: flip positions - - Fourth 10%: reduce native components - - Fifth 10%: add connectors - - Last 5%: extend surnames - """ - random.seed(random_seed) - np.random.seed(random_seed) + def compute(self) -> None: + logging.info("Applying feature engineering transformations...") + input_filepath = get_data_file_path(self.config.data.output_files["featured"], self.config) + output_filepath = get_data_file_path( + self.config.data.output_files["engineered"], self.config + ) - # Shuffle the dataset - df_shuffled = df.sample(frac=1, random_state=random_seed).reset_index(drop=True) - total_rows = len(df_shuffled) + df = self.data_loader.load_csv_complete(input_filepath) + ner_df = df[df["ner_tagged"] == 1].copy() + logging.info(f"Loaded {len(ner_df)} NER-tagged records from {len(df)} total records") + + del df # No need to keep in memory + + ner_df = ner_df.sample(frac=1, random_state=self.config.data.random_seed).reset_index( + drop=True + ) + total_rows = len(ner_df) # Calculate split points split_25_1 = int(total_rows * 0.25) @@ -71,37 +90,31 @@ class NEREngineering: split_10_2 = int(total_rows * 0.95) # Define transformation groups - transformation_groups = [ - (0, split_25_1, 'original'), - (split_25_1, split_25_2, 'native_only'), - (split_25_2, split_25_3, 'position_flipped'), - (split_25_3, split_10_1, 'reduced_native'), - (split_10_1, split_10_2, 'connector_added'), - (split_10_2, total_rows, 'extended_surname') + groups = [ + (0, split_25_1, "original"), # First 25%: original format + (split_25_1, split_25_2, "native_only"), # Second 25%: remove surname + (split_25_2, split_25_3, "position_flipped"), # Third 25%: flip positions + (split_25_3, split_10_1, "reduced_native"), # Fourth 10%: reduce native components + (split_10_1, split_10_2, "connector_added"), # Fifth 10%: add connectors + (split_10_2, total_rows, "extended_surname"), # Last 5%: extend surnames ] - print("Dataset splits:") - for start, end, trans_type in transformation_groups: - print(f"Group {trans_type}: {start} to {end} ({end - start} rows)") + for start, end, trans_type in groups: + logging.info(f"Group {trans_type}: {start} to {end} ({end - start} rows)") # Process each group - engineered_rows = [] - for start, end, formatter_key in transformation_groups: + rows = [] + for start, end, formatter_key in groups: formatter = self.formatters[formatter_key] - for idx in range(start, end): - row = df_shuffled.iloc[idx] + for idx in tqdm(range(start, end), desc=f"Processing {formatter_key}"): + row = ner_df.iloc[idx] transformed = formatter.transform(row) # Keep original columns and add transformed ones new_row = row.to_dict() new_row.update(transformed) - engineered_rows.append(new_row) + rows.append(new_row) - return pd.DataFrame(engineered_rows) - - @classmethod - def save_engineered_dataset(cls, df: pd.DataFrame, output_path: str): - """Save the engineered dataset to CSV file""" - df.to_csv(output_path, index=False) - print(f"Engineered dataset saved to {output_path}") + self.data_loader.save_csv(pd.DataFrame(rows), output_filepath) + logging.info(f"Engineered dataset saved to {output_filepath}") diff --git a/processing/ner/ner_name_model.py b/processing/ner/ner_name_model.py index 9537910..ab2aa01 100644 --- a/processing/ner/ner_name_model.py +++ b/processing/ner/ner_name_model.py @@ -48,7 +48,7 @@ class NERNameModel: logging.info(f"Loading training data from {data_path}") - with open(data_path, 'r', encoding='utf-8') as f: + with open(data_path, "r", encoding="utf-8") as f: raw_data = json.load(f) # Validate and clean training data @@ -58,7 +58,9 @@ class NERNameModel: for i, item in enumerate(raw_data): try: if not isinstance(item, (list, tuple)) or len(item) != 2: - logging.warning(f"Skipping invalid training example format at index {i}: {item}") + logging.warning( + f"Skipping invalid training example format at index {i}: {item}" + ) skipped_count += 1 continue @@ -83,20 +85,27 @@ class NERNameModel: # String format from tagger: "[(0, 6, 'NATIVE'), ...]" try: import ast + entities = ast.literal_eval(entities_raw) if not isinstance(entities, list): - logging.warning(f"Parsed entities is not a list at index {i}: {entities}") + logging.warning( + f"Parsed entities is not a list at index {i}: {entities}" + ) skipped_count += 1 continue except (ValueError, SyntaxError) as e: - logging.warning(f"Failed to parse entity string at index {i}: {entities_raw} ({e})") + logging.warning( + f"Failed to parse entity string at index {i}: {entities_raw} ({e})" + ) skipped_count += 1 continue elif isinstance(entities_raw, list): # Already in list format entities = entities_raw else: - logging.warning(f"Skipping invalid entities format at index {i}: {entities_raw}") + logging.warning( + f"Skipping invalid entities format at index {i}: {entities_raw}" + ) skipped_count += 1 continue @@ -110,16 +119,20 @@ class NERNameModel: start, end, label = entity # Validate entity components - if (not isinstance(start, int) or not isinstance(end, int) or - not isinstance(label, str) or start >= end or - start < 0 or end > len(text)): + if ( + not isinstance(start, int) + or not isinstance(end, int) + or not isinstance(label, str) + or start >= end + or start < 0 + or end > len(text) + ): logging.warning(f"Skipping invalid entity bounds in '{text}': {entity}") continue # Check for overlaps with already validated entities has_overlap = any( - start < v_end and end > v_start - for v_start, v_end, _ in valid_entities + start < v_end and end > v_start for v_start, v_end, _ in valid_entities ) if has_overlap: @@ -128,8 +141,10 @@ class NERNameModel: # Validate that the span doesn't contain spaces (matching tagger validation) span_text = text[start:end] - if not span_text or span_text != span_text.strip() or ' ' in span_text: - logging.warning(f"Skipping entity with spaces in '{text}': {entity} -> '{span_text}'") + if not span_text or span_text != span_text.strip() or " " in span_text: + logging.warning( + f"Skipping entity with spaces in '{text}': {entity} -> '{span_text}'" + ) continue valid_entities.append((start, end, label)) @@ -148,7 +163,9 @@ class NERNameModel: skipped_count += 1 continue - logging.info(f"Loaded {len(valid_data)} valid training examples, skipped {skipped_count} invalid ones") + logging.info( + f"Loaded {len(valid_data)} valid training examples, skipped {skipped_count} invalid ones" + ) if not valid_data: raise ValueError("No valid training examples found in the data") @@ -156,15 +173,17 @@ class NERNameModel: return valid_data def train( - self, - data: List[Tuple[str, Dict]], - epochs: int = 5, - batch_size: int = 16, - dropout_rate: float = 0.2, + self, + data: List[Tuple[str, Dict]], + epochs: int = 5, + batch_size: int = 16, + dropout_rate: float = 0.2, ) -> None: """Train the NER model""" logging.info(f"Starting NER training with {len(data)} examples") - logging.info(f"Training parameters: epochs={epochs}, batch_size={batch_size}, dropout={dropout_rate}") + logging.info( + f"Training parameters: epochs={epochs}, batch_size={batch_size}, dropout={dropout_rate}" + ) if self.nlp is None: raise ValueError("Model not initialized. Call create_blank_model() first.") @@ -184,16 +203,15 @@ class NERNameModel: doc = self.nlp.make_doc(text) example = Example.from_dict(doc, annotations) examples.append(example) - logging.info(f"Training example: {text[:30]}... with entities {annotations.get('entities', [])}") + logging.info( + f"Training example: {text[:30]}... with entities {annotations.get('entities', [])}" + ) # Train in batches batches = minibatch(examples, size=batch_size) for batch in batches: self.nlp.update( - batch, - losses=losses, - drop=dropout_rate, - sgd=self.nlp.create_optimizer() + batch, losses=losses, drop=dropout_rate, sgd=self.nlp.create_optimizer() ) logging.info(f"Training batch with {len(batch)} examples, current losses: {losses}") @@ -208,7 +226,7 @@ class NERNameModel: "training_examples": len(data), "loss_history": losses_history, "batch_size": batch_size, - "dropout_rate": dropout_rate + "dropout_rate": dropout_rate, } logging.info(f"Training completed. Final loss: {self.training_stats['final_loss']:.4f}") @@ -225,7 +243,10 @@ class NERNameModel: predicted_entities = 0 actual_entities = 0 - entity_stats = {"NATIVE": {"tp": 0, "fp": 0, "fn": 0}, "SURNAME": {"tp": 0, "fp": 0, "fn": 0}} + entity_stats = { + "NATIVE": {"tp": 0, "fp": 0, "fn": 0}, + "SURNAME": {"tp": 0, "fp": 0, "fn": 0}, + } for text, annotations in test_data: # Get actual entities @@ -259,7 +280,9 @@ class NERNameModel: # Calculate overall metrics precision = correct_entities / predicted_entities if predicted_entities > 0 else 0 recall = correct_entities / actual_entities if actual_entities > 0 else 0 - f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0 + f1_score = ( + 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0 + ) # Calculate per-label metrics label_metrics = {} @@ -268,14 +291,16 @@ class NERNameModel: label_precision = tp / (tp + fp) if (tp + fp) > 0 else 0 label_recall = tp / (tp + fn) if (tp + fn) > 0 else 0 label_f1 = ( - 2 * (label_precision * label_recall) / (label_precision + label_recall)) \ - if (label_precision + label_recall) > 0 else 0 + (2 * (label_precision * label_recall) / (label_precision + label_recall)) + if (label_precision + label_recall) > 0 + else 0 + ) label_metrics[label] = { "precision": label_precision, "recall": label_recall, "f1_score": label_f1, - "support": tp + fn + "support": tp + fn, } evaluation_results = { @@ -286,9 +311,9 @@ class NERNameModel: "total_examples": total_examples, "correct_entities": correct_entities, "predicted_entities": predicted_entities, - "actual_entities": actual_entities + "actual_entities": actual_entities, }, - "by_label": label_metrics + "by_label": label_metrics, } logging.info(f"NER Evaluation completed. Overall F1: {f1_score:.4f}") @@ -309,7 +334,7 @@ class NERNameModel: # Save training statistics stats_path = model_dir / "training_stats.json" - with open(stats_path, 'w', encoding='utf-8') as f: + with open(stats_path, "w", encoding="utf-8") as f: json.dump(self.training_stats, f, indent=2) logging.info(f"NER Model saved to {model_dir}") @@ -328,7 +353,7 @@ class NERNameModel: # Load training statistics if available stats_path = Path(model_path) / "training_stats.json" if stats_path.exists(): - with open(stats_path, 'r', encoding='utf-8') as f: + with open(stats_path, "r", encoding="utf-8") as f: self.training_stats = json.load(f) logging.info("NER Model loaded successfully") @@ -342,15 +367,14 @@ class NERNameModel: entities = [] for ent in doc.ents: - entities.append({ - "text": ent.text, - "label": ent.label_, - "start": ent.start_char, - "end": ent.end_char, - "confidence": getattr(ent, 'score', None) # If confidence scores are available - }) + entities.append( + { + "text": ent.text, + "label": ent.label_, + "start": ent.start_char, + "end": ent.end_char, + "confidence": getattr(ent, "score", None), # If confidence scores are available + } + ) - return { - "text": text, - "entities": entities - } + return {"text": text, "entities": entities} diff --git a/processing/ner/ner_name_tagger.py b/processing/ner/ner_name_tagger.py index 2460e9d..3eeee9c 100644 --- a/processing/ner/ner_name_tagger.py +++ b/processing/ner/ner_name_tagger.py @@ -3,7 +3,9 @@ import logging class NERNameTagger: - def tag_name(self, name: str, probable_native: str, probable_surname: str) -> Union[Dict[str, Any], None]: + def tag_name( + self, name: str, probable_native: str, probable_surname: str + ) -> Union[Dict[str, Any], None]: """Create a single NER training example using probable_native and probable_surname""" if not name or not probable_native or not probable_surname: return None @@ -56,9 +58,10 @@ class NERNameTagger: continue # Check if this is a word boundary match and doesn't overlap - if (self._is_word_boundary_match(name, pos, end_pos) and - not has_overlap(pos, end_pos)): - entities.append((pos, end_pos, 'NATIVE')) + if self._is_word_boundary_match(name, pos, end_pos) and not has_overlap( + pos, end_pos + ): + entities.append((pos, end_pos, "NATIVE")) used_spans.append((pos, end_pos)) break # Only take the first non-overlapping occurrence @@ -84,16 +87,19 @@ class NERNameTagger: start_pos = pos + 1 continue - if (self._is_word_boundary_match(name, pos, end_pos) and - not has_overlap(pos, end_pos)): - entities.append((pos, end_pos, 'SURNAME')) + if self._is_word_boundary_match(name, pos, end_pos) and not has_overlap( + pos, end_pos + ): + entities.append((pos, end_pos, "SURNAME")) used_spans.append((pos, end_pos)) break start_pos = pos + 1 if not entities: - logging.warning(f"No valid entities found for name: '{name}' with native: '{probable_native}' and surname: '{probable_surname}'") + logging.warning( + f"No valid entities found for name: '{name}' with native: '{probable_native}' and surname: '{probable_surname}'" + ) return None # Sort entities by position and validate @@ -104,7 +110,9 @@ class NERNameTagger: for start, end, label in entities: # Check bounds if not (0 <= start < end <= len(name)): - logging.warning(f"Invalid span bounds ({start}, {end}) for text length {len(name)}: '{name}'") + logging.warning( + f"Invalid span bounds ({start}, {end}) for text length {len(name)}: '{name}'" + ) continue # Check for overlaps with already validated entities @@ -114,8 +122,10 @@ class NERNameTagger: # CRITICAL VALIDATION: Check that the span contains only the expected word (no spaces) span_text = name[start:end] - if not span_text or span_text != span_text.strip() or ' ' in span_text: - logging.warning(f"Span contains spaces or is empty ({start}, {end}) in '{name}': '{span_text}'") + if not span_text or span_text != span_text.strip() or " " in span_text: + logging.warning( + f"Span contains spaces or is empty ({start}, {end}) in '{name}': '{span_text}'" + ) continue validated_entities.append((start, end, label)) @@ -129,7 +139,7 @@ class NERNameTagger: return { "entities": entities_str, - "spans": validated_entities # Keep the original tuples for internal use + "spans": validated_entities, # Keep the original tuples for internal use } @classmethod @@ -154,6 +164,7 @@ class NERNameTagger: """Validate that entity annotations are correct for a given name""" try: import ast + entities = ast.literal_eval(entities_str) # Check for overlaps and valid bounds @@ -182,10 +193,11 @@ class NERNameTagger: @classmethod def extract_entity_text(cls, name: str, entities_str: str) -> Dict[str, List[str]]: """Extract the actual text for each entity type""" - result = {'NATIVE': [], 'SURNAME': []} + result = {"NATIVE": [], "SURNAME": []} try: import ast + entities = ast.literal_eval(entities_str) for start, end, label in entities: diff --git a/processing/steps/__init__.py b/processing/steps/__init__.py index b4f960d..9e1a2c9 100644 --- a/processing/steps/__init__.py +++ b/processing/steps/__init__.py @@ -9,6 +9,7 @@ import pandas as pd from pydantic import BaseModel from core.config.pipeline_config import PipelineConfig +from core.utils.data_loader import OPTIMIZED_DTYPES, DataLoader from processing.batch.batch_config import BatchConfig @@ -37,10 +38,11 @@ class PipelineStep(ABC): """Abstract base class for pipeline steps""" def __init__( - self, name: str, pipeline_config: PipelineConfig, batch_config: Optional[BatchConfig] = None + self, name: str, pipeline_config: PipelineConfig, batch_config: Optional[BatchConfig] = None ): self.name = name self.pipeline_config = pipeline_config + self.data_loader = DataLoader(pipeline_config) # Use provided batch_config or create default from pipeline config if batch_config is None: @@ -53,6 +55,11 @@ class PipelineStep(ABC): self.batch_config = batch_config self.state = PipelineState() + @property + def requires_batch_mutation(self) -> bool: + """Indicates if this step modifies the batch data""" + return False + @abstractmethod def process_batch(self, batch: pd.DataFrame, batch_id: int) -> pd.DataFrame: """Process a single batch of data""" @@ -108,12 +115,12 @@ class PipelineStep(ABC): def save_batch(self, batch: pd.DataFrame, batch_id: int): """Save processed batch to checkpoint""" checkpoint_path = self.get_checkpoint_path(batch_id) - batch.to_csv(checkpoint_path, index=False) + self.data_loader.save_csv(batch, checkpoint_path) logging.info(f"Saved batch {batch_id} to {checkpoint_path}") def load_batch(self, batch_id: int) -> Optional[pd.DataFrame]: """Load processed batch from checkpoint""" checkpoint_path = self.get_checkpoint_path(batch_id) if os.path.exists(checkpoint_path): - return pd.read_csv(checkpoint_path) + return self.data_loader.load_csv_complete(checkpoint_path) return None diff --git a/processing/steps/data_splitting_step.py b/processing/steps/data_splitting_step.py index 5aa7bfb..953e72a 100644 --- a/processing/steps/data_splitting_step.py +++ b/processing/steps/data_splitting_step.py @@ -2,11 +2,10 @@ import numpy as np import pandas as pd from core.config.pipeline_config import PipelineConfig -from processing.steps.feature_extraction_step import Gender -from core.utils.data_loader import DataLoader - +from core.utils.region_mapper import RegionMapper from processing.batch.batch_config import BatchConfig from processing.steps import PipelineStep +from processing.steps.feature_extraction_step import Gender class DataSplittingStep(PipelineStep): @@ -20,7 +19,6 @@ class DataSplittingStep(PipelineStep): use_multiprocessing=False, ) super().__init__("data_splitting", pipeline_config, batch_config) - self.data_loader = DataLoader(pipeline_config) self.eval_indices = None def determine_eval_indices(self, total_size: int) -> set: @@ -33,9 +31,9 @@ class DataSplittingStep(PipelineStep): def process_batch(self, batch: pd.DataFrame, batch_id: int) -> pd.DataFrame: """Process batch for data splitting - no modification needed""" - return batch.copy() + return batch - def save_splits(self, df: pd.DataFrame) -> None: + def split(self, df: pd.DataFrame) -> None: """Save the split datasets based on configuration""" output_files = self.pipeline_config.data.output_files data_dir = self.pipeline_config.paths.data_dir @@ -52,9 +50,14 @@ class DataSplittingStep(PipelineStep): else: self.data_loader.save_csv(df, data_dir / output_files["featured"]) + if self.pipeline_config.data.split_by_province: + for province in RegionMapper.get_provinces(): + df_region = df[df.province == province] + self.data_loader.save_csv(df_region, data_dir / "provinces" / f"{province}.csv") + if self.pipeline_config.data.split_by_gender: - df_males = df[df["sex"] == Gender.MALE.value] - df_females = df[df["sex"] == Gender.FEMALE.value] + df_males = df[df.sex == Gender.MALE.value] + df_females = df[df.sex == Gender.FEMALE.value] self.data_loader.save_csv(df_males, data_dir / output_files["males"]) self.data_loader.save_csv(df_females, data_dir / output_files["females"]) diff --git a/processing/steps/feature_extraction_step.py b/processing/steps/feature_extraction_step.py index 3c23f91..dc2dfff 100644 --- a/processing/steps/feature_extraction_step.py +++ b/processing/steps/feature_extraction_step.py @@ -1,5 +1,7 @@ +import gc import logging from enum import Enum +from typing import Dict, Any import pandas as pd @@ -27,10 +29,15 @@ class FeatureExtractionStep(PipelineStep): self.region_mapper = RegionMapper() self.name_tagger = NERNameTagger() + @classmethod + def requires_batch_mutation(cls) -> bool: + """This step creates new columns, so mutation is required""" + return True + @classmethod def validate_gender(cls, gender: str) -> Gender: """Validate and normalize gender value""" - gender_lower = gender.lower().strip() + gender_lower = str(gender).lower().strip() if gender_lower in ["m", "male", "homme", "masculin"]: return Gender.MALE elif gender_lower in ["f", "female", "femme", "féminin"]: @@ -41,68 +48,144 @@ class FeatureExtractionStep(PipelineStep): @classmethod def get_name_category(cls, word_count: int) -> NameCategory: """Determine name category based on word count""" - if word_count == 3: - return NameCategory.SIMPLE - else: - return NameCategory.COMPOSE + return NameCategory.SIMPLE if word_count == 3 else NameCategory.COMPOSE def process_batch(self, batch: pd.DataFrame, batch_id: int) -> pd.DataFrame: """Extract features from names in batch""" logging.info(f"Extracting features for batch {batch_id} with {len(batch)} rows") - batch = batch.copy() + result = batch.copy() + numeric_features = self._compute_numeric_features(result["name"]) + result = result.assign(**numeric_features) - # Basic features - batch["words"] = batch["name"].str.count(" ") + 1 - batch["length"] = batch["name"].str.len() + # Initialize features columns with optimal dtypes + features_columns = self._initialize_features_columns(len(result)) + result = result.assign(**features_columns) - # Handle year column - if "year" in batch.columns: - batch["year"] = pd.to_numeric(batch["year"], errors="coerce").astype("Int64") + self._assign_probable_names(result) + self._process_simple_names(result) + result["identified_category"] = self._assign_identified_category(result["words"]) - # Initialize new columns - batch["probable_native"] = None - batch["probable_surname"] = None - batch["identified_name"] = None - batch["identified_surname"] = None - batch["ner_entities"] = None - batch["ner_tagged"] = 0 - batch["annotated"] = 0 + if "year" in result.columns: + result["year"] = pd.to_numeric(result["year"], errors="coerce").astype("Int16") - # Vectorized category assignment - batch["identified_category"] = batch["words"].apply( - lambda x: self.get_name_category(x).value + if "region" in result.columns: + result["province"] = self.region_mapper.map(result["region"]) + result["province"] = result["province"].astype("category") + + if "sex" in result.columns: + result["sex"] = self._normalize_gender(result["sex"]) + + # Apply final dtype optimizations + result = self._optimize_dtypes(result) + + # Cleanup + del numeric_features, features_columns + if batch_id % 10 == 0: # Periodic cleanup + gc.collect() + + return result + + @classmethod + def _compute_numeric_features(cls, series: pd.Series) -> Dict[str, pd.Series]: + """Calculate basic features in vectorized manner""" + return { + "words": (series.str.count(" ") + 1).astype("Int8"), + "length": series.str.len().astype("Int16"), + } + + @classmethod + def _initialize_features_columns(cls, size: int) -> Dict[str, Any]: + """Initialize new columns with optimal dtypes""" + return { + "probable_native": pd.Series([None] * size, dtype="string"), + "probable_surname": pd.Series([None] * size, dtype="string"), + "identified_name": pd.Series([None] * size, dtype="string"), + "identified_surname": pd.Series([None] * size, dtype="string"), + "ner_entities": pd.Series([None] * size, dtype="string"), + "ner_tagged": pd.Series([0] * size, dtype="Int8"), + "annotated": pd.Series([0] * size, dtype="Int8"), + } + + @classmethod + def _assign_probable_names(cls, df: pd.DataFrame) -> None: + """Assign probable native and surname names efficiently""" + + name_splits = df["name"].str.split() + mask = name_splits.str.len() >= 2 + + df.loc[mask, "probable_native"] = name_splits[mask].apply( + lambda x: " ".join(x[:-1]) if isinstance(x, list) else None + ) + df.loc[mask, "probable_surname"] = name_splits[mask].apply( + lambda x: x[-1] if isinstance(x, list) else None ) - # Assign probable_native and probable_surname for all names - name_splits = batch["name"].str.split() - batch["probable_native"] = name_splits.apply( - lambda x: " ".join(x[:-1]) if isinstance(x, list) and len(x) >= 2 else None - ) - batch["probable_surname"] = name_splits.apply( - lambda x: x[-1] if isinstance(x, list) and len(x) >= 2 else None - ) + def _assign_identified_category(self, series: pd.Series) -> pd.Series: + """Assign identified category based on word count""" + return series.map(lambda x: self.get_name_category(x).value).astype("category") - # Auto-assign for 3-word names - three_word_mask = batch["words"] == 3 - batch.loc[three_word_mask, "identified_name"] = batch.loc[three_word_mask, "probable_native"] - batch.loc[three_word_mask, "identified_surname"] = batch.loc[three_word_mask, "probable_surname"] - batch.loc[three_word_mask, "annotated"] = 1 + def _process_simple_names(self, df: pd.DataFrame) -> None: + """Process 3-word names efficiently with vectorized operations""" + mask = df["words"] == 3 - # Tag names with NER entities - three_word_rows = batch[three_word_mask] + if not mask.any(): + return + + df.loc[mask, "identified_name"] = df.loc[mask, "probable_native"] + df.loc[mask, "identified_surname"] = df.loc[mask, "probable_surname"] + df.loc[mask, "annotated"] = 1 + + # NER tagging for 3-word names + three_word_rows = df[mask] for idx, row in three_word_rows.iterrows(): - entity = self.name_tagger.tag_name(row['name'], row['identified_name'], row['identified_surname']) + try: + entity = self.name_tagger.tag_name( + row["name"], row["identified_name"], row["identified_surname"] + ) - if entity: - batch.at[idx, "ner_entities"] = entity["entities"] - batch.at[idx, "ner_tagged"] = 1 + if entity: + df.at[idx, "ner_entities"] = str(entity["entities"]) + df.at[idx, "ner_tagged"] = 1 + except Exception as e: + logging.warning(f"NER tagging failed for row {idx}: {e}") - # Map regions to provinces - batch["province"] = self.region_mapper.map_regions_vectorized(batch["region"]) + def _normalize_gender(self, series: pd.Series) -> pd.Series: + gender_mapping = { + "m": "m", + "male": "m", + "homme": "m", + "masculin": "m", + "f": "f", + "female": "f", + "femme": "f", + "féminin": "f", + } - # Normalize gender - if "sex" in batch.columns: - batch["sex"] = batch["sex"].apply(lambda x: self.validate_gender(str(x)).value) + # Apply mapping with error handling + normalized = series.astype(str).str.lower().str.strip().map(gender_mapping) + return normalized.astype("category") - return batch + @classmethod + def _optimize_dtypes(cls, df: pd.DataFrame) -> pd.DataFrame: + categories = ["province", "identified_category", "sex"] + + for col in categories: + if col in df.columns and df[col].dtype != "category": + df[col] = df[col].astype("category") + + # Ensure string columns are proper string dtype + string_cols = [ + "name", + "probable_native", + "probable_surname", + "identified_name", + "identified_surname", + "ner_entities", + ] + + for col in string_cols: + if col in df.columns and df[col].dtype == "object": + df[col] = df[col].astype("string") + + return df diff --git a/processing/steps/llm_annotation_step.py b/processing/steps/llm_annotation_step.py index 8aa80fb..357df1f 100644 --- a/processing/steps/llm_annotation_step.py +++ b/processing/steps/llm_annotation_step.py @@ -24,8 +24,7 @@ class LLMAnnotationStep(PipelineStep): batch_config = BatchConfig( batch_size=pipeline_config.processing.batch_size, max_workers=min( - self.llm_config.max_concurrent_requests, - pipeline_config.processing.max_workers + self.llm_config.max_concurrent_requests, pipeline_config.processing.max_workers ), checkpoint_interval=pipeline_config.processing.checkpoint_interval, use_multiprocessing=pipeline_config.processing.use_multiprocessing, @@ -98,7 +97,7 @@ class LLMAnnotationStep(PipelineStep): # Exponential backoff with jitter if attempt < self.llm_config.retry_attempts - 1: - wait_time = (2 ** attempt) + (time.time() % 1) + wait_time = (2**attempt) + (time.time() % 1) time.sleep(min(wait_time, 10)) self.failed_requests += 1 @@ -156,6 +155,8 @@ class LLMAnnotationStep(PipelineStep): batch.loc[idx, "annotated"] = 0 # Ensure proper data types - batch["annotated"] = pd.to_numeric(batch["annotated"], errors="coerce").fillna(0).astype("Int8") + batch["annotated"] = ( + pd.to_numeric(batch["annotated"], errors="coerce").fillna(0).astype("Int8") + ) return batch diff --git a/processing/steps/ner_annotation_step.py b/processing/steps/ner_annotation_step.py index 1280ec5..410d806 100644 --- a/processing/steps/ner_annotation_step.py +++ b/processing/steps/ner_annotation_step.py @@ -6,8 +6,8 @@ from typing import Dict import pandas as pd from core.config.pipeline_config import PipelineConfig -from processing.steps import PipelineStep, NameAnnotation from processing.ner.ner_name_model import NERNameModel +from processing.steps import PipelineStep, NameAnnotation class NERAnnotationStep(PipelineStep): @@ -63,7 +63,7 @@ class NERAnnotationStep(PipelineStep): # Get NER predictions prediction = self.ner_trainer.predict(name.lower()) - entities = prediction.get('entities', []) + entities = prediction.get("entities", []) elapsed_time = time.time() - start_time @@ -72,15 +72,15 @@ class NERAnnotationStep(PipelineStep): surname_parts = [] for entity in entities: - if entity['label'] == 'NATIVE': - native_parts.append(entity['text']) - elif entity['label'] == 'SURNAME': - surname_parts.append(entity['text']) + if entity["label"] == "NATIVE": + native_parts.append(entity["text"]) + elif entity["label"] == "SURNAME": + surname_parts.append(entity["text"]) # Create annotation result in same format as LLM step annotation = NameAnnotation( identified_name=" ".join(native_parts) if native_parts else None, - identified_surname=" ".join(surname_parts) if surname_parts else None + identified_surname=" ".join(surname_parts) if surname_parts else None, ) result = { @@ -159,6 +159,8 @@ class NERAnnotationStep(PipelineStep): batch.loc[idx, "annotated"] = 0 # Ensure proper data types - batch["annotated"] = pd.to_numeric(batch["annotated"], errors="coerce").fillna(0).astype("Int8") + batch["annotated"] = ( + pd.to_numeric(batch["annotated"], errors="coerce").fillna(0).astype("Int8") + ) return batch diff --git a/research/experiment/experiment_runner.py b/research/experiment/experiment_runner.py index d4592e0..d997443 100644 --- a/research/experiment/experiment_runner.py +++ b/research/experiment/experiment_runner.py @@ -224,9 +224,9 @@ class ExperimentRunner: model.learning_curve_data = model_data.get("learning_curve_data", {}) # Restore vectorizers and encoders for models that use them (like XGBoost) - if "vectorizers" in model_data and hasattr(model, 'vectorizers'): + if "vectorizers" in model_data and hasattr(model, "vectorizers"): model.vectorizers = model_data["vectorizers"] - if "label_encoders" in model_data and hasattr(model, 'label_encoders'): + if "label_encoders" in model_data and hasattr(model, "label_encoders"): model.label_encoders = model_data["label_encoders"] return model @@ -237,7 +237,9 @@ class ExperimentRunner: return None - def compare_experiments(self, experiment_ids: List[str], metric: str = "accuracy") -> pd.DataFrame: + def compare_experiments( + self, experiment_ids: List[str], metric: str = "accuracy" + ) -> pd.DataFrame: """Compare experiments and return analysis""" comparison_df = self.tracker.compare_experiments(experiment_ids) diff --git a/research/model_trainer.py b/research/model_trainer.py index 5e10089..b27f702 100644 --- a/research/model_trainer.py +++ b/research/model_trainer.py @@ -28,13 +28,13 @@ class ModelTrainer: self.models_dir.mkdir(parents=True, exist_ok=True) def train_single_model( - self, - model_name: str, - model_type: str = "logistic_regression", - features: List[str] = None, - model_params: Dict[str, Any] = None, - tags: List[str] = None, - save_artifacts: bool = True, + self, + model_name: str, + model_type: str = "logistic_regression", + features: List[str] = None, + model_params: Dict[str, Any] = None, + tags: List[str] = None, + save_artifacts: bool = True, ) -> str: """ Train a single model and save its artifacts. @@ -76,10 +76,7 @@ class ModelTrainer: return experiment_id def train_multiple_models( - self, - base_name: str, - model_configs: List[Dict[str, Any]], - save_all: bool = True + self, base_name: str, model_configs: List[Dict[str, Any]], save_all: bool = True ) -> List[str]: """ Train multiple models with different configurations. diff --git a/research/models/lightgbm_model.py b/research/models/lightgbm_model.py index 2a4256a..c0fa692 100644 --- a/research/models/lightgbm_model.py +++ b/research/models/lightgbm_model.py @@ -50,14 +50,18 @@ class LightGBMModel(TraditionalModel): self.vectorizers[feature_key] = CountVectorizer( analyzer="char", ngram_range=(2, 3), max_features=50 ) - char_features = self.vectorizers[feature_key].fit_transform( - column.fillna("").astype(str) - ).toarray() + char_features = ( + self.vectorizers[feature_key] + .fit_transform(column.fillna("").astype(str)) + .toarray() + ) else: # Subsequent times - use existing vectorizer - char_features = self.vectorizers[feature_key].transform( - column.fillna("").astype(str) - ).toarray() + char_features = ( + self.vectorizers[feature_key] + .transform(column.fillna("").astype(str)) + .toarray() + ) features.append(char_features) else: diff --git a/research/models/logistic_regression_model.py b/research/models/logistic_regression_model.py index 4bc146c..9a280b6 100644 --- a/research/models/logistic_regression_model.py +++ b/research/models/logistic_regression_model.py @@ -20,9 +20,7 @@ class LogisticRegressionModel(TraditionalModel): ) classifier = LogisticRegression( - max_iter=params.get("max_iter", 1000), - random_state=self.config.random_seed, - verbose=2 + max_iter=params.get("max_iter", 1000), random_state=self.config.random_seed, verbose=2 ) return Pipeline([("vectorizer", vectorizer), ("classifier", classifier)]) diff --git a/research/models/random_forest_model.py b/research/models/random_forest_model.py index a2d9aac..4c274ff 100644 --- a/research/models/random_forest_model.py +++ b/research/models/random_forest_model.py @@ -18,7 +18,7 @@ class RandomForestModel(TraditionalModel): n_estimators=params.get("n_estimators", 100), max_depth=params.get("max_depth", None), random_state=self.config.random_seed, - verbose=2 + verbose=2, ) def prepare_features(self, X: pd.DataFrame) -> np.ndarray: diff --git a/research/models/svm_model.py b/research/models/svm_model.py index 03ef3cc..e90ec21 100644 --- a/research/models/svm_model.py +++ b/research/models/svm_model.py @@ -25,7 +25,7 @@ class SVMModel(TraditionalModel): gamma=params.get("gamma", "scale"), probability=True, # Enable probability prediction random_state=self.config.random_seed, - verbose=2 + verbose=2, ) return Pipeline([("vectorizer", vectorizer), ("classifier", classifier)]) diff --git a/research/models/xgboost_model.py b/research/models/xgboost_model.py index 454d807..07e4be0 100644 --- a/research/models/xgboost_model.py +++ b/research/models/xgboost_model.py @@ -28,7 +28,7 @@ class XGBoostModel(TraditionalModel): colsample_bytree=params.get("colsample_bytree", 0.8), random_state=self.config.random_seed, eval_metric="logloss", - verbosity=2 + verbosity=2, ) def prepare_features(self, X: pd.DataFrame) -> np.ndarray: @@ -50,14 +50,18 @@ class XGBoostModel(TraditionalModel): self.vectorizers[feature_key] = CountVectorizer( analyzer="char", ngram_range=(2, 3), max_features=100 ) - char_features = self.vectorizers[feature_key].fit_transform( - column.fillna("").astype(str) - ).toarray() + char_features = ( + self.vectorizers[feature_key] + .fit_transform(column.fillna("").astype(str)) + .toarray() + ) else: # Subsequent times - use existing vectorizer - char_features = self.vectorizers[feature_key].transform( - column.fillna("").astype(str) - ).toarray() + char_features = ( + self.vectorizers[feature_key] + .transform(column.fillna("").astype(str)) + .toarray() + ) features.append(char_features) else: diff --git a/research/neural_network_model.py b/research/neural_network_model.py index f5372cc..6cbe13b 100644 --- a/research/neural_network_model.py +++ b/research/neural_network_model.py @@ -59,7 +59,9 @@ class NeuralNetworkModel(BaseModel): ) # Train the neural network - logging.info(f"Fitting model with {X_prepared.shape[0]} samples and {X_prepared.shape[1]} features") + logging.info( + f"Fitting model with {X_prepared.shape[0]} samples and {X_prepared.shape[1]} features" + ) history = self.model.fit( X_prepared, y_encoded, @@ -162,7 +164,11 @@ class NeuralNetworkModel(BaseModel): # Split data once for validation X_train_full, X_val, y_train_full, y_val = train_test_split( - X_prepared, y_encoded, test_size=0.2, random_state=self.config.random_seed, stratify=y_encoded + X_prepared, + y_encoded, + test_size=0.2, + random_state=self.config.random_seed, + stratify=y_encoded, ) for size in train_sizes: diff --git a/research/traditional_model.py b/research/traditional_model.py index 46511a2..099c35a 100644 --- a/research/traditional_model.py +++ b/research/traditional_model.py @@ -55,7 +55,9 @@ class TraditionalModel(BaseModel): logging.info(f"Fitting model with {X_prepared.shape[0]} samples (text features)") else: # For numerical features - logging.info(f"Fitting model with {X_prepared.shape[0]} samples and {X_prepared.shape[1]} features") + logging.info( + f"Fitting model with {X_prepared.shape[0]} samples and {X_prepared.shape[1]} features" + ) self.model.fit(X_prepared, y_encoded) self.is_fitted = True diff --git a/train.py b/train.py index f796b35..a7a9184 100755 --- a/train.py +++ b/train.py @@ -3,8 +3,8 @@ import argparse import logging import sys import traceback + import yaml -from pathlib import Path from core.config import setup_config from research.model_trainer import ModelTrainer @@ -13,7 +13,7 @@ from research.model_trainer import ModelTrainer def load_research_templates(templates_path: str = "config/research_templates.yaml") -> dict: """Load research templates from YAML file""" try: - with open(templates_path, 'r') as file: + with open(templates_path, "r") as file: return yaml.safe_load(file) except FileNotFoundError: logging.error(f"Templates file not found: {templates_path}") @@ -30,13 +30,15 @@ def find_experiment_config(templates: dict, name: str, experiment_type: str) -> "baseline": "baseline_experiments", "advanced": "advanced_experiments", "feature_study": "feature_studies", - "tuning": "hyperparameter_tuning" + "tuning": "hyperparameter_tuning", } section_name = type_mapping.get(experiment_type) if not section_name: available_types = list(type_mapping.keys()) - raise ValueError(f"Unknown experiment type '{experiment_type}'. Available types: {available_types}") + raise ValueError( + f"Unknown experiment type '{experiment_type}'. Available types: {available_types}" + ) if section_name not in templates: raise ValueError(f"Section '{section_name}' not found in templates") @@ -47,16 +49,22 @@ def find_experiment_config(templates: dict, name: str, experiment_type: str) -> for experiment in experiments: # Check if this is the experiment we're looking for # Look for experiments that match the model type or contain the name - if (experiment.get("model_type") == name or - name.lower() in experiment.get("name", "").lower() or - f"baseline_{name}" == experiment.get("name") or - f"advanced_{name}" == experiment.get("name")): + if ( + experiment.get("model_type") == name + or name.lower() in experiment.get("name", "").lower() + or f"baseline_{name}" == experiment.get("name") + or f"advanced_{name}" == experiment.get("name") + ): return experiment # If not found, list available experiments - available_experiments = [exp.get("name", exp.get("model_type", "unknown")) for exp in experiments] - raise ValueError(f"Experiment '{name}' not found in '{experiment_type}' section. " - f"Available experiments: {available_experiments}") + available_experiments = [ + exp.get("name", exp.get("model_type", "unknown")) for exp in experiments + ] + raise ValueError( + f"Experiment '{name}' not found in '{experiment_type}' section. " + f"Available experiments: {available_experiments}" + ) def main(): @@ -91,7 +99,7 @@ def main(): model_type=experiment_config.get("model_type"), features=experiment_config.get("features"), model_params=experiment_config.get("model_params", {}), - tags=experiment_config.get("tags", []) + tags=experiment_config.get("tags", []), ) logging.info("Training completed successfully!")