"""
Day-ahead planner worker.

Entry point for the GitHub Actions cron job (daily at 02:00 UTC, nominally
05:00 IST; note that 02:00 UTC is 05:00 only at a UTC+3 offset and 04:00 at
UTC+2).

Usage:
    python -m backend.workers.daily_planner
"""
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import sys |
| from datetime import date, datetime, timezone |
| from pathlib import Path |
|
|
| |
# Three directories above this file; with the documented module path
# ``backend.workers.daily_planner`` that is the directory containing
# ``backend``. It is put first on ``sys.path`` so the function-level
# ``src`` / ``config`` imports further down can resolve.
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path[:0] = [str(PROJECT_ROOT)]

# python-dotenv is optional: when it is installed, variables from the
# project's ``.env`` file are loaded; when it is not, nothing happens.
try:
    from dotenv import load_dotenv
except ImportError:
    pass
else:
    load_dotenv(PROJECT_ROOT / ".env")

# Root logging configuration for the worker, plus the logger used by every
# function in this module.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("daily_planner")
|
|
|
|
# Planning resolution: 24 hours x 4 quarter-hour slots.
_SLOTS_PER_HOUR = 4
_SLOTS_PER_DAY = 96


def _synthetic_forecast():
    """Return the fixed default profile used when IMS data is unavailable.

    For slot times from 06:00 to 18:00 inclusive both series follow a half
    sine peaking at 12:00: temperature goes 26.5 -> 35.0 -> 26.5 and GHI
    0 -> 900 -> 0. Outside that window temperature is 20.0 and GHI is 0.0.
    All values are rounded to one decimal place.

    Returns:
        ``(temps, ghis)``: two lists of 96 floats.
    """
    import math

    temps = []
    ghis = []
    for slot in range(_SLOTS_PER_DAY):
        hour = slot / _SLOTS_PER_HOUR  # fractional hour of day
        if 6 <= hour <= 18:
            daylight = math.sin(math.pi * (hour - 6) / 12)
            temp = 26.5 + 8.5 * daylight
            # 0.0, not 0: max() returns its first argument on a tie, so an
            # int literal here would put an int into the list at 06:00.
            ghi = max(0.0, 900 * daylight)
        else:
            temp, ghi = 20.0, 0.0
        temps.append(round(temp, 1))
        ghis.append(round(ghi, 1))
    return temps, ghis


def _get_forecast(target: date):
    """Build 96-slot (15-min) forecast arrays from IMS cache or defaults.

    The cached IMS frame is grouped by the hour of ``timestamp_utc``; the mean
    of ``air_temperature_c`` and ``ghi_w_m2`` for each hour is repeated for
    the four slots of that hour. A missing column, an hour with no rows, or a
    NaN mean yields the per-series default (25.0 for temperature, 0.0 for
    GHI). Any exception while reading the cache (import failure, empty cache,
    missing ``timestamp_utc`` column, ...) is logged as a warning and the
    synthetic default profile is returned instead.

    NOTE(review): slots are indexed by the *UTC* hour of the cached rows,
    while the synthetic fallback peaks at slot hour 12 -- confirm which clock
    the planner expects.

    Args:
        target: Date the plan is computed for. Not used at present: the IMS
            branch averages every cached row by hour of day regardless of
            its date.

    Returns:
        ``(temps, ghis)``: two lists of 96 floats.
    """
    try:
        from src.data.ims_client import IMSClient
        import pandas as pd

        df = IMSClient().load_cached()
        if df.empty:
            raise ValueError("No IMS cache available")

        # Keep the hour in a standalone Series rather than adding a column,
        # so the frame handed out by the client is not modified.
        hours = pd.to_datetime(df["timestamp_utc"]).dt.hour

        def hourly_profile(column, default):
            """Per-hour mean of *column*, expanded to quarter-hour slots."""
            if column not in df.columns:
                return [default] * _SLOTS_PER_DAY
            per_hour = []
            # Only 24 distinct hours exist, so filter once per hour instead
            # of once per slot.
            for hour in range(24):
                values = df.loc[hours == hour, column]
                mean = default if values.empty else values.mean()
                per_hour.append(default if pd.isna(mean) else float(mean))
            return [
                per_hour[slot // _SLOTS_PER_HOUR]
                for slot in range(_SLOTS_PER_DAY)
            ]

        temps = hourly_profile("air_temperature_c", 25.0)
        ghis = hourly_profile("ghi_w_m2", 0.0)
        return temps, ghis
    except Exception as exc:
        log.warning("Could not build forecast from IMS: %s — using defaults", exc)
        return _synthetic_forecast()
|
|
|
|
def main():
    """Compute today's day-ahead plan and publish it to disk and Redis.

    Steps:
        1. Take today's date in local (IST) time as the target date.
        2. Build the 96-slot temperature / GHI forecast.
        3. Ask ``DayAheadPlanner.plan_day`` for a plan under the daily
           energy budget.
        4. Write the plan as JSON to ``DAILY_PLAN_PATH``; a failure here is
           logged and does not stop the run.
        5. Store the plan under the Redis key ``control:plan`` with a
           24-hour TTL when ``get_redis()`` returns a client; otherwise log
           a warning.
    """
    from src.day_ahead_planner import DayAheadPlanner
    from src.data.redis_cache import get_redis
    from config.settings import DAILY_PLAN_PATH, MAX_ENERGY_REDUCTION_PCT

    from datetime import timedelta

    # A fixed UTC+2 offset ignores daylight saving; the IANA zone follows the
    # switch. Fall back to the fixed offset when zoneinfo (Python < 3.9) or
    # the tz database is not available.
    # NOTE(review): "IST" is taken to mean Israel time (UTC+2 standard, IMS
    # data source) -- confirm.
    try:
        from zoneinfo import ZoneInfo

        local_tz = ZoneInfo("Asia/Jerusalem")
    except (ImportError, KeyError):  # ZoneInfoNotFoundError is a KeyError
        local_tz = timezone(timedelta(hours=2))
    target = datetime.now(local_tz).date()
    log.info("Computing day-ahead plan for %s (IST)", target)

    forecast_temps, forecast_ghi = _get_forecast(target)

    # NOTE(review): 25.0 looks like a baseline daily energy figure in kWh to
    # which the allowed reduction percentage is applied -- confirm, and
    # consider moving it to config.settings.
    baseline_kwh = 25.0
    daily_budget_kwh = baseline_kwh * MAX_ENERGY_REDUCTION_PCT / 100.0

    planner = DayAheadPlanner()
    plan = planner.plan_day(
        target_date=target,
        forecast_temps=forecast_temps,
        forecast_ghi=forecast_ghi,
        daily_budget_kwh=daily_budget_kwh,
    )

    # Plans without a to_dict() method are stored as their string form.
    plan_dict = plan.to_dict() if hasattr(plan, "to_dict") else {"raw": str(plan)}
    plan_dict["_computed_at"] = datetime.now(timezone.utc).isoformat()

    # Best effort: a failed file write must not prevent the Redis publish.
    try:
        plan_path = Path(DAILY_PLAN_PATH)
        plan_path.parent.mkdir(parents=True, exist_ok=True)
        with plan_path.open("w", encoding="utf-8") as f:
            json.dump(plan_dict, f, default=str, indent=2)
        log.info("Plan saved to %s", DAILY_PLAN_PATH)
    except Exception as exc:
        # log.exception keeps the traceback alongside the original message.
        log.exception("Failed to save plan file: %s", exc)

    redis = get_redis()
    if redis:
        # Round-trip through JSON so values json cannot encode natively are
        # turned into strings (default=str) before being handed to Redis.
        safe = json.loads(json.dumps(plan_dict, default=str))
        redis.set_json("control:plan", safe, ttl=86400)
        log.info("Plan saved to Redis")
    else:
        log.warning("Redis not available — plan not shared")

    log.info("Plan complete: %d slots", len(plan_dict.get("slots", [])))
|
|
|
|
# Run the planner only when this module is executed directly (for example
# via ``python -m backend.workers.daily_planner``), not when it is imported.
if __name__ == "__main__":
    main()
|
|