diff --git a/app.py b/app.py index 0112522..5535e8a 100644 --- a/app.py +++ b/app.py @@ -1,7 +1,7 @@ #!.venv/bin/python3 import streamlit as st -from core.config import get_config +from core.config import setup_config_and_logging from core.utils.data_loader import DataLoader from interface.configuration import Configuration from interface.dashboard import Dashboard @@ -25,8 +25,8 @@ st.set_page_config( @st.cache_data def load_config(): - """Load application configuration""" - return get_config() + """Load application configuration with unified setup""" + return setup_config_and_logging(env="development") class StreamlitApp: diff --git a/cli.py b/cli.py index 3363c9f..a01c0c7 100755 --- a/cli.py +++ b/cli.py @@ -6,7 +6,7 @@ from pathlib import Path import pandas as pd -from core.config import get_config, setup_logging +from core.config import setup_config_and_logging from research.experiment.experiment_runner import ExperimentRunner from research.experiment.experiment_tracker import ExperimentTracker @@ -104,7 +104,8 @@ def show_experiment_details(args): def compare_experiments_cmd(args): """Compare multiple experiments""" - runner = ExperimentRunner(get_config()) + config = setup_config_and_logging(env="development") + runner = ExperimentRunner(config) comparison = runner.compare_experiments(args.experiment_ids) if comparison.empty: @@ -130,14 +131,20 @@ def export_results(args): def main(): - """Main CLI entry point""" + """Main CLI entry point with unified configuration loading""" parser = argparse.ArgumentParser( description="DRC Names Research Experiment Manager", formatter_class=argparse.RawDescriptionHelpFormatter, ) - # Setup logging + # Global arguments + parser.add_argument("--config", type=Path, help="Path to configuration file") + parser.add_argument( + "--env", type=str, default="development", + help="Environment name (default: development)" + ) parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging") + subparsers = 
parser.add_subparsers(dest="command", help="Available commands") # List experiments @@ -163,14 +170,15 @@ def main(): parser.print_help() return 1 - # Setup logging - config = get_config() - if args.verbose: - config.logging.level = "DEBUG" - setup_logging(config) - - # Execute command try: + # Load configuration and setup logging + config = setup_config_and_logging(config_path=args.config, env=args.env) + + # Override log level if verbose requested + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Execute command command_map = { "list": list_experiments, "show": show_experiment_details, diff --git a/config/pipeline.development.yaml b/config/pipeline.development.yaml index 6b3ae60..110ca54 100644 --- a/config/pipeline.development.yaml +++ b/config/pipeline.development.yaml @@ -31,12 +31,14 @@ llm: max_concurrent_requests: 4 enable_rate_limiting: true -# Production data settings +# Development data settings - limited dataset for faster testing data: split_evaluation: true split_by_gender: true evaluation_fraction: 0.2 random_seed: 42 + max_dataset_size: 10000 # Limit to 10k records for development/testing (plain int: YAML 1.2 parsers reject 10_000) + balance_by_sex: true # Balance male/female samples when limiting # Enhanced logging for development logging: diff --git a/config/pipeline.production.yaml b/config/pipeline.production.yaml index 8431054..447b0d7 100644 --- a/config/pipeline.production.yaml +++ b/config/pipeline.production.yaml @@ -37,6 +37,8 @@ data: split_by_gender: true evaluation_fraction: 0.2 random_seed: 42 + max_dataset_size: null + balance_by_sex: false # Production logging (less verbose) logging: diff --git a/config/pipeline.yaml b/config/pipeline.yaml index b1b1802..93565d8 100644 --- a/config/pipeline.yaml +++ b/config/pipeline.yaml @@ -58,6 +58,8 @@ data: split_by_gender: true evaluation_fraction: 0.2 random_seed: 42 + max_dataset_size: null + balance_by_sex: false # Logging configuration logging: diff --git a/core/config/__init__.py b/core/config/__init__.py index
72634a6..9c26acb 100644 --- a/core/config/__init__.py +++ b/core/config/__init__.py @@ -21,6 +21,41 @@ def load_config(config_path: Optional[Union[str, Path]] = None) -> PipelineConfi return config_manager.get_config() +def setup_config_and_logging( + config_path: Optional[Path] = None, + env: str = "development" +) -> PipelineConfig: + """ + Unified configuration loading and logging setup for all entrypoint scripts. + + Args: + config_path: Direct path to config file (takes precedence over env) + env: Environment name (defaults to "development") + + Returns: + Loaded configuration object + """ + # Determine config path + if config_path is None: + config_path = Path("config") / f"pipeline.{env}.yaml" + + # Load configuration + config = ConfigManager(config_path).load_config() + + # Setup logging + setup_logging(config) + + # Ensure required directories exist + from core.utils import ensure_directories + ensure_directories(config) + + logging.info(f"Loaded configuration: {config.name} v{config.version}") + logging.info(f"Environment: {config.environment}") + logging.info(f"Config file: {config_path}") + + return config + + def setup_logging(config: PipelineConfig): """Setup logging based on configuration""" diff --git a/core/config/data_config.py b/core/config/data_config.py index d48756c..639f68f 100644 --- a/core/config/data_config.py +++ b/core/config/data_config.py @@ -1,5 +1,5 @@ from dataclasses import field -from typing import Dict +from typing import Dict, Optional from pydantic import BaseModel @@ -20,3 +20,7 @@ class DataConfig(BaseModel): split_by_gender: bool = True evaluation_fraction: float = 0.2 random_seed: int = 42 + + # Dataset size limiting options + max_dataset_size: Optional[int] = None + balance_by_sex: bool = False diff --git a/core/utils/data_loader.py b/core/utils/data_loader.py index aaf6e7c..da3eb59 100644 --- a/core/utils/data_loader.py +++ b/core/utils/data_loader.py @@ -44,9 +44,71 @@ class DataLoader: raise ValueError(f"Unable to 
decode {filepath} with any encoding: {encodings}") def load_csv_complete(self, filepath: Union[str, Path]) -> pd.DataFrame: - """Load complete CSV file into memory""" + """Load complete CSV file into memory with size limiting and balancing""" chunks = list(self.load_csv_chunked(filepath)) - return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame() + if not chunks: + return pd.DataFrame() + + df = pd.concat(chunks, ignore_index=True) + + # Apply dataset size limiting if configured + if self.config.data.max_dataset_size is not None: + df = self._limit_dataset_size(df) + + return df + + def _limit_dataset_size(self, df: pd.DataFrame) -> pd.DataFrame: + """Limit dataset size with optional sex balancing""" + max_size = self.config.data.max_dataset_size + + if max_size is None or len(df) <= max_size: + return df + + if self.config.data.balance_by_sex and "sex" in df.columns: + return self._balanced_sample(df, max_size) + else: + # Simple random sampling + return df.sample(n=max_size, random_state=self.config.data.random_seed) + + def _balanced_sample(self, df: pd.DataFrame, max_size: int) -> pd.DataFrame: + """Sample data with balanced sex distribution""" + + # Get unique sex values + sex_values = df["sex"].dropna().unique() + + if len(sex_values) == 0: + logging.warning("No valid values found in 'sex' column, using random sampling") + return df.sample(n=max_size, random_state=self.config.data.random_seed) + + # Calculate samples per sex category + samples_per_sex = max_size // len(sex_values) + remaining_samples = max_size % len(sex_values) + + balanced_samples = [] + + for i, sex in enumerate(sex_values): + sex_df = df[df["sex"] == sex] + + # Distribute remaining samples to first categories + current_samples = samples_per_sex + (1 if i < remaining_samples else 0) + current_samples = min(current_samples, len(sex_df)) + + if current_samples > 0: + sample = sex_df.sample(n=current_samples, random_state=self.config.data.random_seed + i) + 
balanced_samples.append(sample) + logging.info(f"Sampled {current_samples} records for sex '{sex}'") + + if not balanced_samples: + logging.warning("No balanced samples could be created, using random sampling") + return df.sample(n=max_size, random_state=self.config.data.random_seed) + + result = pd.concat(balanced_samples, ignore_index=True) + + # Shuffle the final result + result = result.sample(frac=1, random_state=self.config.data.random_seed).reset_index(drop=True) + + logging.info(f"Created balanced dataset with {len(result)} records from {len(df)} total records") + return result @classmethod def save_csv( diff --git a/main.py b/main.py index e03aa43..e9c58c6 100755 --- a/main.py +++ b/main.py @@ -3,11 +3,10 @@ import sys import argparse import logging from pathlib import Path -from typing import Optional from core.utils.data_loader import DataLoader -from core.config import ConfigManager, setup_logging -from core.utils import ensure_directories, get_data_file_path +from core.config import setup_config_and_logging +from core.utils import get_data_file_path from processing.pipeline import Pipeline from processing.batch.batch_config import BatchConfig @@ -17,13 +16,8 @@ from processing.steps.feature_extraction_step import FeatureExtractionStep from processing.steps.data_cleaning_step import DataCleaningStep -def create_pipeline_from_config(config_path: Optional[Path] = None) -> Pipeline: - """Create pipeline from configuration file""" - config = ConfigManager(config_path).load_config() - - # Setup logging - setup_logging(config) - ensure_directories(config) +def create_pipeline_from_config(config) -> Pipeline: + """Create pipeline from configuration""" batch_config = BatchConfig( batch_size=config.processing.batch_size, max_workers=config.processing.max_workers, @@ -48,13 +42,10 @@ def create_pipeline_from_config(config_path: Optional[Path] = None) -> Pipeline: return pipeline -def run_pipeline(config_path: Optional[Path] = None, resume: bool = False) -> int: 
+def run_pipeline(config, resume: bool = False) -> int: """Run the complete pipeline""" try: - config = ConfigManager(config_path).load_config() - logging.info(f"Starting pipeline: {config.name} v{config.version}") - logging.info(f"Environment: {config.environment}") # Load input data input_file_path = get_data_file_path(config.data.input_file, config) @@ -69,7 +60,7 @@ def run_pipeline(config_path: Optional[Path] = None, resume: bool = False) -> in logging.info(f"Loaded {len(df)} rows, {len(df.columns)} columns") # Create and run pipeline - pipeline = create_pipeline_from_config(config_path) + pipeline = create_pipeline_from_config(config) logging.info("Starting pipeline execution") result_df = pipeline.run(df) @@ -99,27 +90,28 @@ def run_pipeline(config_path: Optional[Path] = None, resume: bool = False) -> in def main(): - """Main entry point with minimal command-line interface""" + """Main entry point with unified configuration loading""" parser = argparse.ArgumentParser( description="DRC Names Processing Pipeline", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Configuration File Examples: config/pipeline.yaml - Main configuration - config/pipeline.development.yaml - Development environment + config/pipeline.development.yaml - Development environment (default) config/pipeline.production.yaml - Production environment Usage Examples: - python processing/main.py # Use default config - python processing/main.py --config config/pipeline.yaml # Use specific config - python processing/main.py --env development # Use environment config - python processing/main.py --resume # Resume from checkpoints + python main.py # Use development config (default) + python main.py --config config/pipeline.yaml # Use specific config + python main.py --env production # Use production environment + python main.py --resume # Resume from checkpoints """, ) parser.add_argument("--config", type=Path, help="Path to configuration file") parser.add_argument( - "--env", 
type=str, help="Environment name (loads config/pipeline.{env}.yaml)" + "--env", type=str, default="development", + help="Environment name (default: development)" ) parser.add_argument( "--resume", action="store_true", help="Resume pipeline from existing checkpoints" @@ -129,24 +121,20 @@ Usage Examples: ) args = parser.parse_args() - # Determine config path - config_path = None - if args.config: - config_path = args.config - elif args.env: - config_path = Path("config") / f"pipeline.{args.env}.yaml" + try: + # Load configuration and setup logging + config = setup_config_and_logging(config_path=args.config, env=args.env) - if args.validate_config: - try: - config = ConfigManager(config_path).load_config() + if args.validate_config: print(f"Configuration is valid: {config.name} v{config.version}") return 0 - except Exception as e: - print(f"Configuration validation failed: {e}") - return 1 - # Run pipeline - return run_pipeline(config_path, args.resume) + # Run pipeline + return run_pipeline(config, args.resume) + + except Exception as e: + print(f"Configuration or pipeline failed: {e}") + return 1 if __name__ == "__main__": diff --git a/monitor.py b/monitor.py index fe29e83..6c53d43 100755 --- a/monitor.py +++ b/monitor.py @@ -1,8 +1,9 @@ #!.venv/bin/python3 import argparse import sys +from pathlib import Path -from core.config.config_manager import ConfigManager +from core.config import setup_config_and_logging from processing.monitoring.data_analyzer import DatasetAnalyzer from processing.monitoring.pipeline_monitor import PipelineMonitor @@ -11,6 +12,12 @@ def main(): parser = argparse.ArgumentParser( description="Monitor and manage the DRC names processing pipeline" ) + parser.add_argument("--config", type=Path, help="Path to configuration file") + parser.add_argument( + "--env", type=str, default="development", + help="Environment name (default: development)" + ) + subparsers = parser.add_subparsers(dest="command", help="Available commands") # Status command @@ 
-62,80 +69,88 @@ def main(): parser.print_help() return 1 - monitor = PipelineMonitor() + try: + # Load configuration and setup logging + config = setup_config_and_logging(config_path=args.config, env=args.env) - if args.command == "status": - monitor.print_status(detailed=args.detailed) + monitor = PipelineMonitor() - elif args.command == "clean": - checkpoint_info = monitor.count_checkpoint_files() - print(f"Current checkpoint storage: {checkpoint_info['total_size_mb']:.1f} MB") + if args.command == "status": + monitor.print_status(detailed=args.detailed) - if not args.force: - response = input("Are you sure you want to clean checkpoints? (y/N): ") - if response.lower() != "y": - print("Cancelled") - return 0 + elif args.command == "clean": + checkpoint_info = monitor.count_checkpoint_files() + print(f"Current checkpoint storage: {checkpoint_info['total_size_mb']:.1f} MB") - if args.step: - monitor.clean_step_checkpoints(args.step, args.keep_last) - else: - for step in monitor.steps: - monitor.clean_step_checkpoints(step, args.keep_last) + if not args.force: + response = input("Are you sure you want to clean checkpoints? (y/N): ") + if response.lower() != "y": + print("Cancelled") + return 0 - print("Checkpoint cleaning completed") + if args.step: + monitor.clean_step_checkpoints(args.step, args.keep_last) + else: + for step in monitor.steps: + monitor.clean_step_checkpoints(step, args.keep_last) - elif args.command == "reset": - if not args.force: - response = input( - f"Are you sure you want to reset {args.step}? This will delete all checkpoints. (y/N): " + print("Checkpoint cleaning completed") + + elif args.command == "reset": + if not args.force: + response = input( + f"Are you sure you want to reset {args.step}? This will delete all checkpoints. 
(y/N): " + ) + if response.lower() != "y": + print("Cancelled") + return 0 + + monitor.reset_step(args.step) + print(f"Reset completed for {args.step}") + + elif args.command == "analyze": + # Use configured data directory + data_dir = config.paths.data_dir + filepath = data_dir / args.file + + if not filepath.exists(): + print(f"File not found: {filepath}") + return 1 + + analyzer = DatasetAnalyzer(str(filepath)) + + if not analyzer.load_data(): + return 1 + + completion_stats = analyzer.analyze_completion() + + print(f"\n=== Dataset Analysis: {args.file} ===") + print(f"Total rows: {completion_stats['total_rows']:,}") + print(f"Annotated: {completion_stats['annotated_rows']:,} ({completion_stats['annotation_percentage']:.1f}%)") + print(f"Unannotated: {completion_stats['unannotated_rows']:,}") + print( + f"Complete names: {completion_stats['complete_names']:,} ({completion_stats['completeness_percentage']:.1f}%)" ) - if response.lower() != "y": - print("Cancelled") - return 0 - monitor.reset_step(args.step) - print(f"Reset completed for {args.step}") + elif args.command == "info": + checkpoint_info = monitor.count_checkpoint_files() - elif args.command == "analyze": - # Use configured data directory instead of hardcoded DATA_DIR - data_dir = ConfigManager().default_paths.data_dir - filepath = data_dir / args.file - - if not filepath.exists(): - print(f"File not found: {filepath}") - return 1 - - analyzer = DatasetAnalyzer(str(filepath)) - - if not analyzer.load_data(): - return 1 - - completion_stats = analyzer.analyze_completion() - - print(f"\n=== Dataset Analysis: {args.file} ===") - print(f"Total rows: {completion_stats['total_rows']:,}") - print(f"Annotated: {completion_stats['annotated_rows']:,} ({completion_stats['annotation_percentage']:.1f}%)") - print(f"Unannotated: {completion_stats['unannotated_rows']:,}") - print( - f"Complete names: {completion_stats['complete_names']:,} ({completion_stats['completeness_percentage']:.1f}%)" - ) - - elif args.command 
== "info": - checkpoint_info = monitor.count_checkpoint_files() - - print(f"\n=== Checkpoint Information ===") - print(f"Total storage: {checkpoint_info['total_size_mb']:.1f} MB") - print() - - for step in monitor.steps: - step_info = checkpoint_info[step] - print(f"{step.replace('_', ' ').title()}:") - print(f" Files: {step_info['files']}") - print(f" Size: {step_info['size_mb']:.1f} MB") + print(f"\n=== Checkpoint Information ===") + print(f"Total storage: {checkpoint_info['total_size_mb']:.1f} MB") print() - return 0 + for step in monitor.steps: + step_info = checkpoint_info[step] + print(f"{step.replace('_', ' ').title()}:") + print(f" Files: {step_info['files']}") + print(f" Size: {step_info['size_mb']:.1f} MB") + print() + + return 0 + + except Exception as e: + print(f"Monitor command failed: {e}") + return 1 if __name__ == "__main__": diff --git a/train.py b/train.py index cb4407e..7a451db 100755 --- a/train.py +++ b/train.py @@ -1,26 +1,43 @@ #!.venv/bin/python3 import argparse +import sys -from core.config import setup_logging, get_config +from core.config import setup_config_and_logging from research.model_trainer import ModelTrainer def main(): - setup_logging(get_config()) parser = argparse.ArgumentParser(description="Train DRC Names Models") + parser.add_argument("--config", type=str, help="Path to configuration file") + parser.add_argument( + "--env", type=str, default="development", + help="Environment name (default: development)" + ) parser.add_argument("--type", type=str, help="Specific model type to train") parser.add_argument("--name", type=str, help="Model name") args = parser.parse_args() - trainer = ModelTrainer() - # Train specific model - trainer.train_single_model( - model_name=args.name, - model_type=args.type, - features=["full_name"] - ) + try: + # Load configuration and setup logging + config = setup_config_and_logging(config_path=args.config, env=args.env) + + trainer = ModelTrainer() + + # Train specific model + 
trainer.train_single_model( + model_name=args.name, + model_type=args.type, + features=["full_name"] + ) + + return 0 + + except Exception as e: + print(f"Training failed: {e}") + return 1 if __name__ == "__main__": - main() + exit_code = main() + sys.exit(exit_code)