api / backend /workers /daily_planner.py
safraeli's picture
Fix race conditions, error handling, timezone, divergence check
6d7f91e verified
"""
Day-ahead planner worker.
Entry point for GitHub Actions cron (daily 05:00 IST = 02:00 UTC).
Usage:
python -m backend.workers.daily_planner
"""
from __future__ import annotations
import json
import logging
import sys
from datetime import date, datetime, timezone
from pathlib import Path
# Ensure project root is on sys.path
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
# Load .env if present (local dev)
try:
from dotenv import load_dotenv
load_dotenv(PROJECT_ROOT / ".env")
except ImportError:
pass
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("daily_planner")
def _get_forecast(target: date):
"""Build 96-slot (15-min) forecast arrays from IMS cache or defaults."""
try:
from src.data.ims_client import IMSClient
import pandas as pd
client = IMSClient()
df = client.load_cached()
if df.empty:
raise ValueError("No IMS cache available")
# Get the most recent day's pattern as a proxy for tomorrow
df["hour"] = pd.to_datetime(df["timestamp_utc"]).dt.hour
# Build 96 slots of temp and GHI
temps = []
ghis = []
for slot in range(96):
h = slot // 4
# Use mean values per hour from recent data
hour_data = df[df["hour"] == h]
t = hour_data["air_temperature_c"].mean() if "air_temperature_c" in df.columns and not hour_data.empty else 25.0
g = hour_data["ghi_w_m2"].mean() if "ghi_w_m2" in df.columns and not hour_data.empty else 0.0
temps.append(float(t) if not pd.isna(t) else 25.0)
ghis.append(float(g) if not pd.isna(g) else 0.0)
return temps, ghis
except Exception as exc:
log.warning("Could not build forecast from IMS: %s — using defaults", exc)
# Default: sinusoidal temperature and GHI profile for Sde Boker
import math
temps = []
ghis = []
for slot in range(96):
h = slot / 4 # fractional hour
# Temp: 18°C at night, peaks ~35°C at 14:00
t = 26.5 + 8.5 * math.sin(math.pi * (h - 6) / 12) if 6 <= h <= 18 else 20.0
# GHI: 0 at night, peaks ~900 W/m² at noon
g = max(0, 900 * math.sin(math.pi * (h - 6) / 12)) if 6 <= h <= 18 else 0.0
temps.append(round(t, 1))
ghis.append(round(g, 1))
return temps, ghis
def main():
from src.day_ahead_planner import DayAheadPlanner
from src.data.redis_cache import get_redis
from config.settings import DAILY_PLAN_PATH, MAX_ENERGY_REDUCTION_PCT
from datetime import timedelta
# Use Israel Standard Time (UTC+2) — HF Spaces runs in UTC
IST = timezone(timedelta(hours=2))
target = datetime.now(IST).date()
log.info("Computing day-ahead plan for %s (IST)", target)
# Build forecast inputs
forecast_temps, forecast_ghi = _get_forecast(target)
# Compute a reasonable daily budget (5% of ~25 kWh potential = ~1.25 kWh)
daily_budget_kwh = 25.0 * MAX_ENERGY_REDUCTION_PCT / 100.0
planner = DayAheadPlanner()
plan = planner.plan_day(
target_date=target,
forecast_temps=forecast_temps,
forecast_ghi=forecast_ghi,
daily_budget_kwh=daily_budget_kwh,
)
plan_dict = plan.to_dict() if hasattr(plan, "to_dict") else {"raw": str(plan)}
plan_dict["_computed_at"] = datetime.now(timezone.utc).isoformat()
# Save to file (backup)
try:
Path(DAILY_PLAN_PATH).parent.mkdir(parents=True, exist_ok=True)
with open(DAILY_PLAN_PATH, "w") as f:
json.dump(plan_dict, f, default=str, indent=2)
log.info("Plan saved to %s", DAILY_PLAN_PATH)
except Exception as exc:
log.error("Failed to save plan file: %s", exc)
# Save to Redis
redis = get_redis()
if redis:
safe = json.loads(json.dumps(plan_dict, default=str))
redis.set_json("control:plan", safe, ttl=86400) # 24h TTL
log.info("Plan saved to Redis")
else:
log.warning("Redis not available — plan not shared")
log.info("Plan complete: %d slots", len(plan_dict.get("slots", [])))
if __name__ == "__main__":
main()