Files
drc-ners-nlp/interface/log_reader.py
T

186 lines
5.8 KiB
Python

import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
@dataclass
class LogEntry:
"""Represents a single log entry."""
timestamp: datetime
logger: str
level: str
message: str
raw_line: str
class LogReader:
"""Utility class for reading and parsing log files."""
def __init__(self, log_file_path: Path):
"""Initialize the log reader with a log file path."""
self.log_file_path = Path(log_file_path)
# Pattern to match Python logging format: timestamp - logger - level - message
self.log_pattern = re.compile(
r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - (.+?) - (\w+) - (.+)'
)
def read_last_entries(self, count: int = 10) -> List[LogEntry]:
"""Read the last N entries from the log file."""
if not self.log_file_path.exists():
return []
try:
with open(self.log_file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
# Parse log entries from the end
entries = []
for line in reversed(lines[-count * 2:]): # Read more lines in case some don't match
entry = self._parse_log_line(line.strip())
if entry:
entries.append(entry)
if len(entries) >= count:
break
# Return entries in chronological order (oldest first of the last N)
return list(reversed(entries))
except Exception as e:
print(f"Error reading log file: {e}")
return []
def read_entries_by_level(self, level: str, count: int = 50) -> List[LogEntry]:
"""Read entries filtered by log level."""
if not self.log_file_path.exists():
return []
try:
with open(self.log_file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
entries = []
for line in reversed(lines):
entry = self._parse_log_line(line.strip())
if entry and entry.level.upper() == level.upper():
entries.append(entry)
if len(entries) >= count:
break
return list(reversed(entries))
except Exception as e:
print(f"Error reading log file: {e}")
return []
def read_entries_since(self, since: datetime, count: int = 100) -> List[LogEntry]:
"""Read entries since a specific datetime."""
if not self.log_file_path.exists():
return []
try:
with open(self.log_file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
entries = []
for line in reversed(lines):
entry = self._parse_log_line(line.strip())
if entry:
if entry.timestamp >= since:
entries.append(entry)
else:
# Stop reading if we've gone past the since time
break
if len(entries) >= count:
break
return list(reversed(entries))
except Exception as e:
print(f"Error reading log file: {e}")
return []
def get_log_stats(self) -> Dict[str, int]:
"""Get statistics about the log file."""
if not self.log_file_path.exists():
return {}
try:
with open(self.log_file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
stats = {
'total_lines': len(lines),
'INFO': 0,
'WARNING': 0,
'ERROR': 0,
'DEBUG': 0,
'CRITICAL': 0
}
for line in lines:
entry = self._parse_log_line(line.strip())
if entry:
level = entry.level.upper()
if level in stats:
stats[level] += 1
return stats
except Exception as e:
print(f"Error reading log file: {e}")
return {}
def _parse_log_line(self, line: str) -> Optional[LogEntry]:
"""Parse a single log line into a LogEntry object."""
if not line:
return None
match = self.log_pattern.match(line)
if not match:
return None
try:
timestamp_str, logger, level, message = match.groups()
timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S,%f')
return LogEntry(
timestamp=timestamp,
logger=logger,
level=level,
message=message,
raw_line=line
)
except ValueError:
return None
class MultiLogReader:
"""Reader for multiple log files."""
def __init__(self, log_directory: Path):
"""Initialize with a directory containing log files."""
self.log_directory = Path(log_directory)
def get_available_log_files(self) -> List[Path]:
"""Get list of available log files."""
if not self.log_directory.exists():
return []
return list(self.log_directory.glob('*.log'))
def read_from_all_files(self, count: int = 10) -> List[LogEntry]:
"""Read entries from all log files and merge them chronologically."""
all_entries = []
for log_file in self.get_available_log_files():
reader = LogReader(log_file)
entries = reader.read_last_entries(count)
all_entries.extend(entries)
# Sort by timestamp
all_entries.sort(key=lambda x: x.timestamp, reverse=True)
return all_entries[:count]