feat: implement NER dataset feature engineering with multiple transformation formats

This commit is contained in:
2025-08-12 00:11:46 +02:00
parent d5a4aaaf4a
commit 3977d5c313
9 changed files with 439 additions and 0 deletions
+72
View File
@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""
NER Dataset Feature Engineering Script
Processes the names_featured.csv dataset to create position-independent variations
"""
import argparse
import os
from processing.ner.ner_engineering import NEREngineering
def main():
parser = argparse.ArgumentParser(description='Engineer NER dataset for position-independent learning')
parser.add_argument('--input', default='data/dataset/names_featured.csv', help='Input CSV file path')
parser.add_argument('--output', default='data/dataset/names_featured_engineered.csv', help='Output CSV file path')
parser.add_argument('--seed', type=int, default=42, help='Random seed for reproducibility')
args = parser.parse_args()
print("=== NER Dataset Feature Engineering ===")
print(f"Input file: {args.input}")
print(f"Output file: {args.output}")
print(f"Random seed: {args.seed}")
# Check if input file exists
if not os.path.exists(args.input):
print(f"Error: Input file {args.input} not found!")
return
# Initialize engineering class
engineering = NEREngineering()
try:
# Load data with progress indication
print("\n1. Loading NER-tagged data...")
data = engineering.load_ner_data(args.input)
print(f" Dataset size: {len(data):,} rows")
# Show sample of original data
print("\n2. Sample original data:")
for i, row in data.head(3).iterrows():
print(f" {row['name']} -> Native: '{row['probable_native']}', Surname: '{row['probable_surname']}'")
# Apply transformations
print("\n3. Applying feature engineering transformations...")
engineered_data = engineering.engineer_dataset(data, random_seed=args.seed)
# Save results
print(f"\n4. Saving engineered dataset to {args.output}...")
engineering.save_engineered_dataset(engineered_data, args.output)
# Show statistics
print(f"\n=== RESULTS SUMMARY ===")
print(f"Original dataset: {len(data):,} rows")
print(f"Engineered dataset: {len(engineered_data):,} rows")
print(f"Transformation distribution:")
counts = engineered_data['transformation_type'].value_counts().sort_index()
for trans_type, count in counts.items():
percentage = (count / len(engineered_data)) * 100
print(f" {trans_type}: {count:,} rows ({percentage:.1f}%)")
print(f"\nDataset successfully engineered and saved!")
except Exception as e:
print(f"Error during processing: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
+77
View File
@@ -0,0 +1,77 @@
from abc import ABC, abstractmethod
from typing import List, Tuple, Dict
import pandas as pd
from processing.steps.feature_extraction_step import NameCategory
class BaseNameFormatter(ABC):
"""
Base class for name formatting transformations.
Contains common logic for NER tagging and attribute computation.
"""
def __init__(self, connectors: List[str] = None, additional_surnames: List[str] = None):
self.connectors = connectors or ['wa', 'ya', 'ka', 'ba']
self.additional_surnames = additional_surnames or [
'jean', 'paul', 'marie', 'joseph', 'pierre', 'claude',
'andre', 'michel', 'robert'
]
@classmethod
def parse_native_components(cls, native_str: str) -> List[str]:
"""Parse native name string into individual components"""
if pd.isna(native_str) or not native_str:
return []
return native_str.strip().split()
def create_ner_tags(self, text: str, native_parts: List[str], surname: str) -> List[Tuple[int, int, str]]:
"""Create NER entity tags for transformed text"""
entities = []
current_pos = 0
words = text.split()
for word in words:
start_pos = current_pos
end_pos = current_pos + len(word)
# Determine tag based on word content
if word in native_parts or any(connector in word for connector in self.connectors):
tag = 'NATIVE'
elif word == surname or word in self.additional_surnames:
tag = 'SURNAME'
else:
# Check if it's a compound native word or new surname
if any(part in word for part in native_parts):
tag = 'NATIVE'
else:
tag = 'SURNAME'
entities.append((start_pos, end_pos, tag))
current_pos = end_pos + 1 # +1 for space
return entities
@classmethod
def compute_derived_attributes(cls, name: str) -> Dict:
"""Compute all derived attributes for the transformed name"""
words_count = len(name.split()) if name else 0
length = len(name) if name else 0
return {
'words': words_count,
'length': length,
'identified_category': NameCategory.SIMPLE if words_count == 3 else NameCategory.COMPOSE,
}
@abstractmethod
def transform(self, row: pd.Series) -> Dict:
"""Transform a row according to the specific format rules"""
pass
@property
@abstractmethod
def transformation_type(self) -> str:
"""Return the transformation type identifier"""
pass
@@ -0,0 +1,35 @@
import random
from typing import Dict
import pandas as pd
from processing.ner.formats import BaseNameFormatter
class ConnectorFormatter(BaseNameFormatter):
def transform(self, row: pd.Series) -> Dict:
native_parts = self.parse_native_components(row['probable_native'])
surname = row['probable_surname'] if pd.notna(row['probable_surname']) else ''
connector = random.choice(self.connectors)
if len(native_parts) > 1:
connected_native = f" {connector} ".join(native_parts)
full_name = f"{connected_native} {surname}".strip()
else:
connected_native = f"{row['probable_native']} {connector} {row['probable_native']}".strip()
full_name = f"{connected_native} {surname}".strip()
return {
'name': full_name,
'probable_native': connected_native,
'identify_name': connected_native,
'probable_surname': surname,
'identify_surname': surname,
'ner_entities': str(self.create_ner_tags(full_name, native_parts, surname)),
'transformation_type': self.transformation_type,
**self.compute_derived_attributes(full_name)
}
@property
def transformation_type(self) -> str:
return 'connector_added'
@@ -0,0 +1,32 @@
import random
from typing import Dict
import pandas as pd
from processing.ner.formats import BaseNameFormatter
class ExtendedSurnameFormatter(BaseNameFormatter):
def transform(self, row: pd.Series) -> Dict:
native_parts = self.parse_native_components(row['probable_native'])
original_surname = row['probable_surname'] if pd.notna(row['probable_surname']) else ''
# Add random additional surname
additional_surname = random.choice(self.additional_surnames)
combined_surname = f"{additional_surname} {original_surname}".strip()
full_name = f"{row['probable_native']} {combined_surname}".strip()
return {
'name': full_name,
'probable_native': row['probable_native'],
'identify_name': row['probable_native'],
'probable_surname': combined_surname,
'identity_surname': combined_surname,
'ner_entities': str(self.create_ner_tags(full_name, native_parts, combined_surname)),
'transformation_type': self.transformation_type,
**self.compute_derived_attributes(full_name)
}
@property
def transformation_type(self) -> str:
return 'extended_surname'
@@ -0,0 +1,28 @@
from typing import Dict
import pandas as pd
from processing.ner.formats import BaseNameFormatter
class NativeOnlyFormatter(BaseNameFormatter):
def transform(self, row: pd.Series) -> Dict:
native_parts = self.parse_native_components(row['probable_native'])
# Only native components
full_name = row['probable_native']
return {
'name': full_name,
'probable_native': row['probable_native'],
'identify_name': row['probable_native'],
'probable_surname': '',
'identify_surname': '',
'ner_entities': str(self.create_ner_tags(full_name, native_parts, '')),
'transformation_type': self.transformation_type,
**self.compute_derived_attributes(full_name)
}
@property
def transformation_type(self) -> str:
return 'native_only'
+29
View File
@@ -0,0 +1,29 @@
from typing import Dict
import pandas as pd
from processing.ner.formats import BaseNameFormatter
class OriginalFormatter(BaseNameFormatter):
def transform(self, row: pd.Series) -> Dict:
native_parts = self.parse_native_components(row['probable_native'])
surname = row['probable_surname'] if pd.notna(row['probable_surname']) else ''
# Keep original order: native components + surname
full_name = f"{row['probable_native']} {surname}".strip()
return {
'name': full_name,
'probable_native': row['probable_native'],
'identify_name': row['probable_native'],
'probable_surname': surname,
'identify_surname': surname,
'ner_entities': str(self.create_ner_tags(full_name, native_parts, surname)),
'transformation_type': self.transformation_type,
**self.compute_derived_attributes(full_name)
}
@property
def transformation_type(self) -> str:
return 'original'
@@ -0,0 +1,29 @@
from typing import Dict
import pandas as pd
from processing.ner.formats import BaseNameFormatter
class PositionFlippedFormatter(BaseNameFormatter):
def transform(self, row: pd.Series) -> Dict:
native_parts = self.parse_native_components(row['probable_native'])
surname = row['probable_surname'] if pd.notna(row['probable_surname']) else ''
# Flip order: surname + native components
full_name = f"{surname} {row['probable_native']}".strip()
return {
'name': full_name,
'probable_native': row['probable_native'],
'identify_name': row['probable_native'],
'probable_surname': surname,
'identify_surname': surname,
'ner_entities': str(self.create_ner_tags(full_name, native_parts, surname)),
'transformation_type': self.transformation_type,
**self.compute_derived_attributes(full_name)
}
@property
def transformation_type(self) -> str:
return 'position_flipped'
@@ -0,0 +1,30 @@
from typing import Dict
import pandas as pd
from processing.ner.formats import BaseNameFormatter
class ReducedNativeFormatter(BaseNameFormatter):
def transform(self, row: pd.Series) -> Dict:
native_parts = self.parse_native_components(row['probable_native'])
surname = row['probable_surname'] if pd.notna(row['probable_surname']) else ''
# Keep only first native component + surname
reduced_native = native_parts[0] if len(native_parts) > 1 else row['probable_native']
full_name = f"{reduced_native} {surname}".strip()
return {
'name': full_name,
'probable_native': reduced_native,
'identify_name': reduced_native,
'probable_surname': surname,
'identify_surname': surname,
'ner_entities': str(self.create_ner_tags(full_name, [reduced_native], surname)),
'transformation_type': self.transformation_type,
**self.compute_derived_attributes(full_name)
}
@property
def transformation_type(self) -> str:
return 'reduced_native'
+107
View File
@@ -0,0 +1,107 @@
import random
from typing import List
import numpy as np
import pandas as pd
from processing.ner.formats.connectors_format import ConnectorFormatter
from processing.ner.formats.extended_surname_format import ExtendedSurnameFormatter
from processing.ner.formats.native_only_format import NativeOnlyFormatter
from processing.ner.formats.original_format import OriginalFormatter
from processing.ner.formats.position_flipped_format import PositionFlippedFormatter
from processing.ner.formats.reduced_native_format import ReducedNativeFormatter
class NEREngineering:
"""
Feature engineering for NER dataset to prevent position-based learning
and encourage sequence characteristic learning.
"""
def __init__(self, connectors: List[str] = None, additional_surnames: List[str] = None):
self.connectors = connectors or ['wa', 'ya', 'ka', 'ba', 'la']
self.additional_surnames = additional_surnames or [
'jean', 'paul', 'marie', 'joseph', 'pierre', 'claude',
'andre', 'michel', 'robert'
]
# Initialize format classes
self.formatters = {
'original': OriginalFormatter(self.connectors, self.additional_surnames),
'native_only': NativeOnlyFormatter(self.connectors, self.additional_surnames),
'position_flipped': PositionFlippedFormatter(self.connectors, self.additional_surnames),
'reduced_native': ReducedNativeFormatter(self.connectors, self.additional_surnames),
'connector_added': ConnectorFormatter(self.connectors, self.additional_surnames),
'extended_surname': ExtendedSurnameFormatter(self.connectors, self.additional_surnames)
}
@classmethod
def load_ner_data(cls, filepath: str) -> pd.DataFrame:
"""Load and filter NER-tagged data from CSV file"""
df = pd.read_csv(filepath)
# Filter only NER-tagged rows
ner_data = df[df['ner_tagged'] == 1].copy()
print(f"Loaded {len(ner_data)} NER-tagged records from {len(df)} total records")
return ner_data
def engineer_dataset(self, df: pd.DataFrame, random_seed: int = 42) -> pd.DataFrame:
"""
Apply feature engineering transformations according to the specified rules:
- First 25%: original format
- Second 25%: remove surname
- Third 25%: flip positions
- Fourth 10%: reduce native components
- Fifth 10%: add connectors
- Last 5%: extend surnames
"""
random.seed(random_seed)
np.random.seed(random_seed)
# Shuffle the dataset
df_shuffled = df.sample(frac=1, random_state=random_seed).reset_index(drop=True)
total_rows = len(df_shuffled)
# Calculate split points
split_25_1 = int(total_rows * 0.25)
split_25_2 = int(total_rows * 0.50)
split_25_3 = int(total_rows * 0.75)
split_10_1 = int(total_rows * 0.85)
split_10_2 = int(total_rows * 0.95)
# Define transformation groups
transformation_groups = [
(0, split_25_1, 'original'),
(split_25_1, split_25_2, 'native_only'),
(split_25_2, split_25_3, 'position_flipped'),
(split_25_3, split_10_1, 'reduced_native'),
(split_10_1, split_10_2, 'connector_added'),
(split_10_2, total_rows, 'extended_surname')
]
print("Dataset splits:")
for start, end, trans_type in transformation_groups:
print(f"Group {trans_type}: {start} to {end} ({end - start} rows)")
# Process each group
engineered_rows = []
for start, end, formatter_key in transformation_groups:
formatter = self.formatters[formatter_key]
for idx in range(start, end):
row = df_shuffled.iloc[idx]
transformed = formatter.transform(row)
# Keep original columns and add transformed ones
new_row = row.to_dict()
new_row.update(transformed)
engineered_rows.append(new_row)
return pd.DataFrame(engineered_rows)
@classmethod
def save_engineered_dataset(cls, df: pd.DataFrame, output_path: str):
"""Save the engineered dataset to CSV file"""
df.to_csv(output_path, index=False)
print(f"Engineered dataset saved to {output_path}")