Files
drc-ners-nlp/monitor.py
T

90 lines
3.2 KiB
Python
Executable File

#!.venv/bin/python3
import argparse
import sys
import traceback
from pathlib import Path
from core.config import setup_config
from processing.monitoring.pipeline_monitor import PipelineMonitor
def main():
choices = ["data_cleaning", "feature_extraction", "ner_annotation", "llm_annotation", "data_splitting"]
parser = argparse.ArgumentParser(description="Monitor and manage the DRC names processing pipeline")
parser.add_argument("--config", type=Path, help="Path to configuration file")
parser.add_argument("--env", type=str, default="development", help="Environment")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# Status command
subparsers.add_parser("status", help="Show pipeline status")
# Clean command
clean_parser = subparsers.add_parser("clean", help="Clean checkpoint files")
clean_parser.add_argument("--step", type=str, choices=choices, help="Specific step (default: all)")
clean_parser.add_argument("--keep-last", type=int, default=1, help="Checkpoints to keep (default: 1)")
clean_parser.add_argument("--force", action="store_true", help="Clean without confirmation")
# Reset command
reset_parser = subparsers.add_parser("reset", help="Reset pipeline step")
reset_parser.add_argument("--step", type=str, choices=choices, help="Specific step (default: all)")
reset_parser.add_argument("--all", action="store_true", help="Reset all steps")
reset_parser.add_argument("--force", action="store_true", help="Reset without confirmation")
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
try:
setup_config(config_path=args.config, env=args.env)
monitor = PipelineMonitor()
if args.command == "status":
monitor.print_status(detailed=True)
elif args.command == "clean":
checkpoint_info = monitor.count_checkpoint_files()
print(f"Current checkpoint storage: {checkpoint_info['total_size_mb']:.1f} MB")
if not args.force:
response = input("Are you sure you want to clean checkpoints? (y/N): ")
if response.lower() != "y":
print("Cancelled")
return 0
if args.step:
monitor.clean_step_checkpoints(args.step, args.keep_last)
else:
for step in monitor.steps:
monitor.clean_step_checkpoints(step, args.keep_last)
print("Checkpoint cleaning completed")
elif args.command == "reset":
if not args.force:
response = input(
f"Are you sure you want to reset {args.step}? This will delete all checkpoints. (y/N): "
)
if response.lower() != "y":
print("Cancelled")
return 0
if args.step:
monitor.reset_step(args.step)
else:
for step in monitor.steps:
monitor.reset_step(step)
print(f"Reset completed")
except Exception as e:
print(f"Monitoring failed: {e}")
traceback.print_exc()
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)