Spaces:
Running
Running
| import os | |
| import json | |
| import tempfile | |
| from pathlib import Path | |
| from fastapi import HTTPException | |
| import cv2 | |
| import numpy as np | |
| from datetime import datetime | |
| from exif import Image as ExifImage | |
| from io import BytesIO | |
| from collections import defaultdict, Counter | |
| # HuggingFace bucket API | |
| from huggingface_hub import ( | |
| list_bucket_tree, | |
| batch_bucket_files, | |
| download_bucket_files, | |
| get_bucket_paths_info, | |
| ) | |
| # ---------------- CONFIG IMPORTS ---------------- | |
| from .config import ( | |
| DETECT_MODEL, | |
| BUCK_DOE_MODEL, | |
| BUCK_TYPE_MODEL, | |
| ALLOWED_EXTENSIONS, | |
| MIN_IMAGES, | |
| MAX_IMAGES, | |
| UPLOAD_DIR, # e.g. "codewithRiz/test_bucket" | |
| logger, | |
| ) | |
| # ---------------------------------------------------------------- | |
| # BUCKET SETUP | |
| # All data is stored under: | |
| # user_data/<user_id>/cameras.json | |
| # user_data/<user_id>/<camera_name>/raw/<filename> | |
| # user_data/<user_id>/<camera_name>/<camera_name>_detections.json | |
| # ---------------------------------------------------------------- | |
| BUCKET_ID = UPLOAD_DIR # "namespace/bucket-name" | |
| BASE_DIR = "user_data" # top-level folder inside the bucket | |
| STORAGE_BACKEND = "huggingface" | |
| # ================================================================ | |
| # BUCKET INTERNAL HELPERS (replace local Path / open / json.load) | |
| # ================================================================ | |
| def _bucket_key(user_id: str, *parts: str) -> str: | |
| """Build a bucket key: user_data/<user_id>/<parts...>""" | |
| return "/".join([BASE_DIR, user_id, *parts]) | |
| def _read_bucket_json(key: str): | |
| """Download JSON from bucket. Returns parsed object or None on miss.""" | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tf: | |
| tmp_path = tf.name | |
| download_bucket_files(BUCKET_ID, files=[(key, tmp_path)]) | |
| with open(tmp_path, "r") as f: | |
| data = json.load(f) | |
| os.unlink(tmp_path) | |
| return data | |
| except Exception as e: | |
| logger.debug(f"_read_bucket_json({key}): {e}") | |
| return None | |
| def _write_bucket_json(key: str, data): | |
| """Serialize data to JSON and upload to bucket at key.""" | |
| raw_bytes = json.dumps(data, indent=2, default=str).encode("utf-8") | |
| batch_bucket_files(BUCKET_ID, add=[(raw_bytes, key)]) | |
| def _key_exists(key: str) -> bool: | |
| """Return True if key exists in the bucket.""" | |
| try: | |
| info = list(get_bucket_paths_info(BUCKET_ID, [key])) | |
| return bool(info) | |
| except Exception: | |
| return False | |
| def _list_prefix(prefix: str) -> list: | |
| """Return all file items under prefix (recursive).""" | |
| try: | |
| return [ | |
| item | |
| for item in list_bucket_tree(BUCKET_ID, prefix=prefix, recursive=True) | |
| if item.type == "file" | |
| ] | |
| except Exception: | |
| return [] | |
| # ================================================================ | |
| # ORIGINAL HELPERS (names unchanged, now return bucket keys) | |
| # ================================================================ | |
| def get_user_folder(user_id: str) -> str: | |
| """Return the bucket prefix for user's folder (no creation needed).""" | |
| return f"{BASE_DIR}/{user_id}" | |
| def get_user_file(user_id: str) -> str: | |
| """Return the bucket key for user's cameras.json.""" | |
| return f"{get_user_folder(user_id)}/cameras.json" | |
| # ================================================================ | |
| # VALIDATION | |
| # ================================================================ | |
| def validate_form(user_id, camera_name, images): | |
| if not user_id or not user_id.strip(): | |
| raise HTTPException(400, "user_id is required") | |
| if not camera_name or not camera_name.strip(): | |
| raise HTTPException(400, "camera_name is required") | |
| if not images or len(images) == 0: | |
| raise HTTPException(400, "At least one image is required") | |
| images = [f for f in images if f.filename and f.filename.strip()] | |
| if len(images) < MIN_IMAGES: | |
| raise HTTPException(400, f"At least {MIN_IMAGES} image(s) required") | |
| if len(images) > MAX_IMAGES: | |
| raise HTTPException(400, f"Maximum {MAX_IMAGES} images allowed") | |
| for f in images: | |
| if "." not in f.filename: | |
| raise HTTPException(400, f"Invalid file: {f.filename}") | |
| ext = f.filename.rsplit(".", 1)[1].lower() | |
| if ext not in ALLOWED_EXTENSIONS: | |
| raise HTTPException(400, f"Invalid file type: {f.filename}") | |
| return images | |
| # ================================================================ | |
| # EXIF / METADATA | |
| # ================================================================ | |
| def make_json_safe(value): | |
| """Convert EXIF values to JSON-serializable types""" | |
| if hasattr(value, "name"): | |
| return value.name | |
| if isinstance(value, (bytes, bytearray)): | |
| return value.decode(errors="ignore") | |
| if isinstance(value, (tuple, list)): | |
| return [make_json_safe(v) for v in value] | |
| if not isinstance(value, (str, int, float, bool, type(None))): | |
| return str(value) | |
| return value | |
| def extract_metadata(image_bytes): | |
| metadata = { | |
| "upload_datetime": datetime.utcnow().isoformat() + "Z" | |
| } | |
| try: | |
| exif_img = ExifImage(BytesIO(image_bytes)) | |
| if not exif_img.has_exif: | |
| return metadata | |
| exif_dict = {} | |
| for tag in exif_img.list_all(): | |
| try: | |
| value = getattr(exif_img, tag) | |
| value = make_json_safe(value) | |
| if value not in ("", None, [], {}): | |
| exif_dict[tag] = value | |
| except Exception: | |
| continue | |
| if exif_dict: | |
| metadata["exif"] = exif_dict | |
| except Exception: | |
| pass | |
| return metadata | |
| # ================================================================ | |
| # IMAGE PROCESSING | |
| # ================================================================ | |
| def process_image(image): | |
| """Run 3-stage detection and classification with dynamic confidence""" | |
| detections = [] | |
| results = DETECT_MODEL(image, conf=0.8, iou=0.4, agnostic_nms=True) # Stage 1: Deer detection | |
| for r in results: | |
| for box in r.boxes: | |
| x1, y1, x2, y2 = map(int, box.xyxy[0]) | |
| crop = image[y1:y2, x1:x2] | |
| if crop.size == 0: | |
| continue | |
| # ---------------- Stage 2: Buck/Doe ---------------- | |
| buck_res = BUCK_DOE_MODEL(crop) | |
| buck_probs = buck_res[0].probs | |
| top1_idx = buck_probs.top1 | |
| buck_name = buck_res[0].names[top1_idx] | |
| buck_conf = float(buck_probs.top1conf) | |
| if buck_name.lower() == "buck": | |
| # ---------------- Stage 3: Buck Type ---------------- | |
| type_res = BUCK_TYPE_MODEL(crop) | |
| type_probs = type_res[0].probs | |
| top1_type_idx = type_probs.top1 | |
| type_name = type_res[0].names[top1_type_idx] | |
| type_conf = float(type_probs.top1conf) | |
| label = f"Deer | Buck | {type_name}" | |
| final_conf = type_conf | |
| else: | |
| # Doe: use stage 2 confidence | |
| label = f"Deer | Doe " | |
| final_conf = buck_conf | |
| detections.append({ | |
| "label": label, | |
| "bbox": [x1, y1, x2, y2], | |
| "confidence": final_conf | |
| }) | |
| return detections | |
| # ================================================================ | |
| # CAMERA VALIDATION | |
| # ================================================================ | |
| def validate_user_and_camera(user_id: str, camera_name: str): | |
| if not user_exists(user_id): | |
| raise HTTPException(404, "User not found") | |
| cameras = load_cameras(user_id) | |
| if not any(c["camera_name"] == camera_name for c in cameras): | |
| raise HTTPException(404, "Camera not registered") | |
| # ================================================================ | |
| # IMAGE SAVE | |
| # ================================================================ | |
| def save_image(user_id, camera_name, filename, data): | |
| key = _bucket_key(user_id, camera_name, "raw", filename) | |
| batch_bucket_files(BUCKET_ID, add=[(data, key)]) | |
| return f"https://huggingface.co/buckets/{BUCKET_ID}/resolve/{key}" | |
| # ================================================================ | |
| # JSON | |
| # ================================================================ | |
| def load_json(path): | |
| """Load JSON from bucket key. Returns [] on miss (same behaviour as before).""" | |
| result = _read_bucket_json(path) | |
| return result if result is not None else [] | |
| def save_json(path, data): | |
| """Save data as JSON to bucket key.""" | |
| _write_bucket_json(path, data) | |
| # ================================================================ | |
| # USER FOLDERS / CAMERAS | |
| # ================================================================ | |
| def user_exists(user_id: str) -> bool: | |
| return _key_exists(get_user_file(user_id)) | |
| def load_cameras(user_id: str) -> list: | |
| path = get_user_file(user_id) | |
| try: | |
| data = _read_bucket_json(path) | |
| return data if isinstance(data, list) else [] | |
| except Exception: | |
| return [] | |
| def save_cameras(user_id: str, cameras: list): | |
| # Bucket keys don't need folder creation — just write the file | |
| _write_bucket_json(get_user_file(user_id), cameras) | |
| # ================================================================ | |
| # DASHBOARD | |
| # ================================================================ | |
| def get_user_dashboard(user_id: str, camera_name: str = None) -> dict: | |
| """Return analytics for a user or a specific camera""" | |
| cameras_file = get_user_file(user_id) | |
| if not _key_exists(cameras_file): | |
| raise HTTPException(404, f"User {user_id} not found") | |
| try: | |
| cameras = _read_bucket_json(cameras_file) or [] | |
| except Exception: | |
| cameras = [] | |
| total_cameras = len(cameras) | |
| total_images = 0 | |
| total_detections = 0 | |
| buck_type_distribution = {} | |
| buck_doe_distribution = {"Buck": 0, "Doe": 0} | |
| heatmap = defaultdict(lambda: [0] * 24) # day -> 24 hours | |
| deer_per_day = Counter() | |
| bucks_per_day = Counter() | |
| does_per_day = Counter() | |
| hour_activity = [0] * 24 # 0-23 hours | |
| for cam in cameras: | |
| cam_name = cam["camera_name"] | |
| if camera_name and cam_name != camera_name: | |
| continue | |
| # Count images (replaces raw_folder.glob("*.*")) | |
| raw_folder = _bucket_key(user_id, cam_name, "raw") | |
| raw_files = _list_prefix(raw_folder) | |
| total_images += len(raw_files) | |
| # Count detections and distributions (replaces open(detections_file)) | |
| detections_file = _bucket_key(user_id, cam_name, f"{cam_name}_detections.json") | |
| if _key_exists(detections_file): | |
| try: | |
| dets = _read_bucket_json(detections_file) or [] | |
| for rec in dets: | |
| # --- Existing Buck/Doe counts --- | |
| for d in rec.get("detections", []): | |
| total_detections += 1 | |
| label = d.get("label", "") | |
| if "|" in label: | |
| parts = [p.strip() for p in label.split("|")] | |
| if len(parts) == 3: # Buck with type | |
| buck_doe_distribution["Buck"] += 1 | |
| buck_type_distribution[parts[2]] = buck_type_distribution.get(parts[2], 0) + 1 | |
| else: # Doe | |
| buck_doe_distribution["Doe"] += 1 | |
| # --- New analytics using datetime_original --- | |
| dt_str = rec.get("metadata", {}).get("exif", {}).get("datetime_original") | |
| if dt_str: | |
| dt = datetime.strptime(dt_str, "%Y:%m:%d %H:%M:%S") | |
| day = dt.date() | |
| hour = dt.hour | |
| # Heatmap count | |
| heatmap[day][hour] += len(rec.get("detections", [])) | |
| # Count deer, bucks, does per day | |
| for d in rec.get("detections", []): | |
| label = d.get("label", "") | |
| if "Deer" in label: | |
| deer_per_day[day] += 1 | |
| if "Buck" in label: | |
| bucks_per_day[day] += 1 | |
| if "Doe" in label: | |
| does_per_day[day] += 1 | |
| # Hourly aggregated activity | |
| hour_activity[hour] += len(rec.get("detections", [])) | |
| except Exception: | |
| continue | |
| # Average activity by hour (morning/night) | |
| morning_hours = range(6, 18) | |
| night_hours = list(range(0, 6)) + list(range(18, 24)) | |
| morning_activity = sum(hour_activity[h] for h in morning_hours) / len(morning_hours) | |
| night_activity = sum(hour_activity[h] for h in night_hours) / len(night_hours) | |
| return { | |
| "user_id": user_id, | |
| "selected_camera": camera_name, | |
| "total_cameras": total_cameras, | |
| "images_uploaded": total_images, | |
| "total_detections": total_detections, | |
| "buck_type_distribution": buck_type_distribution, | |
| "buck_doe_distribution": buck_doe_distribution, | |
| # --- New analytics --- | |
| "activity_heatmap": dict(heatmap), | |
| "deer_per_day": dict(deer_per_day), | |
| "bucks_per_day": dict(bucks_per_day), | |
| "does_per_day": dict(does_per_day), | |
| "average_activity": { | |
| "morning": round(morning_activity, 2), | |
| "night": round(night_activity, 2) | |
| } | |
| } |