add age and gender detector

samiabat 2025-07-28 18:11:45 +03:00
parent 972c98bce5
commit fdfab8972c
5 changed files with 1072 additions and 0 deletions

api.py (+150 lines)

@@ -30,6 +30,24 @@ import config as global_config
import logging
import subprocess
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import cv2
import numpy as np
import asyncio
import json
import time
import logging
from datetime import datetime
from typing import Dict
from face_detector import AgeGenderDetector
from utils import decode_base64_image
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
import nltk
nltk.download('averaged_perceptron_tagger_eng')
@@ -871,6 +889,45 @@ change_gpt_sovits_weights(gpt_path = gpt_path, sovits_path = sovits_path)
# 接口部分
# --------------------------------
app = FastAPI()
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global detector instance
detector = AgeGenderDetector()
# WebSocket connection manager
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, session_id: str):
await websocket.accept()
self.active_connections[session_id] = websocket
logger.info(f"🔌 Connected: {session_id}")
def disconnect(self, session_id: str):
self.active_connections.pop(session_id, None)
logger.info(f"🔌 Disconnected: {session_id}")
async def send_message(self, session_id: str, message: dict):
websocket = self.active_connections.get(session_id)
if websocket:
try:
await websocket.send_text(json.dumps(message))
except:
self.disconnect(session_id)
manager = ConnectionManager()
@app.post("/")
async def tts_endpoint(request: Request):
@@ -1101,6 +1158,99 @@ async def tts_endpoint(
print(f"the base path is {refer_wav_path}")
return handle(refer_wav_path, prompt_text, prompt_language, text, text_language, cut_punc, top_k, top_p, temperature, speed, inp_refs, sample_steps, if_sr)
@app.post("/analyze_image")
async def analyze_image(file: UploadFile = File(...)):
"""Analyze uploaded image"""
try:
# Read image
image_data = await file.read()
# Convert to OpenCV format
nparr = np.frombuffer(image_data, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
raise HTTPException(status_code=400, detail="Invalid image format")
# Process image
start_time = time.time()
results = detector.process_image(image)
processing_time = time.time() - start_time
# Cleanup periodically
if len(detector.face_results) > 50:
detector.cleanup_old_results()
return {
"success": True,
"processing_time": round(processing_time, 2),
"people": results,
"total_people": len(results),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Image analysis error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
"""WebSocket endpoint for real-time processing"""
await manager.connect(websocket, session_id)
loop = asyncio.get_running_loop()
def result_callback(person_id: str, result: dict):
"""Callback for when analysis is complete (runs on the analysis worker thread)"""
# The worker thread has no running event loop, so asyncio.create_task would fail there;
# schedule the send onto the endpoint's event loop instead.
asyncio.run_coroutine_threadsafe(manager.send_message(session_id, {
"type": "analysis_complete",
"person_id": person_id,
"result": result,
"timestamp": datetime.now().isoformat()
}), loop)
try:
while True:
# Receive data
data = await websocket.receive_text()
message = json.loads(data)
if message.get("type") == "frame":
# Process frame
base64_image = message.get("image")
if base64_image:
image = decode_base64_image(base64_image)
if image is not None:
results = detector.process_image(image, callback=result_callback)
# Send immediate response
await manager.send_message(session_id, {
"type": "frame_processed",
"people": results,
"total_people": len(results),
"timestamp": datetime.now().isoformat()
})
elif message.get("type") == "ping":
await manager.send_message(session_id, {
"type": "pong",
"timestamp": datetime.now().isoformat()
})
except WebSocketDisconnect:
manager.disconnect(session_id)
except Exception as e:
logger.error(f"WebSocket error: {e}")
manager.disconnect(session_id)
@app.get("/stats")
async def get_stats():
"""Get system statistics"""
return {
"active_connections": len(manager.active_connections),
"known_persons": len(detector.face_encodings),
"cached_results": len(detector.face_results),
"analysis_queue_size": detector.analysis_queue.qsize(),
"system_time": datetime.now().isoformat()
}
if __name__ == "__main__":
logging.info("the server is running")

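A minimal client sketch for the new /analyze_image route, assuming the API is served at http://localhost:8000 (host and port are not shown in this diff) and that the requests package is available (it is not among the pins added below); the multipart field name "file" and the response keys match the handler above.

    import requests

    with open("photo.jpg", "rb") as f:                      # hypothetical test image
        resp = requests.post(
            "http://localhost:8000/analyze_image",          # assumed host/port
            files={"file": ("photo.jpg", f, "image/jpeg")},
        )
    resp.raise_for_status()
    data = resp.json()
    print(data["total_people"], "people detected in", data["processing_time"], "s")

The WebSocket route follows the frame/ping message protocol shown above; a sketch using the websockets package pinned in the requirements below, under the same host/port assumption.

    import asyncio, base64, json
    import websockets

    async def send_frame(path="photo.jpg", session_id="demo"):
        with open(path, "rb") as f:
            payload = base64.b64encode(f.read()).decode()
        async with websockets.connect(f"ws://localhost:8000/ws/{session_id}") as ws:  # assumed host/port
            await ws.send(json.dumps({"type": "frame", "image": payload}))
            # "frame_processed" arrives first; "analysis_complete" follows once the worker finishes
            print(json.loads(await ws.recv()))

    asyncio.run(send_frame())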
detectors.py (new file, +528 lines)

@@ -0,0 +1,528 @@
"""
detectors.py - Face Detection Algorithms and Validators
Contains different face detection methods and quality validation
"""
import cv2
import numpy as np
import logging
from typing import List, Tuple
logger = logging.getLogger(__name__)
class EnhancedFaceDetector:
"""Enhanced face detector using multiple detection methods"""
def __init__(self):
self.detectors = {}
self.load_detectors()
def load_detectors(self):
"""Load multiple face detection algorithms"""
# 1. MediaPipe Face Detection (primary)
try:
import mediapipe as mp
self.mp_face_detection = mp.solutions.face_detection
self.mp_face_detector = self.mp_face_detection.FaceDetection(
model_selection=1,
min_detection_confidence=0.5 # Relaxed threshold
)
self.detectors['mediapipe'] = True
logger.info("✅ MediaPipe Face Detector loaded")
except ImportError:
logger.warning("⚠️ MediaPipe not available - install with: pip install mediapipe")
self.detectors['mediapipe'] = False
# 2. Haar Cascade (fallback)
try:
self.face_cascade = cv2.CascadeClassifier(
cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
)
self.detectors['haar'] = True
logger.info("✅ Haar cascade loaded as fallback")
except Exception as e:
logger.error(f"❌ Haar cascade loading error: {e}")
self.detectors['haar'] = False
def detect_faces_mediapipe(self, image):
"""Detect faces using MediaPipe (most accurate)"""
if not self.detectors.get('mediapipe', False):
return []
try:
# Convert BGR to RGB
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
results = self.mp_face_detector.process(rgb_image)
faces = []
if results.detections:
h, w = image.shape[:2]
for detection in results.detections:
bbox = detection.location_data.relative_bounding_box
confidence = detection.score[0]
# Convert relative coordinates to absolute
x = max(0, int(bbox.xmin * w))
y = max(0, int(bbox.ymin * h))
width = int(bbox.width * w)
height = int(bbox.height * h)
# Ensure face is within image bounds
x = min(x, w - 1)
y = min(y, h - 1)
width = min(width, w - x)
height = min(height, h - y)
if width > 30 and height > 30: # Minimum size
faces.append((x, y, width, height, confidence))
return faces
except Exception as e:
logger.error(f"MediaPipe detection error: {e}")
return []
def detect_faces_haar(self, image):
"""Detect faces using Haar cascades (fallback)"""
if not self.detectors.get('haar', False):
return []
try:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
faces = self.face_cascade.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=3, # Relaxed from 5
minSize=(30, 30), # Relaxed from (80, 80)
maxSize=(500, 500)
)
# Add estimated confidence
faces_with_conf = []
for (x, y, w, h) in faces:
# Estimate confidence based on face size
confidence = min(0.9, max(0.5, (w * h) / (100 * 100)))
faces_with_conf.append((x, y, w, h, confidence))
return faces_with_conf
except Exception as e:
logger.error(f"Haar detection error: {e}")
return []
def detect_faces_dnn(self, image):
"""Detect faces using OpenCV DNN (optional third method)"""
try:
# This requires pre-trained DNN model files
# For now, we'll skip this implementation
# You can add DNN detection here if you have the model files
return []
except Exception as e:
logger.error(f"DNN detection error: {e}")
return []
def detect_faces(self, image):
"""Main face detection using best available method"""
all_faces = []
# Try MediaPipe first (most accurate)
faces = self.detect_faces_mediapipe(image)
if faces:
all_faces.extend([(x, y, w, h, conf, 'mediapipe') for x, y, w, h, conf in faces])
# If no MediaPipe faces or low confidence, try Haar
if not all_faces or max([f[4] for f in all_faces]) < 0.7:
faces = self.detect_faces_haar(image)
all_faces.extend([(x, y, w, h, conf, 'haar') for x, y, w, h, conf in faces])
# Filter and remove duplicates
filtered_faces = self.filter_and_deduplicate_faces(all_faces)
return filtered_faces
def filter_and_deduplicate_faces(self, faces):
"""Remove duplicate faces and apply quality filters"""
if not faces:
return []
# Sort by confidence (highest first)
faces.sort(key=lambda x: x[4], reverse=True)
filtered = []
for face in faces:
x, y, w, h, conf, method = face
# Apply quality checks
if not self.is_valid_face_detection(x, y, w, h, conf):
continue
# Check for overlap with existing faces
is_duplicate = False
for existing in filtered:
if self.faces_overlap(face, existing):
is_duplicate = True
break
if not is_duplicate:
filtered.append(face)
return filtered
def is_valid_face_detection(self, x, y, w, h, confidence):
"""Validate face detection quality"""
# Minimum confidence threshold (relaxed)
if confidence < 0.3:
return False
# Minimum size check (relaxed)
if w < 30 or h < 30:
return False
# Aspect ratio check (more permissive)
aspect_ratio = w / h
if aspect_ratio < 0.4 or aspect_ratio > 2.5:
return False
# Coordinates should be positive
if x < 0 or y < 0:
return False
return True
def faces_overlap(self, face1, face2, threshold=0.5):
"""Check if two face detections overlap significantly"""
x1, y1, w1, h1 = face1[:4]
x2, y2, w2, h2 = face2[:4]
# Calculate intersection area
xi1 = max(x1, x2)
yi1 = max(y1, y2)
xi2 = min(x1 + w1, x2 + w2)
yi2 = min(y1 + h1, y2 + h2)
if xi2 <= xi1 or yi2 <= yi1:
return False
intersection = (xi2 - xi1) * (yi2 - yi1)
# Calculate union area
area1 = w1 * h1
area2 = w2 * h2
union = area1 + area2 - intersection
# Calculate IoU (Intersection over Union)
iou = intersection / union if union > 0 else 0
return iou > threshold
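# Worked example (illustrative, not part of the original code): two 100x100 boxes offset
# by 50 px horizontally overlap in a 50x100 strip, so intersection = 5000 and
# union = 10000 + 10000 - 5000 = 15000, giving IoU ≈ 0.33; that is below the default
# 0.5 threshold, so the two detections are kept as separate faces.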
class FaceQualityValidator:
"""Validate face quality before analysis"""
@staticmethod
def is_face_clear(face_img, blur_threshold=30):
"""Check if face is clear enough (not too blurry)"""
try:
gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
return laplacian_var > blur_threshold
except:
return True # If check fails, assume it's OK
@staticmethod
def has_sufficient_size(face_img, min_size=30):
"""Check if face has sufficient resolution"""
h, w = face_img.shape[:2]
return min(h, w) >= min_size
@staticmethod
def is_properly_aligned(face_img):
"""Basic check for face alignment (relaxed)"""
try:
h, w = face_img.shape[:2]
# More permissive aspect ratio
aspect_ratio = w / h
return 0.3 <= aspect_ratio <= 3.0
except:
return True # If check fails, assume it's OK
@staticmethod
def has_good_contrast(face_img, min_std=20):
"""Check if face has sufficient contrast"""
try:
gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
std_dev = np.std(gray)
return std_dev > min_std
except:
return True # If check fails, assume it's OK
@staticmethod
def is_well_lit(face_img, min_brightness=30, max_brightness=220):
"""Check if face is well lit (not too dark or overexposed)"""
try:
gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
mean_brightness = np.mean(gray)
return min_brightness < mean_brightness < max_brightness
except:
return True # If check fails, assume it's OK
@staticmethod
def validate_face(face_img):
"""Complete face validation with relaxed criteria"""
if face_img is None or face_img.size == 0:
return False, "Empty face image"
if not FaceQualityValidator.has_sufficient_size(face_img):
return False, "Face too small"
# Skip strict checks for now - they were too restrictive
# if not FaceQualityValidator.is_face_clear(face_img):
# return False, "Face too blurry"
if not FaceQualityValidator.is_properly_aligned(face_img):
return False, "Face poorly aligned"
# Optional additional checks (commented out for relaxed validation)
# if not FaceQualityValidator.has_good_contrast(face_img):
# return False, "Face has poor contrast"
# if not FaceQualityValidator.is_well_lit(face_img):
# return False, "Face is poorly lit"
return True, "Face valid"
@staticmethod
def get_face_quality_score(face_img):
"""Get overall quality score for face (0-100)"""
if face_img is None or face_img.size == 0:
return 0
score = 0
# Size score (0-25 points)
h, w = face_img.shape[:2]
min_dim = min(h, w)
if min_dim >= 100:
score += 25
elif min_dim >= 60:
score += 20
elif min_dim >= 30:
score += 15
else:
score += 5
# Clarity score (0-25 points)
try:
gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
if laplacian_var > 100:
score += 25
elif laplacian_var > 50:
score += 20
elif laplacian_var > 20:
score += 15
else:
score += 10
except:
score += 15 # Default moderate score
# Alignment score (0-25 points)
try:
aspect_ratio = w / h
if 0.8 <= aspect_ratio <= 1.2:
score += 25 # Perfect square-ish
elif 0.6 <= aspect_ratio <= 1.6:
score += 20 # Good
elif 0.4 <= aspect_ratio <= 2.0:
score += 15 # Acceptable
else:
score += 5 # Poor
except:
score += 15 # Default moderate score
# Contrast score (0-25 points)
try:
gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
std_dev = np.std(gray)
if std_dev > 60:
score += 25
elif std_dev > 40:
score += 20
elif std_dev > 20:
score += 15
else:
score += 5
except:
score += 15 # Default moderate score
return min(100, score)
@staticmethod
def enhance_face_for_analysis(face_img):
"""Apply basic enhancement to improve face for analysis"""
try:
# Convert to LAB color space
lab = cv2.cvtColor(face_img, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
# Apply CLAHE to L channel (brightness)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(4, 4))
l = clahe.apply(l)
# Merge back
enhanced = cv2.merge([l, a, b])
enhanced = cv2.cvtColor(enhanced, cv2.COLOR_LAB2BGR)
# Apply slight Gaussian blur to reduce noise
enhanced = cv2.GaussianBlur(enhanced, (3, 3), 0)
return enhanced
except Exception as e:
logger.error(f"Face enhancement error: {e}")
return face_img
class FaceTracker:
"""Track faces across frames for better stability"""
def __init__(self, max_distance=50, max_age=30):
self.tracks = {}
self.next_id = 1
self.max_distance = max_distance
self.max_age = max_age
def update(self, detections):
"""Update tracks with new detections"""
# Calculate distances between existing tracks and new detections
matched_tracks = {}
unmatched_detections = list(detections)
for track_id, track in self.tracks.items():
if track['age'] > self.max_age:
continue # Skip old tracks
best_match = None
best_distance = float('inf')
for i, detection in enumerate(unmatched_detections):
x, y, w, h = detection[:4]
center_x, center_y = x + w//2, y + h//2
track_x, track_y = track['center']
distance = np.sqrt((center_x - track_x)**2 + (center_y - track_y)**2)
if distance < self.max_distance and distance < best_distance:
best_distance = distance
best_match = i
if best_match is not None:
detection = unmatched_detections[best_match]
x, y, w, h = detection[:4]
# Update track
self.tracks[track_id].update({
'center': (x + w//2, y + h//2),
'bbox': (x, y, w, h),
'age': 0,
'confidence': detection[4] if len(detection) > 4 else 0.5
})
matched_tracks[track_id] = detection
unmatched_detections.pop(best_match)
# Age existing tracks
for track_id in list(self.tracks.keys()):
if track_id not in matched_tracks:
self.tracks[track_id]['age'] += 1
if self.tracks[track_id]['age'] > self.max_age:
del self.tracks[track_id]
# Create new tracks for unmatched detections
for detection in unmatched_detections:
x, y, w, h = detection[:4]
self.tracks[self.next_id] = {
'center': (x + w//2, y + h//2),
'bbox': (x, y, w, h),
'age': 0,
'confidence': detection[4] if len(detection) > 4 else 0.5,
'created_frame': self.next_id
}
matched_tracks[self.next_id] = detection
self.next_id += 1
return matched_tracks
def get_stable_faces(self, min_track_length=3):
"""Get faces that have been tracked for a minimum number of frames"""
stable_tracks = {}
for track_id, track in self.tracks.items():
if track['age'] < min_track_length:
stable_tracks[track_id] = track
return stable_tracks
class MultiScaleDetector:
"""Detect faces at multiple scales for better accuracy"""
def __init__(self, base_detector):
self.base_detector = base_detector
self.scales = [1.0, 0.8, 1.2] # Different scales to try
def detect_faces(self, image):
"""Detect faces at multiple scales"""
all_detections = []
h, w = image.shape[:2]
for scale in self.scales:
if scale != 1.0:
# Resize image
new_w = int(w * scale)
new_h = int(h * scale)
resized = cv2.resize(image, (new_w, new_h))
else:
resized = image
# Detect faces
faces = self.base_detector.detect_faces(resized)
# Scale coordinates back to original size
for face in faces:
x, y, w_f, h_f, conf, method = face
if scale != 1.0:
x = int(x / scale)
y = int(y / scale)
w_f = int(w_f / scale)
h_f = int(h_f / scale)
all_detections.append((x, y, w_f, h_f, conf, f"{method}_scale_{scale}"))
# Remove duplicates and return best detections
return self.base_detector.filter_and_deduplicate_faces(all_detections)
# Factory function to create detector with best available methods
def create_face_detector(use_tracking=False, use_multiscale=False):
"""
Factory function to create the best available face detector
Args:
use_tracking: Enable face tracking across frames
use_multiscale: Enable multi-scale detection
Returns:
Configured face detector
"""
base_detector = EnhancedFaceDetector()
if use_multiscale:
detector = MultiScaleDetector(base_detector)
else:
detector = base_detector
if use_tracking:
# Note: Tracking would need to be integrated into the main detection loop
logger.info("Face tracking enabled")
return detector

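A usage sketch for the factory above, assuming detectors.py is importable from the working directory and a BGR test image exists on disk; detect_faces returns the (x, y, w, h, confidence, method) tuples built in filter_and_deduplicate_faces.

    import cv2
    from detectors import create_face_detector

    detector = create_face_detector(use_multiscale=True)  # wraps EnhancedFaceDetector in MultiScaleDetector
    frame = cv2.imread("frame.jpg")                        # hypothetical BGR test image
    for x, y, w, h, conf, method in detector.detect_faces(frame):
        print(f"{method}: {conf:.2f} at ({x}, {y}, {w}, {h})")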
face_detector.py (new file, +344 lines)

@@ -0,0 +1,344 @@
"""
face_detector.py - Core Face Detection and Analysis
Exact same logic as your working code, just modularized
"""
import cv2
import numpy as np
from PIL import Image
import torch
import time
import threading
from queue import Queue
import logging
from typing import Dict, List, Optional
logger = logging.getLogger(__name__)
class AgeGenderDetector:
"""Enhanced Age & Gender Detection System - EXACT SAME LOGIC AS YOUR WORKING CODE"""
def __init__(self):
self.face_results = {}
self.face_encodings = {}
self.person_counter = 0
self.analysis_queue = Queue()
self.running = True
# Load models
self.load_models()
# Start analysis worker
self.analysis_thread = threading.Thread(target=self.analysis_worker, daemon=True)
self.analysis_thread.start()
logger.info("✅ AgeGenderDetector initialized")
def load_models(self):
"""Load AI models - EXACT SAME AS YOUR WORKING CODE"""
try:
# Load DeepFace
from deepface import DeepFace
self.deepface = DeepFace
logger.info("✅ DeepFace loaded")
except ImportError:
logger.error("❌ DeepFace not available")
self.deepface = None
try:
# Load HuggingFace age model
from transformers import AutoImageProcessor, SiglipForImageClassification
model_name = "prithivMLmods/facial-age-detection"
self.age_model = SiglipForImageClassification.from_pretrained(model_name)
self.age_processor = AutoImageProcessor.from_pretrained(model_name)
logger.info("✅ HuggingFace age model loaded")
except Exception as e:
logger.error(f"❌ HuggingFace model error: {e}")
self.age_model = None
self.age_processor = None
# Age labels
self.id2label = {
"0": "01-10", "1": "11-20", "2": "21-30", "3": "31-40",
"4": "41-55", "5": "56-65", "6": "66-80", "7": "80+"
}
# Face detector
self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
def analysis_worker(self):
"""Background analysis worker - EXACT SAME AS YOUR WORKING CODE"""
while self.running:
try:
if not self.analysis_queue.empty():
task = self.analysis_queue.get(timeout=0.1)
if task is None:
break
person_id = task['id']
face_img = task['image']
callback = task.get('callback')
# Analyze
age, age_conf = self.analyze_age(face_img)
gender, gender_conf = self.analyze_gender(face_img)
# Store results
current_time = time.time()
if person_id in self.face_results:
first_seen = self.face_results[person_id].get('first_seen', current_time)
else:
first_seen = current_time
result = {
'age': age,
'age_conf': age_conf,
'gender': gender,
'gender_conf': gender_conf,
'timestamp': current_time,
'first_seen': first_seen
}
self.face_results[person_id] = result
# Call callback if provided
if callback:
callback(person_id, result)
else:
time.sleep(0.01)
except Exception as e:
logger.error(f"Analysis worker error: {e}")
time.sleep(0.1)
def analyze_age(self, face_img):
"""Analyze age using HuggingFace - EXACT SAME AS YOUR WORKING CODE"""
if self.age_model is None or face_img.size == 0:
return "Unknown", 0.0
try:
# Convert to PIL
if len(face_img.shape) == 3:
face_pil = Image.fromarray(cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB))
else:
face_pil = Image.fromarray(face_img).convert("RGB")
# Process
inputs = self.age_processor(images=face_pil, return_tensors="pt")
with torch.no_grad():
outputs = self.age_model(**inputs)
logits = outputs.logits
probs = torch.nn.functional.softmax(logits, dim=1).squeeze().tolist()
# Get prediction
max_idx = probs.index(max(probs))
age_range = self.id2label[str(max_idx)]
confidence = probs[max_idx] * 100
return age_range, confidence
except Exception as e:
logger.error(f"Age analysis error: {e}")
return "Unknown", 0.0
def analyze_gender(self, face_img):
"""Analyze gender using DeepFace - EXACT SAME AS YOUR WORKING CODE"""
if self.deepface is None or face_img.size == 0:
return "Unknown", 0.0
try:
result = self.deepface.analyze(
face_img,
actions=['gender'],
enforce_detection=False,
silent=True
)
if isinstance(result, list):
analysis = result[0]
else:
analysis = result
gender = analysis.get('dominant_gender', 'Unknown')
gender_probs = analysis.get('gender', {})
confidence = max(gender_probs.values()) if gender_probs else 0.0
# Simplify gender
if gender in ['Man', 'Male']:
gender = 'Male'
elif gender in ['Woman', 'Female']:
gender = 'Female'
return gender, confidence
except Exception as e:
logger.error(f"Gender analysis error: {e}")
return "Unknown", 0.0
def get_face_encoding(self, face_img):
"""Get face encoding for recognition - EXACT SAME AS YOUR WORKING CODE"""
if self.deepface is None or face_img.size == 0:
return None
try:
# Preprocess
face_resized = cv2.resize(face_img, (160, 160))
# Get embedding
embedding = self.deepface.represent(
face_resized,
model_name='Facenet',
enforce_detection=False,
detector_backend='opencv'
)
if isinstance(embedding, list) and len(embedding) > 0:
return np.array(embedding[0]['embedding'])
elif isinstance(embedding, dict):
return np.array(embedding['embedding'])
return None
except Exception as e:
# Fallback encoding
try:
face_resized = cv2.resize(face_img, (64, 64))
face_gray = cv2.cvtColor(face_resized, cv2.COLOR_BGR2GRAY)
hist = cv2.calcHist([face_gray], [0], None, [32], [0, 256])
return hist.flatten()
except:
return None
def find_matching_person(self, face_img, threshold=0.4):
"""Find matching person - EXACT SAME AS YOUR WORKING CODE"""
current_encoding = self.get_face_encoding(face_img)
if current_encoding is None:
return None, 0
best_match = None
best_similarity = 0
for person_id, stored_encoding in self.face_encodings.items():
try:
# Cosine similarity
similarity = np.dot(current_encoding, stored_encoding) / (
np.linalg.norm(current_encoding) * np.linalg.norm(stored_encoding)
)
if similarity > threshold and similarity > best_similarity:
best_similarity = similarity
best_match = person_id
except:
continue
# Parenthesize the tuple: without the parentheses a miss would return (None, (None, 0))
return (best_match, best_similarity) if best_match else (None, 0)
def register_new_person(self, face_img):
"""Register new person - EXACT SAME AS YOUR WORKING CODE"""
encoding = self.get_face_encoding(face_img)
if encoding is None:
return None
self.person_counter += 1
person_id = f"person_{self.person_counter}"
self.face_encodings[person_id] = encoding
logger.info(f"👤 NEW PERSON: {person_id}")
return person_id
def identify_person(self, face_img):
"""Identify person (new or existing) - EXACT SAME AS YOUR WORKING CODE"""
match_result = self.find_matching_person(face_img)
if match_result[0]:
person_id, similarity = match_result
logger.info(f"👤 RECOGNIZED: {person_id} ({similarity:.3f})")
return person_id, False
else:
person_id = self.register_new_person(face_img)
return person_id, True
def detect_faces(self, image):
"""Detect faces in image - EXACT SAME AS YOUR WORKING CODE"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
faces = self.face_cascade.detectMultiScale(gray, 1.1, 4, minSize=(60, 60))
return faces
def process_image(self, image, callback=None):
"""Process image and return results - EXACT SAME AS YOUR WORKING CODE"""
faces = self.detect_faces(image)
results = []
for i, (x, y, w, h) in enumerate(faces):
face_img = image[y:y+h, x:x+w]
person_id, is_new = self.identify_person(face_img)
if person_id:
# Get existing result or create placeholder
result = self.face_results.get(person_id, {
'age': 'Analyzing...',
'age_conf': 0,
'gender': 'Analyzing...',
'gender_conf': 0,
'timestamp': time.time(),
'first_seen': time.time()
})
# Add to analysis queue
task = {
'id': person_id,
'image': face_img,
'callback': callback
}
self.analysis_queue.put(task)
# Determine status
current_time = time.time()
first_seen = result.get('first_seen', current_time)
time_known = current_time - first_seen
if time_known < 3:
status = "NEW"
elif time_known < 60:
status = "CURRENT"
else:
status = "RETURNING"
# Convert age to approximate number
age_display = result['age']
if result['age'] in self.id2label.values():
age_map = {
"01-10": "~6 years", "11-20": "~16 years", "21-30": "~25 years",
"31-40": "~35 years", "41-55": "~48 years", "56-65": "~60 years",
"66-80": "~73 years", "80+": "~85 years"
}
age_display = age_map.get(result['age'], result['age'])
results.append({
'person_id': person_id,
'status': status,
'age': age_display,
'age_confidence': result['age_conf'],
'gender': result['gender'],
'gender_confidence': result['gender_conf'],
'face_coordinates': [int(x), int(y), int(w), int(h)],
'is_new': is_new
})
return results
def cleanup_old_results(self):
"""Cleanup old results - EXACT SAME AS YOUR WORKING CODE"""
current_time = time.time()
old_persons = [
pid for pid, result in self.face_results.items()
if current_time - result.get('timestamp', 0) > 300 # 5 minutes
]
for person_id in old_persons:
self.face_results.pop(person_id, None)
self.face_encodings.pop(person_id, None)
logger.info(f"🗑️ REMOVED: {person_id}")
def __del__(self):
"""Cleanup when detector is destroyed"""
self.running = False
if hasattr(self, 'analysis_thread'):
self.analysis_thread.join(timeout=1.0)

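A standalone usage sketch for the class above, assuming the DeepFace and HuggingFace models load successfully and a local test image exists; the field names match the dictionaries built in process_image.

    import cv2
    from face_detector import AgeGenderDetector

    detector = AgeGenderDetector()
    image = cv2.imread("group_photo.jpg")        # hypothetical BGR test image
    people = detector.process_image(image)
    for p in people:
        print(p["person_id"], p["status"], p["age"], p["gender"], p["face_coordinates"])
    # Results are filled asynchronously: a first call may report "Analyzing..." until the
    # background worker stores age/gender into face_results and a later call returns them.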
requirements file (+14 lines)

@@ -39,3 +39,17 @@ x-transformers==2.1.37
torchmetrics==1.5.0
attrdict==2.0.1
activations==0.1.0
fastapi==0.104.1
uvicorn[standard]==0.24.0
websockets==12.0
python-multipart==0.0.6
opencv-python==4.8.1.78
pillow==10.1.0
numpy<2.0
torch>=2.1.0
transformers>=4.35.0
deepface>=0.0.79
tensorflow>=2.15.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
aiofiles==23.2.1

utils.py (new file, +36 lines)

@@ -0,0 +1,36 @@
"""
utils.py - Utility Functions
Exact same utility functions from your working code
"""
import cv2
import numpy as np
from PIL import Image
import base64
import io
import logging
logger = logging.getLogger(__name__)
def decode_base64_image(base64_string: str) -> np.ndarray:
"""
Decode base64 image to numpy array - EXACT SAME AS YOUR WORKING CODE
"""
try:
# Remove data URL prefix if present
if ',' in base64_string:
base64_string = base64_string.split(',')[1]
# Decode base64
image_data = base64.b64decode(base64_string)
# Convert to PIL Image
pil_image = Image.open(io.BytesIO(image_data))
# Convert to OpenCV format
opencv_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
return opencv_image
except Exception as e:
logger.error(f"Image decode error: {e}")
return None
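A round-trip sketch for decode_base64_image, assuming it is run from the same directory as utils.py; it encodes a synthetic image with OpenCV, prepends a data-URL prefix, and checks the decoded shape.

    import base64
    import cv2
    import numpy as np
    from utils import decode_base64_image

    img = np.zeros((10, 10, 3), dtype=np.uint8)                                  # synthetic black image
    ok, buf = cv2.imencode(".jpg", img)
    b64 = "data:image/jpeg;base64," + base64.b64encode(buf.tobytes()).decode()  # prefix is stripped by the helper
    decoded = decode_base64_image(b64)
    assert decoded is not None and decoded.shape == img.shape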