diff --git a/api.py b/api.py index 438cd069..5fbecf62 100644 --- a/api.py +++ b/api.py @@ -30,6 +30,24 @@ import config as global_config import logging import subprocess +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, UploadFile, File, HTTPException +from fastapi.middleware.cors import CORSMiddleware +import cv2 +import numpy as np +import asyncio +import json +import time +import logging +from datetime import datetime +from typing import Dict + +from face_detector import AgeGenderDetector +from utils import decode_base64_image + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + import nltk nltk.download('averaged_perceptron_tagger_eng') @@ -871,6 +889,45 @@ change_gpt_sovits_weights(gpt_path = gpt_path, sovits_path = sovits_path) # æŽ„ćŁéƒšćˆ† # -------------------------------- app = FastAPI() +# CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + + +# Global detector instance +detector = AgeGenderDetector() + +# WebSocket connection manager +class ConnectionManager: + def __init__(self): + self.active_connections: Dict[str, WebSocket] = {} + + async def connect(self, websocket: WebSocket, session_id: str): + await websocket.accept() + self.active_connections[session_id] = websocket + logger.info(f"🔌 Connected: {session_id}") + + def disconnect(self, session_id: str): + self.active_connections.pop(session_id, None) + logger.info(f"🔌 Disconnected: {session_id}") + + async def send_message(self, session_id: str, message: dict): + websocket = self.active_connections.get(session_id) + if websocket: + try: + await websocket.send_text(json.dumps(message)) + except: + self.disconnect(session_id) + +manager = ConnectionManager() + + @app.post("/") async def tts_endpoint(request: Request): @@ -1101,6 +1158,99 @@ async def tts_endpoint( print(f"the base path is {refer_wav_path}") return handle(refer_wav_path, prompt_text, prompt_language, text, text_language, cut_punc, top_k, top_p, temperature, speed, inp_refs, sample_steps, if_sr) +@app.post("/analyze_image") +async def analyze_image(file: UploadFile = File(...)): + """Analyze uploaded image""" + try: + # Read image + image_data = await file.read() + + # Convert to OpenCV format + nparr = np.frombuffer(image_data, np.uint8) + image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + + if image is None: + raise HTTPException(status_code=400, detail="Invalid image format") + + # Process image + start_time = time.time() + results = detector.process_image(image) + processing_time = time.time() - start_time + + # Cleanup periodically + if len(detector.face_results) > 50: + detector.cleanup_old_results() + + return { + "success": True, + "processing_time": round(processing_time, 2), + "people": results, + "total_people": len(results), + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"Image analysis error: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.websocket("/ws/{session_id}") +async def websocket_endpoint(websocket: WebSocket, session_id: str): + """WebSocket endpoint for real-time processing""" + await manager.connect(websocket, session_id) + + def result_callback(person_id: str, result: dict): + """Callback for when analysis is complete""" + asyncio.create_task(manager.send_message(session_id, { + "type": "analysis_complete", + "person_id": person_id, + "result": result, + "timestamp": datetime.now().isoformat() + })) + + try: + while True: + # Receive data + data = await websocket.receive_text() + message = json.loads(data) + + if message.get("type") == "frame": + # Process frame + base64_image = message.get("image") + if base64_image: + image = decode_base64_image(base64_image) + if image is not None: + results = detector.process_image(image, callback=result_callback) + + # Send immediate response + await manager.send_message(session_id, { + "type": "frame_processed", + "people": results, + "total_people": len(results), + "timestamp": datetime.now().isoformat() + }) + + elif message.get("type") == "ping": + await manager.send_message(session_id, { + "type": "pong", + "timestamp": datetime.now().isoformat() + }) + + except WebSocketDisconnect: + manager.disconnect(session_id) + except Exception as e: + logger.error(f"WebSocket error: {e}") + manager.disconnect(session_id) + +@app.get("/stats") +async def get_stats(): + """Get system statistics""" + return { + "active_connections": len(manager.active_connections), + "known_persons": len(detector.face_encodings), + "cached_results": len(detector.face_results), + "analysis_queue_size": detector.analysis_queue.qsize(), + "system_time": datetime.now().isoformat() + } if __name__ == "__main__": logging.info("the server is running") diff --git a/detectors.py b/detectors.py new file mode 100644 index 00000000..8946b88e --- /dev/null +++ b/detectors.py @@ -0,0 +1,528 @@ +""" +detectors.py - Face Detection Algorithms and Validators +Contains different face detection methods and quality validation +""" + +import cv2 +import numpy as np +import logging +from typing import List, Tuple + +logger = logging.getLogger(__name__) + +class EnhancedFaceDetector: + """Enhanced face detector using multiple detection methods""" + + def __init__(self): + self.detectors = {} + self.load_detectors() + + def load_detectors(self): + """Load multiple face detection algorithms""" + # 1. MediaPipe Face Detection (primary) + try: + import mediapipe as mp + self.mp_face_detection = mp.solutions.face_detection + self.mp_face_detector = self.mp_face_detection.FaceDetection( + model_selection=1, + min_detection_confidence=0.5 # Relaxed threshold + ) + self.detectors['mediapipe'] = True + logger.info("✅ MediaPipe Face Detector loaded") + except ImportError: + logger.warning("⚠ MediaPipe not available - install with: pip install mediapipe") + self.detectors['mediapipe'] = False + + # 2. Haar Cascade (fallback) + try: + self.face_cascade = cv2.CascadeClassifier( + cv2.data.haarcascades + 'haarcascade_frontalface_default.xml' + ) + self.detectors['haar'] = True + logger.info("✅ Haar cascade loaded as fallback") + except Exception as e: + logger.error(f"❌ Haar cascade loading error: {e}") + self.detectors['haar'] = False + + def detect_faces_mediapipe(self, image): + """Detect faces using MediaPipe (most accurate)""" + if not self.detectors.get('mediapipe', False): + return [] + + try: + # Convert BGR to RGB + rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + results = self.mp_face_detector.process(rgb_image) + + faces = [] + if results.detections: + h, w = image.shape[:2] + + for detection in results.detections: + bbox = detection.location_data.relative_bounding_box + confidence = detection.score[0] + + # Convert relative coordinates to absolute + x = max(0, int(bbox.xmin * w)) + y = max(0, int(bbox.ymin * h)) + width = int(bbox.width * w) + height = int(bbox.height * h) + + # Ensure face is within image bounds + x = min(x, w - 1) + y = min(y, h - 1) + width = min(width, w - x) + height = min(height, h - y) + + if width > 30 and height > 30: # Minimum size + faces.append((x, y, width, height, confidence)) + + return faces + + except Exception as e: + logger.error(f"MediaPipe detection error: {e}") + return [] + + def detect_faces_haar(self, image): + """Detect faces using Haar cascades (fallback)""" + if not self.detectors.get('haar', False): + return [] + + try: + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + faces = self.face_cascade.detectMultiScale( + gray, + scaleFactor=1.1, + minNeighbors=3, # Relaxed from 5 + minSize=(30, 30), # Relaxed from (80, 80) + maxSize=(500, 500) + ) + + # Add estimated confidence + faces_with_conf = [] + for (x, y, w, h) in faces: + # Estimate confidence based on face size + confidence = min(0.9, max(0.5, (w * h) / (100 * 100))) + faces_with_conf.append((x, y, w, h, confidence)) + + return faces_with_conf + + except Exception as e: + logger.error(f"Haar detection error: {e}") + return [] + + def detect_faces_dnn(self, image): + """Detect faces using OpenCV DNN (optional third method)""" + try: + # This requires pre-trained DNN model files + # For now, we'll skip this implementation + # You can add DNN detection here if you have the model files + return [] + except Exception as e: + logger.error(f"DNN detection error: {e}") + return [] + + def detect_faces(self, image): + """Main face detection using best available method""" + all_faces = [] + + # Try MediaPipe first (most accurate) + faces = self.detect_faces_mediapipe(image) + if faces: + all_faces.extend([(x, y, w, h, conf, 'mediapipe') for x, y, w, h, conf in faces]) + + # If no MediaPipe faces or low confidence, try Haar + if not all_faces or max([f[4] for f in all_faces]) < 0.7: + faces = self.detect_faces_haar(image) + all_faces.extend([(x, y, w, h, conf, 'haar') for x, y, w, h, conf in faces]) + + # Filter and remove duplicates + filtered_faces = self.filter_and_deduplicate_faces(all_faces) + + return filtered_faces + + def filter_and_deduplicate_faces(self, faces): + """Remove duplicate faces and apply quality filters""" + if not faces: + return [] + + # Sort by confidence (highest first) + faces.sort(key=lambda x: x[4], reverse=True) + + filtered = [] + for face in faces: + x, y, w, h, conf, method = face + + # Apply quality checks + if not self.is_valid_face_detection(x, y, w, h, conf): + continue + + # Check for overlap with existing faces + is_duplicate = False + for existing in filtered: + if self.faces_overlap(face, existing): + is_duplicate = True + break + + if not is_duplicate: + filtered.append(face) + + return filtered + + def is_valid_face_detection(self, x, y, w, h, confidence): + """Validate face detection quality""" + # Minimum confidence threshold (relaxed) + if confidence < 0.3: + return False + + # Minimum size check (relaxed) + if w < 30 or h < 30: + return False + + # Aspect ratio check (more permissive) + aspect_ratio = w / h + if aspect_ratio < 0.4 or aspect_ratio > 2.5: + return False + + # Coordinates should be positive + if x < 0 or y < 0: + return False + + return True + + def faces_overlap(self, face1, face2, threshold=0.5): + """Check if two face detections overlap significantly""" + x1, y1, w1, h1 = face1[:4] + x2, y2, w2, h2 = face2[:4] + + # Calculate intersection area + xi1 = max(x1, x2) + yi1 = max(y1, y2) + xi2 = min(x1 + w1, x2 + w2) + yi2 = min(y1 + h1, y2 + h2) + + if xi2 <= xi1 or yi2 <= yi1: + return False + + intersection = (xi2 - xi1) * (yi2 - yi1) + + # Calculate union area + area1 = w1 * h1 + area2 = w2 * h2 + union = area1 + area2 - intersection + + # Calculate IoU (Intersection over Union) + iou = intersection / union if union > 0 else 0 + + return iou > threshold + + +class FaceQualityValidator: + """Validate face quality before analysis""" + + @staticmethod + def is_face_clear(face_img, blur_threshold=30): + """Check if face is clear enough (not too blurry)""" + try: + gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY) + laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() + return laplacian_var > blur_threshold + except: + return True # If check fails, assume it's OK + + @staticmethod + def has_sufficient_size(face_img, min_size=30): + """Check if face has sufficient resolution""" + h, w = face_img.shape[:2] + return min(h, w) >= min_size + + @staticmethod + def is_properly_aligned(face_img): + """Basic check for face alignment (relaxed)""" + try: + h, w = face_img.shape[:2] + # More permissive aspect ratio + aspect_ratio = w / h + return 0.3 <= aspect_ratio <= 3.0 + except: + return True # If check fails, assume it's OK + + @staticmethod + def has_good_contrast(face_img, min_std=20): + """Check if face has sufficient contrast""" + try: + gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY) + std_dev = np.std(gray) + return std_dev > min_std + except: + return True # If check fails, assume it's OK + + @staticmethod + def is_well_lit(face_img, min_brightness=30, max_brightness=220): + """Check if face is well lit (not too dark or overexposed)""" + try: + gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY) + mean_brightness = np.mean(gray) + return min_brightness < mean_brightness < max_brightness + except: + return True # If check fails, assume it's OK + + @staticmethod + def validate_face(face_img): + """Complete face validation with relaxed criteria""" + if face_img is None or face_img.size == 0: + return False, "Empty face image" + + if not FaceQualityValidator.has_sufficient_size(face_img): + return False, "Face too small" + + # Skip strict checks for now - they were too restrictive + # if not FaceQualityValidator.is_face_clear(face_img): + # return False, "Face too blurry" + + if not FaceQualityValidator.is_properly_aligned(face_img): + return False, "Face poorly aligned" + + # Optional additional checks (commented out for relaxed validation) + # if not FaceQualityValidator.has_good_contrast(face_img): + # return False, "Face has poor contrast" + + # if not FaceQualityValidator.is_well_lit(face_img): + # return False, "Face is poorly lit" + + return True, "Face valid" + + @staticmethod + def get_face_quality_score(face_img): + """Get overall quality score for face (0-100)""" + if face_img is None or face_img.size == 0: + return 0 + + score = 0 + + # Size score (0-25 points) + h, w = face_img.shape[:2] + min_dim = min(h, w) + if min_dim >= 100: + score += 25 + elif min_dim >= 60: + score += 20 + elif min_dim >= 30: + score += 15 + else: + score += 5 + + # Clarity score (0-25 points) + try: + gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY) + laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() + if laplacian_var > 100: + score += 25 + elif laplacian_var > 50: + score += 20 + elif laplacian_var > 20: + score += 15 + else: + score += 10 + except: + score += 15 # Default moderate score + + # Alignment score (0-25 points) + try: + aspect_ratio = w / h + if 0.8 <= aspect_ratio <= 1.2: + score += 25 # Perfect square-ish + elif 0.6 <= aspect_ratio <= 1.6: + score += 20 # Good + elif 0.4 <= aspect_ratio <= 2.0: + score += 15 # Acceptable + else: + score += 5 # Poor + except: + score += 15 # Default moderate score + + # Contrast score (0-25 points) + try: + gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY) + std_dev = np.std(gray) + if std_dev > 60: + score += 25 + elif std_dev > 40: + score += 20 + elif std_dev > 20: + score += 15 + else: + score += 5 + except: + score += 15 # Default moderate score + + return min(100, score) + + @staticmethod + def enhance_face_for_analysis(face_img): + """Apply basic enhancement to improve face for analysis""" + try: + # Convert to LAB color space + lab = cv2.cvtColor(face_img, cv2.COLOR_BGR2LAB) + l, a, b = cv2.split(lab) + + # Apply CLAHE to L channel (brightness) + clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(4, 4)) + l = clahe.apply(l) + + # Merge back + enhanced = cv2.merge([l, a, b]) + enhanced = cv2.cvtColor(enhanced, cv2.COLOR_LAB2BGR) + + # Apply slight Gaussian blur to reduce noise + enhanced = cv2.GaussianBlur(enhanced, (3, 3), 0) + + return enhanced + + except Exception as e: + logger.error(f"Face enhancement error: {e}") + return face_img + + +class FaceTracker: + """Track faces across frames for better stability""" + + def __init__(self, max_distance=50, max_age=30): + self.tracks = {} + self.next_id = 1 + self.max_distance = max_distance + self.max_age = max_age + + def update(self, detections): + """Update tracks with new detections""" + # Calculate distances between existing tracks and new detections + matched_tracks = {} + unmatched_detections = list(detections) + + for track_id, track in self.tracks.items(): + if track['age'] > self.max_age: + continue # Skip old tracks + + best_match = None + best_distance = float('inf') + + for i, detection in enumerate(unmatched_detections): + x, y, w, h = detection[:4] + center_x, center_y = x + w//2, y + h//2 + + track_x, track_y = track['center'] + distance = np.sqrt((center_x - track_x)**2 + (center_y - track_y)**2) + + if distance < self.max_distance and distance < best_distance: + best_distance = distance + best_match = i + + if best_match is not None: + detection = unmatched_detections[best_match] + x, y, w, h = detection[:4] + + # Update track + self.tracks[track_id].update({ + 'center': (x + w//2, y + h//2), + 'bbox': (x, y, w, h), + 'age': 0, + 'confidence': detection[4] if len(detection) > 4 else 0.5 + }) + + matched_tracks[track_id] = detection + unmatched_detections.pop(best_match) + + # Age existing tracks + for track_id in list(self.tracks.keys()): + if track_id not in matched_tracks: + self.tracks[track_id]['age'] += 1 + if self.tracks[track_id]['age'] > self.max_age: + del self.tracks[track_id] + + # Create new tracks for unmatched detections + for detection in unmatched_detections: + x, y, w, h = detection[:4] + self.tracks[self.next_id] = { + 'center': (x + w//2, y + h//2), + 'bbox': (x, y, w, h), + 'age': 0, + 'confidence': detection[4] if len(detection) > 4 else 0.5, + 'created_frame': self.next_id + } + matched_tracks[self.next_id] = detection + self.next_id += 1 + + return matched_tracks + + def get_stable_faces(self, min_track_length=3): + """Get faces that have been tracked for a minimum number of frames""" + stable_tracks = {} + for track_id, track in self.tracks.items(): + if track['age'] < min_track_length: + stable_tracks[track_id] = track + return stable_tracks + + +class MultiScaleDetector: + """Detect faces at multiple scales for better accuracy""" + + def __init__(self, base_detector): + self.base_detector = base_detector + self.scales = [1.0, 0.8, 1.2] # Different scales to try + + def detect_faces(self, image): + """Detect faces at multiple scales""" + all_detections = [] + h, w = image.shape[:2] + + for scale in self.scales: + if scale != 1.0: + # Resize image + new_w = int(w * scale) + new_h = int(h * scale) + resized = cv2.resize(image, (new_w, new_h)) + else: + resized = image + + # Detect faces + faces = self.base_detector.detect_faces(resized) + + # Scale coordinates back to original size + for face in faces: + x, y, w_f, h_f, conf, method = face + if scale != 1.0: + x = int(x / scale) + y = int(y / scale) + w_f = int(w_f / scale) + h_f = int(h_f / scale) + + all_detections.append((x, y, w_f, h_f, conf, f"{method}_scale_{scale}")) + + # Remove duplicates and return best detections + return self.base_detector.filter_and_deduplicate_faces(all_detections) + + +# Factory function to create detector with best available methods +def create_face_detector(use_tracking=False, use_multiscale=False): + """ + Factory function to create the best available face detector + + Args: + use_tracking: Enable face tracking across frames + use_multiscale: Enable multi-scale detection + + Returns: + Configured face detector + """ + base_detector = EnhancedFaceDetector() + + if use_multiscale: + detector = MultiScaleDetector(base_detector) + else: + detector = base_detector + + if use_tracking: + # Note: Tracking would need to be integrated into the main detection loop + logger.info("Face tracking enabled") + + return detector \ No newline at end of file diff --git a/face_detector.py b/face_detector.py new file mode 100644 index 00000000..0e14493e --- /dev/null +++ b/face_detector.py @@ -0,0 +1,344 @@ +""" +face_detector.py - Core Face Detection and Analysis +Exact same logic as your working code, just modularized +""" + +import cv2 +import numpy as np +from PIL import Image +import torch +import time +import threading +from queue import Queue +import logging +from typing import Dict, List, Optional + +logger = logging.getLogger(__name__) + +class AgeGenderDetector: + """Enhanced Age & Gender Detection System - EXACT SAME LOGIC AS YOUR WORKING CODE""" + + def __init__(self): + self.face_results = {} + self.face_encodings = {} + self.person_counter = 0 + self.analysis_queue = Queue() + self.running = True + + # Load models + self.load_models() + + # Start analysis worker + self.analysis_thread = threading.Thread(target=self.analysis_worker, daemon=True) + self.analysis_thread.start() + + logger.info("✅ AgeGenderDetector initialized") + + def load_models(self): + """Load AI models - EXACT SAME AS YOUR WORKING CODE""" + try: + # Load DeepFace + from deepface import DeepFace + self.deepface = DeepFace + logger.info("✅ DeepFace loaded") + except ImportError: + logger.error("❌ DeepFace not available") + self.deepface = None + + try: + # Load HuggingFace age model + from transformers import AutoImageProcessor, SiglipForImageClassification + model_name = "prithivMLmods/facial-age-detection" + self.age_model = SiglipForImageClassification.from_pretrained(model_name) + self.age_processor = AutoImageProcessor.from_pretrained(model_name) + logger.info("✅ HuggingFace age model loaded") + except Exception as e: + logger.error(f"❌ HuggingFace model error: {e}") + self.age_model = None + self.age_processor = None + + # Age labels + self.id2label = { + "0": "01-10", "1": "11-20", "2": "21-30", "3": "31-40", + "4": "41-55", "5": "56-65", "6": "66-80", "7": "80+" + } + + # Face detector + self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') + + def analysis_worker(self): + """Background analysis worker - EXACT SAME AS YOUR WORKING CODE""" + while self.running: + try: + if not self.analysis_queue.empty(): + task = self.analysis_queue.get(timeout=0.1) + if task is None: + break + + person_id = task['id'] + face_img = task['image'] + callback = task.get('callback') + + # Analyze + age, age_conf = self.analyze_age(face_img) + gender, gender_conf = self.analyze_gender(face_img) + + # Store results + current_time = time.time() + if person_id in self.face_results: + first_seen = self.face_results[person_id].get('first_seen', current_time) + else: + first_seen = current_time + + result = { + 'age': age, + 'age_conf': age_conf, + 'gender': gender, + 'gender_conf': gender_conf, + 'timestamp': current_time, + 'first_seen': first_seen + } + + self.face_results[person_id] = result + + # Call callback if provided + if callback: + callback(person_id, result) + + else: + time.sleep(0.01) + except Exception as e: + logger.error(f"Analysis worker error: {e}") + time.sleep(0.1) + + def analyze_age(self, face_img): + """Analyze age using HuggingFace - EXACT SAME AS YOUR WORKING CODE""" + if self.age_model is None or face_img.size == 0: + return "Unknown", 0.0 + + try: + # Convert to PIL + if len(face_img.shape) == 3: + face_pil = Image.fromarray(cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)) + else: + face_pil = Image.fromarray(face_img).convert("RGB") + + # Process + inputs = self.age_processor(images=face_pil, return_tensors="pt") + + with torch.no_grad(): + outputs = self.age_model(**inputs) + logits = outputs.logits + probs = torch.nn.functional.softmax(logits, dim=1).squeeze().tolist() + + # Get prediction + max_idx = probs.index(max(probs)) + age_range = self.id2label[str(max_idx)] + confidence = probs[max_idx] * 100 + + return age_range, confidence + except Exception as e: + logger.error(f"Age analysis error: {e}") + return "Unknown", 0.0 + + def analyze_gender(self, face_img): + """Analyze gender using DeepFace - EXACT SAME AS YOUR WORKING CODE""" + if self.deepface is None or face_img.size == 0: + return "Unknown", 0.0 + + try: + result = self.deepface.analyze( + face_img, + actions=['gender'], + enforce_detection=False, + silent=True + ) + + if isinstance(result, list): + analysis = result[0] + else: + analysis = result + + gender = analysis.get('dominant_gender', 'Unknown') + gender_probs = analysis.get('gender', {}) + confidence = max(gender_probs.values()) if gender_probs else 0.0 + + # Simplify gender + if gender in ['Man', 'Male']: + gender = 'Male' + elif gender in ['Woman', 'Female']: + gender = 'Female' + + return gender, confidence + except Exception as e: + logger.error(f"Gender analysis error: {e}") + return "Unknown", 0.0 + + def get_face_encoding(self, face_img): + """Get face encoding for recognition - EXACT SAME AS YOUR WORKING CODE""" + if self.deepface is None or face_img.size == 0: + return None + + try: + # Preprocess + face_resized = cv2.resize(face_img, (160, 160)) + + # Get embedding + embedding = self.deepface.represent( + face_resized, + model_name='Facenet', + enforce_detection=False, + detector_backend='opencv' + ) + + if isinstance(embedding, list) and len(embedding) > 0: + return np.array(embedding[0]['embedding']) + elif isinstance(embedding, dict): + return np.array(embedding['embedding']) + return None + except Exception as e: + # Fallback encoding + try: + face_resized = cv2.resize(face_img, (64, 64)) + face_gray = cv2.cvtColor(face_resized, cv2.COLOR_BGR2GRAY) + hist = cv2.calcHist([face_gray], [0], None, [32], [0, 256]) + return hist.flatten() + except: + return None + + def find_matching_person(self, face_img, threshold=0.4): + """Find matching person - EXACT SAME AS YOUR WORKING CODE""" + current_encoding = self.get_face_encoding(face_img) + if current_encoding is None: + return None, 0 + + best_match = None + best_similarity = 0 + + for person_id, stored_encoding in self.face_encodings.items(): + try: + # Cosine similarity + similarity = np.dot(current_encoding, stored_encoding) / ( + np.linalg.norm(current_encoding) * np.linalg.norm(stored_encoding) + ) + + if similarity > threshold and similarity > best_similarity: + best_similarity = similarity + best_match = person_id + except: + continue + + return best_match, best_similarity if best_match else (None, 0) + + def register_new_person(self, face_img): + """Register new person - EXACT SAME AS YOUR WORKING CODE""" + encoding = self.get_face_encoding(face_img) + if encoding is None: + return None + + self.person_counter += 1 + person_id = f"person_{self.person_counter}" + self.face_encodings[person_id] = encoding + + logger.info(f"đŸ‘€ NEW PERSON: {person_id}") + return person_id + + def identify_person(self, face_img): + """Identify person (new or existing) - EXACT SAME AS YOUR WORKING CODE""" + match_result = self.find_matching_person(face_img) + + if match_result[0]: + person_id, similarity = match_result + logger.info(f"đŸ‘€ RECOGNIZED: {person_id} ({similarity:.3f})") + return person_id, False + else: + person_id = self.register_new_person(face_img) + return person_id, True + + def detect_faces(self, image): + """Detect faces in image - EXACT SAME AS YOUR WORKING CODE""" + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + faces = self.face_cascade.detectMultiScale(gray, 1.1, 4, minSize=(60, 60)) + return faces + + def process_image(self, image, callback=None): + """Process image and return results - EXACT SAME AS YOUR WORKING CODE""" + faces = self.detect_faces(image) + results = [] + + for i, (x, y, w, h) in enumerate(faces): + face_img = image[y:y+h, x:x+w] + person_id, is_new = self.identify_person(face_img) + + if person_id: + # Get existing result or create placeholder + result = self.face_results.get(person_id, { + 'age': 'Analyzing...', + 'age_conf': 0, + 'gender': 'Analyzing...', + 'gender_conf': 0, + 'timestamp': time.time(), + 'first_seen': time.time() + }) + + # Add to analysis queue + task = { + 'id': person_id, + 'image': face_img, + 'callback': callback + } + self.analysis_queue.put(task) + + # Determine status + current_time = time.time() + first_seen = result.get('first_seen', current_time) + time_known = current_time - first_seen + + if time_known < 3: + status = "NEW" + elif time_known < 60: + status = "CURRENT" + else: + status = "RETURNING" + + # Convert age to approximate number + age_display = result['age'] + if result['age'] in self.id2label.values(): + age_map = { + "01-10": "~6 years", "11-20": "~16 years", "21-30": "~25 years", + "31-40": "~35 years", "41-55": "~48 years", "56-65": "~60 years", + "66-80": "~73 years", "80+": "~85 years" + } + age_display = age_map.get(result['age'], result['age']) + + results.append({ + 'person_id': person_id, + 'status': status, + 'age': age_display, + 'age_confidence': result['age_conf'], + 'gender': result['gender'], + 'gender_confidence': result['gender_conf'], + 'face_coordinates': [int(x), int(y), int(w), int(h)], + 'is_new': is_new + }) + + return results + + def cleanup_old_results(self): + """Cleanup old results - EXACT SAME AS YOUR WORKING CODE""" + current_time = time.time() + old_persons = [ + pid for pid, result in self.face_results.items() + if current_time - result.get('timestamp', 0) > 300 # 5 minutes + ] + + for person_id in old_persons: + self.face_results.pop(person_id, None) + self.face_encodings.pop(person_id, None) + logger.info(f"đŸ—‘ïž REMOVED: {person_id}") + + def __del__(self): + """Cleanup when detector is destroyed""" + self.running = False + if hasattr(self, 'analysis_thread'): + self.analysis_thread.join(timeout=1.0) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 10fa772e..c87864d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -39,3 +39,17 @@ x-transformers==2.1.37 torchmetrics==1.5.0 attrdict==2.0.1 activations==0.1.0 +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +websockets==12.0 +python-multipart==0.0.6 +opencv-python==4.8.1.78 +pillow==10.1.0 +numpy<2.0 +torch>=2.1.0 +transformers>=4.35.0 +deepface>=0.0.79 +tensorflow>=2.15.0 +python-jose[cryptography]==3.3.0 +passlib[bcrypt]==1.7.4 +aiofiles==23.2.1 \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 00000000..9e54b9e2 --- /dev/null +++ b/utils.py @@ -0,0 +1,36 @@ +""" +utils.py - Utility Functions +Exact same utility functions from your working code +""" + +import cv2 +import numpy as np +from PIL import Image +import base64 +import io +import logging + +logger = logging.getLogger(__name__) + +def decode_base64_image(base64_string: str) -> np.ndarray: + """ + Decode base64 image to numpy array - EXACT SAME AS YOUR WORKING CODE + """ + try: + # Remove data URL prefix if present + if ',' in base64_string: + base64_string = base64_string.split(',')[1] + + # Decode base64 + image_data = base64.b64decode(base64_string) + + # Convert to PIL Image + pil_image = Image.open(io.BytesIO(image_data)) + + # Convert to OpenCV format + opencv_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR) + + return opencv_image + except Exception as e: + logger.error(f"Image decode error: {e}") + return None \ No newline at end of file