DIS_IPL / app.py
Jay-Rajput's picture
Admin panel imprvmnts
47370ee
from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify
import sqlite3
import hashlib
import os
import json
import re
import logging
import shutil
import tempfile
import threading
from datetime import datetime, date, timedelta
from functools import wraps
from collections import defaultdict
from zoneinfo import ZoneInfo
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
_log = logging.getLogger(__name__)
app = Flask(__name__, template_folder=os.path.join(BASE_DIR, 'templates'))
app.secret_key = os.environ.get('SECRET_KEY', 'ipl-predictions-secret-change-in-prod')
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90)
APP_TIMEZONE = (os.environ.get('APP_TIMEZONE') or 'Asia/Kolkata').strip()
try:
APP_TZ = ZoneInfo(APP_TIMEZONE)
except Exception:
APP_TZ = ZoneInfo('Asia/Kolkata')
def get_data_dir() -> str:
"""Writable directory for SQLite. Optional DIS_DATA_DIR (e.g. /data) if you use HF paid persistent disk."""
d = (os.environ.get('DIS_DATA_DIR') or '').strip()
if not d:
return BASE_DIR
try:
os.makedirs(d, exist_ok=True)
except OSError as e:
_log.warning('DIS_DATA_DIR %r not usable (%s); using app directory', d, e)
return BASE_DIR
return d
def db_path() -> str:
return os.path.join(get_data_dir(), 'ipl_predictions.db')
def resolved_json_path(filename: str) -> str:
"""Prefer a copy under DIS_DATA_DIR (editable at runtime), else bundled file from the image."""
dd = get_data_dir()
if dd != BASE_DIR:
p = os.path.join(dd, filename)
if os.path.isfile(p):
return p
return os.path.join(BASE_DIR, filename)
def users_json_path() -> str:
return resolved_json_path('users.json')
def matches_json_path() -> str:
return resolved_json_path('matches.json')
def players_json_path() -> str:
return resolved_json_path('players.json')
def _hub_token() -> str:
return (os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN') or '').strip()
def state_repo_id() -> str:
return (os.environ.get('DIS_STATE_REPO') or os.environ.get('DIS_HUB_DATASET') or '').strip()
def state_bucket_id() -> str:
"""Hub Storage Bucket id, e.g. Jay-Rajput/dis-ipl-state (see HF Buckets docs)."""
return (os.environ.get('DIS_STATE_BUCKET') or os.environ.get('DIS_HUB_BUCKET') or '').strip()
STATE_DB_REMOTE_NAME = 'ipl_predictions.db'
def hub_remote_storage_configured() -> bool:
return bool(_hub_token() and (state_bucket_id() or state_repo_id()))
def _local_db_snapshot_looks_valid(path: str) -> bool:
return os.path.isfile(path) and os.path.getsize(path) > 4096
_hub_push_lock = threading.Lock()
def snapshot_sqlite_to_file(dest_path: str) -> bool:
"""Hot snapshot of the live DB (works with WAL)."""
src_p = db_path()
if not os.path.isfile(src_p):
return False
os.makedirs(os.path.dirname(dest_path) or '.', exist_ok=True)
tmp = dest_path + '.part'
try:
src = sqlite3.connect(src_p)
try:
dst = sqlite3.connect(tmp)
try:
src.backup(dst)
finally:
dst.close()
finally:
src.close()
os.replace(tmp, dest_path)
return True
except sqlite3.Error as e:
_log.warning('SQLite snapshot failed: %s', e)
try:
os.remove(tmp)
except OSError:
pass
return False
def push_state_to_hub() -> None:
"""Upload full SQLite snapshot (every user’s predictions, points, matches) to bucket and/or dataset."""
tok = _hub_token()
bucket = state_bucket_id()
repo = state_repo_id()
if not tok or (not bucket and not repo):
return
with _hub_push_lock:
fd, tmp = tempfile.mkstemp(suffix='.db')
os.close(fd)
try:
if not snapshot_sqlite_to_file(tmp):
return
if bucket:
try:
from huggingface_hub import batch_bucket_files
batch_bucket_files(
bucket,
add=[(tmp, STATE_DB_REMOTE_NAME)],
token=tok,
)
_log.info('Pushed state to Hub bucket %s', bucket)
except ImportError:
_log.warning('huggingface_hub too old for Buckets; pip install huggingface_hub>=1.7')
except Exception as e:
_log.warning('Hub bucket upload failed: %s', e)
if repo:
try:
from huggingface_hub import HfApi
api = HfApi(token=tok)
api.create_repo(repo_id=repo, repo_type='dataset', private=True, exist_ok=True)
api.upload_file(
path_or_fileobj=tmp,
path_in_repo=STATE_DB_REMOTE_NAME,
repo_id=repo,
repo_type='dataset',
commit_message='DIS IPL: save app state',
)
_log.info('Pushed state to Hub dataset %s', repo)
except Exception as e:
_log.warning('Hub dataset upload failed: %s', e)
finally:
try:
os.remove(tmp)
except OSError:
pass
def schedule_hub_push() -> None:
if not hub_remote_storage_configured():
return
def _run():
push_state_to_hub()
threading.Thread(target=_run, daemon=True).start()
def restore_db_from_hub_if_needed() -> None:
"""If local DB is missing or empty, restore from bucket first, then dataset if still not valid."""
tok = _hub_token()
bucket = state_bucket_id()
repo = state_repo_id()
if not tok or (not bucket and not repo):
return
force = os.environ.get('DIS_FORCE_HUB_RESTORE', '').strip() in ('1', 'true', 'yes')
dest = db_path()
if os.path.isfile(dest) and os.path.getsize(dest) > 4096 and not force:
return
os.makedirs(os.path.dirname(dest) or '.', exist_ok=True)
if bucket:
try:
from huggingface_hub import download_bucket_files
download_bucket_files(
bucket_id=bucket,
files=[(STATE_DB_REMOTE_NAME, dest)],
token=tok,
raise_on_missing_files=False,
)
if _local_db_snapshot_looks_valid(dest):
_log.info('Restored database from Hub bucket %s', bucket)
except ImportError:
_log.warning('huggingface_hub too old for Buckets; pip install huggingface_hub>=1.7')
except Exception as e:
_log.info('Hub bucket restore skipped: %s', e)
if not _local_db_snapshot_looks_valid(dest) and repo:
try:
from huggingface_hub import hf_hub_download
tmp_dir = tempfile.mkdtemp()
try:
p = hf_hub_download(
repo_id=repo,
filename=STATE_DB_REMOTE_NAME,
repo_type='dataset',
token=tok,
local_dir=tmp_dir,
)
shutil.copy2(p, dest)
if _local_db_snapshot_looks_valid(dest):
_log.info('Restored database from Hub dataset %s', repo)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
except Exception as e:
_log.info('Hub dataset restore skipped (fresh deploy or empty repo): %s', e)
NO_PASSWORD_PLACEHOLDER = hashlib.sha256(b'__internal_no_login__').hexdigest()
def admin_password() -> str:
"""Plain admin password from env. ADMIN_PASSWORD preferred; ADMIN_SECRET kept for older deploys."""
return (os.environ.get('ADMIN_PASSWORD') or os.environ.get('ADMIN_SECRET', '')).strip()
# ─── TEAM DATA ────────────────────────────────────────────────────────────────
IPL_TEAMS = [
'Mumbai Indians',
'Chennai Super Kings',
'Royal Challengers Bengaluru',
'Kolkata Knight Riders',
'Sunrisers Hyderabad',
'Delhi Capitals',
'Rajasthan Royals',
'Punjab Kings',
'Lucknow Super Giants',
'Gujarat Titans',
]
TEAM_ABBR = {
'Mumbai Indians': 'MI',
'Chennai Super Kings': 'CSK',
'Royal Challengers Bengaluru': 'RCB',
'Kolkata Knight Riders': 'KKR',
'Sunrisers Hyderabad': 'SRH',
'Delhi Capitals': 'DC',
'Rajasthan Royals': 'RR',
'Punjab Kings': 'PBKS',
'Lucknow Super Giants': 'LSG',
'Gujarat Titans': 'GT',
}
ABBR_TO_FULL = {v: k for k, v in TEAM_ABBR.items()}
TEAM_COLORS = {
'MI': '#004BA0', 'CSK': '#FFCC00', 'RCB': '#EC1C24',
'KKR': '#7C3AED', 'SRH': '#FF822A', 'DC': '#0078BC',
'RR': '#EA1A85', 'PBKS': '#AA4545', 'LSG': '#A4C639', 'GT': '#00B5E2',
}
POINTS_CONFIG = {
'initial': 1000,
'min_bid': 10,
'max_bid': 500,
'correct_winner': 1.0,
'wrong_winner': -1.0,
'correct_motm': 75,
'wrong_motm': -25,
'no_motm_predicted': 0,
'lock_minutes_before': 0, # 0 = lock at scheduled match start (not before)
}
MATCH_STATUSES = ['upcoming', 'locked', 'live', 'completed', 'abandoned', 'postponed']
_players_cache = None
_app_db_ready = False
_hub_restore_attempted = False
# ─── DATABASE ─────────────────────────────────────────────────────────────────
def get_db():
conn = sqlite3.connect(db_path())
conn.row_factory = sqlite3.Row
conn.execute('PRAGMA journal_mode=WAL')
conn.execute('PRAGMA foreign_keys=ON')
return conn
def _table_columns(conn, table):
return {r[1] for r in conn.execute(f'PRAGMA table_info({table})').fetchall()}
def init_db():
conn = get_db()
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
display_name TEXT,
password_hash TEXT NOT NULL,
is_admin INTEGER DEFAULT 0,
points REAL DEFAULT 1000,
is_active INTEGER DEFAULT 1,
member_key TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)''')
c.execute('''CREATE TABLE IF NOT EXISTS matches (
id INTEGER PRIMARY KEY AUTOINCREMENT,
match_number INTEGER,
team1 TEXT NOT NULL,
team2 TEXT NOT NULL,
match_date DATE NOT NULL,
match_time TEXT NOT NULL,
lock_time TEXT,
venue TEXT,
city TEXT,
status TEXT DEFAULT 'upcoming',
winner TEXT,
man_of_match TEXT,
result_notes TEXT,
is_result_final INTEGER DEFAULT 0,
source_id TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)''')
c.execute('''CREATE TABLE IF NOT EXISTS predictions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
match_id INTEGER NOT NULL,
predicted_winner TEXT NOT NULL,
predicted_motm TEXT,
bid_amount REAL NOT NULL,
is_settled INTEGER DEFAULT 0,
winner_correct INTEGER,
motm_correct INTEGER,
points_earned REAL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, match_id),
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (match_id) REFERENCES matches(id)
)''')
c.execute('''CREATE TABLE IF NOT EXISTS points_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
match_id INTEGER,
change_amount REAL NOT NULL,
reason TEXT,
balance_after REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (match_id) REFERENCES matches(id)
)''')
c.execute('''CREATE TABLE IF NOT EXISTS ip_bindings (
ip_hash TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)''')
uc = _table_columns(conn, 'users')
if 'member_key' not in uc:
c.execute('ALTER TABLE users ADD COLUMN member_key TEXT')
mc = _table_columns(conn, 'matches')
if 'source_id' not in mc:
c.execute('ALTER TABLE matches ADD COLUMN source_id TEXT')
try:
c.execute('CREATE UNIQUE INDEX IF NOT EXISTS idx_matches_source_id ON matches(source_id) WHERE source_id IS NOT NULL')
except sqlite3.OperationalError:
pass
conn.commit()
conn.close()
# ─── JSON / SYNC ──────────────────────────────────────────────────────────────
def load_team_members():
path = users_json_path()
if not os.path.isfile(path):
return []
with open(path, encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, dict) and 'members' in data:
return list(data['members'])
if isinstance(data, dict):
out = []
for k in data.keys():
if k == 'members':
continue
slug = re.sub(r'[^a-z0-9]+', '_', str(k).lower()).strip('_')
out.append({'key': slug or str(k).lower(), 'display_name': str(k)})
return sorted(out, key=lambda x: x['display_name'].lower())
return []
def load_signin_roster():
"""Name picker list from DB so admin-added players appear without editing JSON."""
conn = get_db()
rows = conn.execute(
'''SELECT member_key, username, display_name FROM users WHERE is_active=1
ORDER BY LOWER(COALESCE(NULLIF(TRIM(display_name), ''), username))'''
).fetchall()
conn.close()
out = []
for r in rows:
key = (r['member_key'] or r['username'] or '').strip().lower()
if not key:
continue
label = (r['display_name'] or r['username'] or key).strip()
out.append({'key': key, 'display_name': label})
return out
def sync_users_from_json():
members = load_team_members()
if not members:
return
conn = get_db()
for m in members:
key = (m.get('key') or '').strip().lower()
display = (m.get('display_name') or key).strip()
if not key:
continue
row = conn.execute(
'SELECT id FROM users WHERE member_key=? OR username=?', (key, key)
).fetchone()
if row:
conn.execute(
'UPDATE users SET display_name=?, member_key=?, username=? WHERE id=?',
(display, key, key, row['id'])
)
else:
conn.execute(
'''INSERT INTO users (username, display_name, password_hash, member_key, is_admin, points)
VALUES (?,?,?,?,0,?)''',
(key, display, NO_PASSWORD_PLACEHOLDER, key, POINTS_CONFIG['initial'])
)
conn.commit()
conn.close()
def parse_time_24h(s: str) -> str:
s = (s or '').strip().upper().replace('AM', ' AM').replace('PM', ' PM')
s = re.sub(r'\s+', ' ', s)
for fmt in ('%I:%M %p', '%H:%M'):
try:
return datetime.strptime(s.strip(), fmt).strftime('%H:%M')
except ValueError:
continue
return '19:30'
def abbr_to_full_team(abbr: str) -> str:
a = (abbr or '').strip().upper()
return ABBR_TO_FULL.get(a, abbr)
def load_players_raw():
global _players_cache
if _players_cache is not None:
return _players_cache
path = players_json_path()
if not os.path.isfile(path):
_players_cache = {}
return _players_cache
with open(path, encoding='utf-8') as f:
_players_cache = json.load(f)
return _players_cache
def squad_for_match_teams(team1: str, team2: str):
ab1 = TEAM_ABBR.get(team1)
ab2 = TEAM_ABBR.get(team2)
raw = load_players_raw()
p1 = list(raw.get(ab1 or '', []))
p2 = list(raw.get(ab2 or '', []))
combined = sorted(set(p1 + p2), key=lambda n: n.lower())
return {'team1': p1, 'team2': p2, 'combined': combined, 'abbr1': ab1, 'abbr2': ab2}
def squad_for_predicted_winner(match, predicted_winner: str, squads: dict) -> list:
"""Squad list for whichever team the user picked to win."""
t1, t2 = match['team1'], match['team2']
if predicted_winner == t1:
return list(squads.get('team1') or [])
if predicted_winner == t2:
return list(squads.get('team2') or [])
return []
def validate_motm_for_winner(pred_motm: str, winner_squad: list):
"""
Returns (canonical_name, None) if valid, else (None, error_message).
canonical_name matches the roster spelling from winner_squad.
"""
if not winner_squad:
return None, 'No squad is listed for the team you picked. Ask your organiser to update the roster.'
raw = (pred_motm or '').strip()
if not raw:
return None, 'Please choose Man of the Match from your winning team’s squad.'
lower_map = {n.strip().lower(): n for n in winner_squad}
key = raw.lower()
if key in lower_map:
return lower_map[key], None
for n in winner_squad:
if n == raw:
return n, None
return None, 'Man of the Match must be a player from the team you picked to win.'
def sync_matches_from_json():
path = matches_json_path()
if not os.path.isfile(path):
return
with open(path, encoding='utf-8') as f:
rows = json.load(f)
conn = get_db()
for idx, m in enumerate(rows, start=1):
teams = m.get('teams') or []
if len(teams) < 2:
continue
source_id = str(m.get('match_id') or f"{m.get('date')}_{idx}")
team1 = abbr_to_full_team(teams[0])
team2 = abbr_to_full_team(teams[1])
d = m.get('date')
mt = parse_time_24h(m.get('time', '7:30 PM'))
venue = m.get('venue') or ''
city = m.get('city') or ''
exists = conn.execute('SELECT id FROM matches WHERE source_id=?', (source_id,)).fetchone()
if exists:
continue
conn.execute(
'''INSERT INTO matches
(match_number, team1, team2, match_date, match_time, venue, city, status, source_id)
VALUES (?,?,?,?,?,?,?,'upcoming',?)''',
(idx, team1, team2, d, mt, venue, city, source_id)
)
conn.commit()
conn.close()
# ─── HELPERS ─────────────────────────────────────────────────────────────────
def hash_password(pw: str) -> str:
return hashlib.sha256(pw.encode()).hexdigest()
def client_ip() -> str:
xff = request.headers.get('X-Forwarded-For') or request.headers.get('X-Real-IP')
if xff:
return xff.split(',')[0].strip()
return request.remote_addr or '0.0.0.0'
def ip_fingerprint() -> str:
return hashlib.sha256(f"{app.secret_key}|{client_ip()}".encode()).hexdigest()
def get_current_user():
uid = session.get('user_id')
if not uid:
return None
conn = get_db()
user = conn.execute('SELECT * FROM users WHERE id=? AND is_active=1', (uid,)).fetchone()
conn.close()
return user
def bind_ip_to_user(user_id: int):
conn = get_db()
conn.execute(
'''INSERT INTO ip_bindings (ip_hash, user_id, last_seen) VALUES (?,?,CURRENT_TIMESTAMP)
ON CONFLICT(ip_hash) DO UPDATE SET user_id=excluded.user_id, last_seen=CURRENT_TIMESTAMP''',
(ip_fingerprint(), user_id)
)
conn.commit()
conn.close()
def try_login_from_ip():
if session.get('user_id'):
return
h = ip_fingerprint()
conn = get_db()
row = conn.execute(
'''SELECT u.* FROM ip_bindings b JOIN users u ON u.id=b.user_id
WHERE b.ip_hash=? AND u.is_active=1''', (h,)
).fetchone()
conn.close()
if row:
session['user_id'] = row['id']
session.permanent = True
session.modified = True
def require_login(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not get_current_user():
# Allow admin-only flows with staff session even if no team user is selected.
if session.get('staff_ok') and request.endpoint and request.endpoint.startswith('admin'):
return f(*args, **kwargs)
flash('Pick your name to continue.', 'info')
return redirect(url_for('index'))
return f(*args, **kwargs)
return wrapper
def require_staff(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not admin_password():
flash('Admin login isn’t available yet. Ask whoever runs this app to turn it on.', 'warning')
return redirect(url_for('dashboard'))
if not session.get('staff_ok'):
return redirect(url_for('admin_login', next=request.path))
return f(*args, **kwargs)
return wrapper
def match_calendar_date(match) -> date:
md = match['match_date']
if isinstance(md, date):
return md
return date.fromisoformat(str(md))
def is_match_today(match) -> bool:
return match_calendar_date(match) == app_today()
def app_now() -> datetime:
return datetime.now(APP_TZ)
def app_today() -> date:
return app_now().date()
def match_start_dt(match) -> datetime:
match_dt_str = f"{match['match_date']} {match['match_time']}"
naive = datetime.strptime(match_dt_str, '%Y-%m-%d %H:%M')
return naive.replace(tzinfo=APP_TZ)
def is_prediction_locked(match) -> bool:
if match['status'] not in ('upcoming',):
return True
try:
match_dt = match_start_dt(match)
lock_dt = match_dt - timedelta(minutes=POINTS_CONFIG['lock_minutes_before'])
return app_now() >= lock_dt
except Exception:
return True
def predictions_allowed(match) -> bool:
"""True only on the scheduled calendar day, before lock at start time, status upcoming."""
if match['status'] != 'upcoming':
return False
if not is_match_today(match):
return False
return not is_prediction_locked(match)
def auto_lock_matches():
conn = get_db()
before = conn.total_changes
rows = conn.execute(
"SELECT id, match_date, match_time FROM matches WHERE status = 'upcoming'"
).fetchall()
for row in rows:
try:
match_dt = match_start_dt(row)
lock_dt = match_dt - timedelta(minutes=POINTS_CONFIG['lock_minutes_before'])
if app_now() >= lock_dt:
conn.execute(
"UPDATE matches SET status='locked', updated_at=CURRENT_TIMESTAMP WHERE id=?",
(row['id'],)
)
except Exception:
pass
conn.commit()
if conn.total_changes > before:
schedule_hub_push()
conn.close()
def settle_match(match_id: int):
conn = get_db()
match = conn.execute('SELECT * FROM matches WHERE id=?', (match_id,)).fetchone()
if not match or not match['is_result_final']:
conn.close()
return False, 'Result not finalised.'
winner = (match['winner'] or '').strip().lower()
motm = (match['man_of_match'] or '').strip().lower()
predictions = conn.execute(
'SELECT * FROM predictions WHERE match_id=? AND is_settled=0', (match_id,)
).fetchall()
cfg = POINTS_CONFIG
for pred in predictions:
points_delta = 0.0
winner_correct = None
motm_correct = None
if match['status'] == 'abandoned':
points_delta = pred['bid_amount']
reason = f"Match #{match['match_number']} abandoned – bid refunded"
else:
pred_winner = (pred['predicted_winner'] or '').strip().lower()
pred_motm = (pred['predicted_motm'] or '').strip().lower()
if pred_winner == winner:
winner_correct = 1
points_delta += pred['bid_amount'] * cfg['correct_winner']
else:
winner_correct = 0
points_delta += pred['bid_amount'] * cfg['wrong_winner']
if pred_motm:
if _motm_match(pred_motm, motm):
motm_correct = 1
points_delta += cfg['correct_motm']
else:
motm_correct = 0
points_delta += cfg['wrong_motm']
reason = f"Match #{match['match_number']}: {match['team1']} vs {match['team2']}"
user = conn.execute('SELECT points FROM users WHERE id=?', (pred['user_id'],)).fetchone()
new_balance = round(user['points'] + points_delta, 2)
conn.execute('UPDATE users SET points=? WHERE id=?', (new_balance, pred['user_id']))
conn.execute(
'''UPDATE predictions SET
is_settled=1, winner_correct=?, motm_correct=?, points_earned=?, updated_at=CURRENT_TIMESTAMP
WHERE id=?''',
(winner_correct, motm_correct, points_delta, pred['id'])
)
conn.execute(
'''INSERT INTO points_history
(user_id, match_id, change_amount, reason, balance_after)
VALUES (?,?,?,?,?)''',
(pred['user_id'], match_id, points_delta, reason, new_balance)
)
conn.commit()
conn.close()
push_state_to_hub()
return True, f'Settled {len(predictions)} predictions.'
def _motm_match(pred: str, actual: str) -> bool:
if not pred or not actual:
return False
p = re.sub(r'[^a-z ]', '', pred.lower()).strip()
a = re.sub(r'[^a-z ]', '', actual.lower()).strip()
if p == a:
return True
p_parts = p.split()
a_parts = a.split()
if p_parts and a_parts and p_parts[-1] == a_parts[-1]:
return True
if p in a or a in p:
return True
return False
def unsettle_and_recalculate(match_id: int):
conn = get_db()
settled = conn.execute(
'SELECT * FROM predictions WHERE match_id=? AND is_settled=1', (match_id,)
).fetchall()
for pred in settled:
user = conn.execute('SELECT points FROM users WHERE id=?', (pred['user_id'],)).fetchone()
new_balance = round(user['points'] - pred['points_earned'], 2)
conn.execute('UPDATE users SET points=? WHERE id=?', (new_balance, pred['user_id']))
conn.execute('DELETE FROM points_history WHERE match_id=? AND user_id=?',
(match_id, pred['user_id']))
conn.execute(
'''UPDATE predictions SET
is_settled=0, winner_correct=NULL, motm_correct=NULL, points_earned=0
WHERE id=?''',
(pred['id'],)
)
conn.commit()
conn.close()
push_state_to_hub()
def get_todays_matches():
conn = get_db()
today = date.today().isoformat()
matches = conn.execute(
'SELECT * FROM matches WHERE match_date=? ORDER BY match_time', (today,)
).fetchall()
conn.close()
return matches
def enrich_match(match):
m = dict(match)
m['team1_abbr'] = TEAM_ABBR.get(m['team1'], m['team1'][:3].upper())
m['team2_abbr'] = TEAM_ABBR.get(m['team2'], m['team2'][:3].upper())
m['team1_color'] = TEAM_COLORS.get(m['team1_abbr'], '#555')
m['team2_color'] = TEAM_COLORS.get(m['team2_abbr'], '#555')
m['is_match_today'] = is_match_today(match)
m['can_predict'] = predictions_allowed(match)
m['locked'] = is_prediction_locked(match)
try:
match_dt = datetime.strptime(f"{m['match_date']} {m['match_time']}", '%Y-%m-%d %H:%M')
lock_dt = match_dt - timedelta(minutes=POINTS_CONFIG['lock_minutes_before'])
m['match_time_display'] = match_dt.strftime('%d %b %Y, %I:%M %p')
# When lock is at start (0 min before), same as kickoff — banner already shows match time
if POINTS_CONFIG['lock_minutes_before'] <= 0:
m['lock_time_display'] = ''
else:
m['lock_time_display'] = lock_dt.strftime('%d %b %Y, %I:%M %p')
except Exception:
m['lock_time_display'] = ''
m['match_time_display'] = m['match_time']
return m
def recent_completed_match_ids(conn, limit=5):
rows = conn.execute(
'''SELECT id FROM matches WHERE status='completed' AND is_result_final=1
ORDER BY match_date DESC, match_time DESC LIMIT ?''',
(limit,)
).fetchall()
return [r['id'] for r in rows]
def last_five_streak_cells(conn, user_id: int):
mids = recent_completed_match_ids(conn, 5)
if not mids:
return ['white'] * 5
qmarks = ','.join('?' * len(mids))
preds = conn.execute(
f'SELECT match_id, winner_correct, is_settled FROM predictions WHERE user_id=? AND match_id IN ({qmarks})',
(user_id, *mids)
).fetchall()
pmap = {p['match_id']: p for p in preds}
out = []
for mid in mids:
p = pmap.get(mid)
if not p:
out.append('white')
elif not p['is_settled']:
out.append('white')
elif p['winner_correct'] == 1:
out.append('green')
else:
out.append('red')
while len(out) < 5:
out.append('white')
return out[:5]
def user_best_streak(conn, user_id: int) -> int:
rows = conn.execute(
'''SELECT p.winner_correct, m.match_date, m.match_time
FROM predictions p
JOIN matches m ON m.id=p.match_id
WHERE p.user_id=? AND p.is_settled=1 AND m.status='completed' AND m.is_result_final=1
ORDER BY m.match_date ASC, m.match_time ASC''',
(user_id,)
).fetchall()
best = cur = 0
for r in rows:
if r['winner_correct'] == 1:
cur += 1
best = max(best, cur)
else:
cur = 0
return best
@app.before_request
def _before():
global _app_db_ready, _hub_restore_attempted
if request.endpoint and request.endpoint.startswith('static'):
return
if not _app_db_ready:
if not _hub_restore_attempted:
_hub_restore_attempted = True
restore_db_from_hub_if_needed()
init_db()
_app_db_ready = True
sync_users_from_json()
sync_matches_from_json()
auto_lock_matches()
try_login_from_ip()
# ─── IDENTITY (homepage: pick name, then use the app) ─────────────────────────
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
key = (request.form.get('member_key') or '').strip().lower()
remember = request.form.get('remember') == '1'
if not key:
flash('Choose your name from the list.', 'warning')
return redirect(url_for('index'))
conn = get_db()
user = conn.execute(
'SELECT * FROM users WHERE member_key=? OR username=?', (key, key)
).fetchone()
conn.close()
if not user or not user['is_active']:
flash('We don’t recognise that member. Ask your organiser to add you to the team list.', 'danger')
return redirect(url_for('index'))
session['user_id'] = user['id']
session.permanent = True
if remember:
bind_ip_to_user(user['id'])
schedule_hub_push()
flash(f"You're in as {user['display_name'] or user['username']}. Good luck! 🏏", 'success')
return redirect(url_for('dashboard'))
members = load_signin_roster()
if not members:
members = load_team_members()
suggested = None
h = ip_fingerprint()
conn = get_db()
row = conn.execute(
'''SELECT u.member_key, u.display_name FROM ip_bindings b
JOIN users u ON u.id=b.user_id WHERE b.ip_hash=? AND u.is_active=1''',
(h,)
).fetchone()
conn.close()
if row:
suggested = dict(row)
return render_template('identify.html', members=members, suggested=suggested)
@app.route('/identify')
def identify():
return redirect(url_for('index'))
@app.route('/logout')
def logout():
session.pop('user_id', None)
flash('Pick your name on the home page to continue.', 'info')
return redirect(url_for('index'))
# ─── ADMIN LOGIN (password) ───────────────────────────────────────────────────
@app.route('/admin-login', methods=['GET', 'POST'])
@app.route('/staff-login', methods=['GET', 'POST'])
def admin_login():
nxt = request.args.get('next') or url_for('admin')
pwd = admin_password()
if not pwd:
flash('Admin sign-in isn’t set up on this deployment.', 'danger')
return redirect(url_for('dashboard'))
if request.method == 'POST':
if request.form.get('password', '').strip() == pwd:
session['staff_ok'] = True
session.permanent = True
flash('Admin session started.', 'success')
return redirect(nxt)
flash('Wrong password.', 'danger')
return render_template('admin_login.html', next_url=nxt)
@app.route('/admin-logout')
@app.route('/staff-logout')
def admin_logout():
session.pop('staff_ok', None)
flash('Admin session ended.', 'info')
return redirect(url_for('dashboard'))
# ─── DASHBOARD ────────────────────────────────────────────────────────────────
@app.route('/dashboard')
@require_login
def dashboard():
user = get_current_user()
conn = get_db()
today = app_today().isoformat()
todays_matches = conn.execute(
'SELECT * FROM matches WHERE match_date=? ORDER BY match_time', (today,)
).fetchall()
todays_matches = [enrich_match(m) for m in todays_matches]
user_preds = {}
for m in todays_matches:
pred = conn.execute(
'SELECT * FROM predictions WHERE user_id=? AND match_id=?',
(user['id'], m['id'])
).fetchone()
user_preds[m['id']] = dict(pred) if pred else None
history = conn.execute(
'''SELECT ph.*, m.team1, m.team2, m.match_number
FROM points_history ph
LEFT JOIN matches m ON ph.match_id = m.id
WHERE ph.user_id=?
ORDER BY ph.created_at DESC LIMIT 10''',
(user['id'],)
).fetchall()
recent_predictions = conn.execute(
'''SELECT p.*, m.team1, m.team2, m.match_number, m.match_date,
m.match_time, m.winner AS match_winner, m.status AS match_status
FROM predictions p
JOIN matches m ON m.id = p.match_id
WHERE p.user_id=?
ORDER BY m.match_date DESC, m.match_time DESC
LIMIT 8''',
(user['id'],),
).fetchall()
leaders = conn.execute(
'''SELECT username, display_name, points FROM users
WHERE is_active=1 ORDER BY points DESC LIMIT 5'''
).fetchall()
rank_row = conn.execute(
'''SELECT COUNT(*)+1 as rank FROM users
WHERE points > ? AND is_active=1''',
(user['points'],)
).fetchone()
upcoming_other = conn.execute(
'''SELECT * FROM matches WHERE match_date > ? AND status IN ('upcoming','locked')
ORDER BY match_date ASC, match_time ASC LIMIT 3''',
(today,)
).fetchall()
upcoming_other = [enrich_match(m) for m in upcoming_other]
streak = user_best_streak(conn, user['id'])
last5 = last_five_streak_cells(conn, user['id'])
conn.close()
return render_template(
'dashboard.html',
user=user,
todays_matches=todays_matches,
user_preds=user_preds,
history=history,
recent_predictions=recent_predictions,
leaders=leaders,
rank=rank_row['rank'],
points_config=POINTS_CONFIG,
upcoming_other=upcoming_other,
my_streak=streak,
my_last5=last5,
)
# ─── MATCHES ──────────────────────────────────────────────────────────────────
@app.route('/matches', endpoint='matches')
@require_login
def matches_schedule():
user = get_current_user()
conn = get_db()
filter_status = request.args.get('status', '')
filter_date = request.args.get('date', '')
query = 'SELECT * FROM matches WHERE 1=1'
params = []
if filter_status:
query += ' AND status=?'
params.append(filter_status)
if filter_date:
query += ' AND match_date=?'
params.append(filter_date)
query += ' ORDER BY match_date ASC, match_time'
all_matches = conn.execute(query, params).fetchall()
all_matches = [enrich_match(m) for m in all_matches]
preds = conn.execute(
'''SELECT match_id, predicted_winner, predicted_motm, bid_amount, is_settled,
winner_correct, motm_correct, points_earned FROM predictions WHERE user_id=?''',
(user['id'],)
).fetchall()
pred_map = {p['match_id']: dict(p) for p in preds}
conn.close()
return render_template(
'matches.html',
user=user,
matches=all_matches,
pred_map=pred_map,
filter_status=filter_status,
filter_date=filter_date,
statuses=MATCH_STATUSES,
)
# ─── PREDICT ──────────────────────────────────────────────────────────────────
@app.route('/predict/<int:match_id>', methods=['GET', 'POST'])
@require_login
def predict(match_id):
user = get_current_user()
conn = get_db()
match = conn.execute('SELECT * FROM matches WHERE id=?', (match_id,)).fetchone()
if not match:
flash('Match not found.', 'danger')
conn.close()
return redirect(url_for('dashboard'))
match_e = enrich_match(match)
existing = conn.execute(
'SELECT * FROM predictions WHERE user_id=? AND match_id=?',
(user['id'], match_id)
).fetchone()
squads = squad_for_match_teams(match['team1'], match['team2'])
if request.method == 'POST':
if not predictions_allowed(match):
if match['status'] != 'upcoming':
flash('Predictions are not open for this match.', 'danger')
elif not is_match_today(match):
flash('Predictions open only on the scheduled match day.', 'warning')
else:
flash('Predictions are locked for this match.', 'danger')
conn.close()
return redirect(url_for('predict', match_id=match_id))
pred_winner = request.form.get('predicted_winner', '').strip()
pred_motm = request.form.get('predicted_motm', '').strip()
bid_str = request.form.get('bid_amount', '0').strip()
errors = []
canon_motm = ''
if pred_winner not in [match['team1'], match['team2']]:
errors.append('Please select a valid team as winner.')
else:
ws = squad_for_predicted_winner(match, pred_winner, squads)
ok_motm, motm_err = validate_motm_for_winner(pred_motm, ws)
if motm_err:
errors.append(motm_err)
else:
canon_motm = ok_motm
try:
bid = float(bid_str)
except ValueError:
bid = 0
errors.append('Invalid bid amount.')
cfg = POINTS_CONFIG
max_allowed = min(cfg['max_bid'], float(user['points'])) if user['points'] > cfg['min_bid'] else float(user['points'])
if user['points'] <= 0:
errors.append('You have no points left to bid.')
elif bid < cfg['min_bid']:
errors.append(f"Minimum bid is {cfg['min_bid']} points.")
elif bid > max_allowed:
errors.append(f"Maximum bid is {int(max_allowed)} points.")
if errors:
for e in errors:
flash(e, 'danger')
conn.close()
return render_template(
'predict.html',
user=user,
match=match_e,
existing=existing,
teams=IPL_TEAMS,
points_config=POINTS_CONFIG,
squads=squads,
)
if existing and existing['is_settled']:
flash('This prediction is already settled.', 'warning')
conn.close()
return redirect(url_for('dashboard'))
if existing:
conn.execute(
'''UPDATE predictions SET
predicted_winner=?, predicted_motm=?, bid_amount=?, updated_at=CURRENT_TIMESTAMP
WHERE id=?''',
(pred_winner, canon_motm, bid, existing['id'])
)
flash('Prediction updated! 🎯', 'success')
else:
conn.execute(
'''INSERT INTO predictions
(user_id, match_id, predicted_winner, predicted_motm, bid_amount)
VALUES (?,?,?,?,?)''',
(user['id'], match_id, pred_winner, canon_motm, bid)
)
flash('Prediction submitted! May the best team win! 🏆', 'success')
conn.commit()
conn.close()
push_state_to_hub()
return redirect(url_for('dashboard'))
conn.close()
return render_template(
'predict.html',
user=user,
match=match_e,
existing=existing,
teams=IPL_TEAMS,
points_config=POINTS_CONFIG,
squads=squads,
)
# ─── LEADERBOARD ──────────────────────────────────────────────────────────────
@app.route('/leaderboard')
@require_login
def leaderboard():
user = get_current_user()
conn = get_db()
base = conn.execute(
'''
SELECT u.id, u.username, u.display_name, u.points,
COUNT(p.id) AS total_predictions,
COALESCE(SUM(p.winner_correct), 0) AS correct_winners,
COALESCE(SUM(p.motm_correct), 0) AS correct_motms,
COALESCE(SUM(CASE WHEN p.is_settled=1 THEN 1 END), 0) AS settled_count,
COALESCE(SUM(p.points_earned), 0) AS total_earned
FROM users u
LEFT JOIN predictions p ON p.user_id = u.id
WHERE u.is_active=1
GROUP BY u.id
ORDER BY u.points DESC
'''
).fetchall()
players = []
for p in base:
d = dict(p)
d['last5'] = last_five_streak_cells(conn, p['id'])
d['best_streak'] = user_best_streak(conn, p['id'])
players.append(d)
recent_results = conn.execute(
'''SELECT * FROM matches WHERE status='completed' AND is_result_final=1
ORDER BY match_date DESC, match_time DESC LIMIT 5'''
).fetchall()
recent_results = [enrich_match(m) for m in recent_results]
conn.close()
return render_template(
'leaderboard.html',
user=user,
players=players,
initial_points=POINTS_CONFIG['initial'],
recent_results=recent_results,
)
# ─── TEAM POOL (everyone’s picks) ─────────────────────────────────────────────
@app.route('/pool')
@require_login
def team_pool():
"""All active members can see every prediction — transparency + banter."""
user = get_current_user()
conn = get_db()
match_pick = request.args.get('match_id', type=int)
member_pick = request.args.get('member_id', type=int)
pool_teammates = conn.execute(
'''SELECT DISTINCT u.id, u.username, u.display_name
FROM users u
JOIN predictions p ON p.user_id = u.id
WHERE u.is_active = 1
ORDER BY LOWER(COALESCE(u.display_name, u.username))'''
).fetchall()
teammate_ids = {r['id'] for r in pool_teammates}
if member_pick and member_pick not in teammate_ids:
member_pick = None
selector_rows = conn.execute(
'''SELECT m.*
FROM matches m
WHERE EXISTS (
SELECT 1 FROM predictions p
JOIN users u ON u.id = p.user_id AND u.is_active = 1
WHERE p.match_id = m.id
)
ORDER BY m.match_date DESC, m.match_time DESC'''
).fetchall()
selector_matches = [enrich_match(r) for r in selector_rows]
match_ids = [m['id'] for m in selector_matches]
if match_pick and match_pick not in match_ids:
match_pick = None
if match_pick:
match_ids = [match_pick]
pool_rows = []
for mid in match_ids:
m = conn.execute('SELECT * FROM matches WHERE id=?', (mid,)).fetchone()
if not m:
continue
preds = conn.execute(
'''SELECT p.*, u.username, u.display_name, u.id AS user_uid
FROM predictions p
JOIN users u ON u.id = p.user_id
WHERE p.match_id = ? AND u.is_active = 1
ORDER BY p.bid_amount DESC, LOWER(COALESCE(u.display_name, u.username))''',
(mid,),
).fetchall()
pred_list = [dict(x) for x in preds]
if member_pick:
pred_list = [p for p in pred_list if p.get('user_uid') == member_pick]
if not pred_list:
continue
me = enrich_match(m)
t1 = m['team1']
t2 = m['team2']
side1 = [p for p in pred_list if p.get('predicted_winner') == t1]
side2 = [p for p in pred_list if p.get('predicted_winner') == t2]
other = [p for p in pred_list if p not in side1 and p not in side2]
bid1 = sum(float(p['bid_amount'] or 0) for p in side1)
bid2 = sum(float(p['bid_amount'] or 0) for p in side2)
n = len(pred_list)
pct1 = round(100 * len(side1) / n) if n else 50
pool_rows.append(
{
'match': me,
'preds': pred_list,
'side1': side1,
'side2': side2,
'other': other,
'bid1': bid1,
'bid2': bid2,
'pct1': pct1,
}
)
totals = conn.execute(
'''SELECT COUNT(*) AS n,
COUNT(DISTINCT p.user_id) AS members,
COUNT(DISTINCT p.match_id) AS matches_touched
FROM predictions p
JOIN users u ON u.id = p.user_id AND u.is_active = 1'''
).fetchone()
conn.close()
return render_template(
'team_pool.html',
user=user,
pool_rows=pool_rows,
selector_matches=selector_matches,
match_filter=match_pick,
pool_teammates=pool_teammates,
member_filter=member_pick,
totals=dict(totals) if totals else {'n': 0, 'members': 0, 'matches_touched': 0},
)
def _crowd_wisdom_from_conn(conn) -> dict:
"""Share of completed matches where the plurality crowd pick matched the result."""
rows = conn.execute(
'''SELECT p.match_id, m.winner, p.predicted_winner, COUNT(*) AS c
FROM predictions p
JOIN users u ON u.id = p.user_id AND u.is_active = 1
JOIN matches m ON m.id = p.match_id
WHERE m.status = 'completed'
AND m.winner IS NOT NULL
AND TRIM(m.winner) != ''
GROUP BY p.match_id, p.predicted_winner'''
).fetchall()
by_match = defaultdict(lambda: {'winner': '', 'choices': []})
for r in rows:
mid = r['match_id']
by_match[mid]['winner'] = (r['winner'] or '').strip()
by_match[mid]['choices'].append({'pick': r['predicted_winner'], 'c': r['c']})
correct = 0
total = 0
for _mid, data in by_match.items():
ch = data['choices']
if not ch:
continue
max_c = max(x['c'] for x in ch)
tops = [(x['pick'] or '').strip().lower() for x in ch if x['c'] == max_c]
actual = (data['winner'] or '').strip().lower()
if not actual:
continue
total += 1
if any(t == actual for t in tops if t):
correct += 1
pct = round(100 * correct / total, 1) if total else None
return {'n': total, 'correct': correct, 'pct': pct}
# ─── ANALYTICS ────────────────────────────────────────────────────────────────
@app.route('/analytics')
@require_login
def analytics():
user = get_current_user()
conn = get_db()
pool = conn.execute(
'''SELECT
COUNT(DISTINCT u.id) AS members,
COUNT(p.id) AS preds,
SUM(CASE WHEN p.is_settled = 1 THEN 1 ELSE 0 END) AS settled,
SUM(CASE WHEN p.winner_correct = 1 THEN 1 ELSE 0 END) AS correct_w,
AVG(u.points) AS avg_pts,
COALESCE(SUM(p.bid_amount), 0) AS total_staked,
AVG(p.bid_amount) AS avg_bid,
SUM(CASE WHEN p.is_settled = 1 AND p.predicted_motm IS NOT NULL
AND TRIM(p.predicted_motm) != '' THEN 1 ELSE 0 END) AS motm_attempts,
SUM(CASE WHEN p.is_settled = 1 AND p.motm_correct = 1 THEN 1 ELSE 0 END) AS motm_hits,
SUM(CASE WHEN p.is_settled = 1 AND p.motm_correct = 0 THEN 1 ELSE 0 END) AS motm_misses
FROM users u
LEFT JOIN predictions p ON p.user_id = u.id
WHERE u.is_active = 1'''
).fetchone()
match_counts = conn.execute(
'''SELECT
COUNT(*) AS n_all,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) AS n_done,
SUM(CASE WHEN status IN ('upcoming', 'locked', 'live') THEN 1 ELSE 0 END) AS n_open
FROM matches'''
).fetchone()
matches_touched = conn.execute(
'''SELECT COUNT(DISTINCT p.match_id)
FROM predictions p
JOIN users u ON u.id = p.user_id AND u.is_active = 1'''
).fetchone()[0] or 0
pick_rows = conn.execute(
'''SELECT p.predicted_winner, COUNT(*) AS c
FROM predictions p
JOIN users u ON u.id = p.user_id AND u.is_active = 1
GROUP BY p.predicted_winner
ORDER BY c DESC
LIMIT 10'''
).fetchall()
pick_max = max((r['c'] for r in pick_rows), default=1)
motm_board = conn.execute(
'''SELECT u.display_name, u.username,
SUM(CASE WHEN p.motm_correct = 1 THEN 1 ELSE 0 END) AS hits,
SUM(CASE WHEN p.is_settled = 1 AND p.predicted_motm IS NOT NULL
AND TRIM(p.predicted_motm) != '' THEN 1 ELSE 0 END) AS attempts
FROM users u
JOIN predictions p ON p.user_id = u.id
WHERE u.is_active = 1 AND p.is_settled = 1
GROUP BY u.id
HAVING attempts > 0
ORDER BY hits DESC,
(CAST(hits AS REAL) / attempts) DESC,
attempts DESC
LIMIT 10'''
).fetchall()
sharp_picks = conn.execute(
'''SELECT u.display_name, u.username,
SUM(CASE WHEN p.is_settled = 1 THEN 1 ELSE 0 END) AS g,
SUM(CASE WHEN p.winner_correct = 1 THEN 1 ELSE 0 END) AS w
FROM users u
JOIN predictions p ON p.user_id = u.id
WHERE u.is_active = 1
GROUP BY u.id
HAVING g >= 2
ORDER BY (CAST(w AS REAL) / g) DESC, w DESC, g DESC
LIMIT 8'''
).fetchall()
pl_board = conn.execute(
'''SELECT u.display_name, u.username, u.points,
COALESCE(SUM(p.points_earned), 0) AS season_pl,
COUNT(p.id) AS pred_n
FROM users u
JOIN predictions p ON p.user_id = u.id
WHERE u.is_active = 1 AND p.is_settled = 1
GROUP BY u.id
ORDER BY season_pl DESC
LIMIT 8'''
).fetchall()
bid_bins = conn.execute(
'''SELECT
SUM(CASE WHEN p.bid_amount <= 50 THEN 1 ELSE 0 END) AS b_low,
SUM(CASE WHEN p.bid_amount > 50 AND p.bid_amount <= 150 THEN 1 ELSE 0 END) AS b_mid,
SUM(CASE WHEN p.bid_amount > 150 AND p.bid_amount <= 300 THEN 1 ELSE 0 END) AS b_high,
SUM(CASE WHEN p.bid_amount > 300 THEN 1 ELSE 0 END) AS b_whale
FROM predictions p
JOIN users u ON u.id = p.user_id AND u.is_active = 1'''
).fetchone()
extrema = conn.execute(
'''SELECT MAX(p.points_earned) AS best_single,
MIN(p.points_earned) AS worst_single,
AVG(CASE WHEN p.is_settled = 1 THEN p.points_earned END) AS avg_settled_pl
FROM predictions p
JOIN users u ON u.id = p.user_id AND u.is_active = 1'''
).fetchone()
sum_balances = conn.execute(
'SELECT COALESCE(SUM(points), 0) FROM users WHERE is_active = 1'
).fetchone()[0] or 0
settled_pl_sum = conn.execute(
'''SELECT COALESCE(SUM(p.points_earned), 0)
FROM predictions p
JOIN users u ON u.id = p.user_id AND u.is_active = 1
WHERE p.is_settled = 1'''
).fetchone()[0] or 0
crowd = _crowd_wisdom_from_conn(conn)
conn.close()
motm_acc = None
if pool['motm_attempts']:
motm_acc = round(100 * (pool['motm_hits'] or 0) / pool['motm_attempts'], 1)
win_acc = None
if pool['settled']:
win_acc = round(100 * (pool['correct_w'] or 0) / pool['settled'], 1)
bin_max = max(
bid_bins['b_low'] or 0,
bid_bins['b_mid'] or 0,
bid_bins['b_high'] or 0,
bid_bins['b_whale'] or 0,
1,
)
return render_template(
'analytics.html',
user=user,
pool=dict(pool) if pool else {},
match_counts=dict(match_counts) if match_counts else {},
matches_touched=matches_touched,
pick_rows=pick_rows,
pick_max=pick_max,
motm_board=motm_board,
sharp_picks=sharp_picks,
pl_board=pl_board,
bid_bins=dict(bid_bins) if bid_bins else {},
bin_max=bin_max,
extrema=dict(extrema) if extrema else {},
sum_balances=sum_balances,
settled_pl_sum=settled_pl_sum,
crowd=crowd,
motm_acc=motm_acc,
win_acc=win_acc,
)
# ─── HISTORY ──────────────────────────────────────────────────────────────────
@app.route('/history')
@require_login
def history():
user = get_current_user()
conn = get_db()
records = conn.execute(
'''SELECT p.*, m.team1, m.team2, m.match_number, m.match_date,
m.match_time, m.winner AS match_winner, m.man_of_match,
m.status AS match_status
FROM predictions p
JOIN matches m ON m.id = p.match_id
WHERE p.user_id=?
ORDER BY m.match_date DESC, m.match_time DESC''',
(user['id'],),
).fetchall()
ph = conn.execute(
'''SELECT ph.*, m.team1, m.team2, m.match_number
FROM points_history ph
LEFT JOIN matches m ON ph.match_id = m.id
WHERE ph.user_id=?
ORDER BY ph.created_at DESC''',
(user['id'],),
).fetchall()
conn.close()
return render_template('history.html', user=user, records=records, ph=ph, teams_abbr=TEAM_ABBR)
# ─── USER GUIDE ───────────────────────────────────────────────────────────────
@app.route('/guide')
@require_login
def user_guide():
return render_template('guide.html')
# ─── STAFF ADMIN ─────────────────────────────────────────────────────────────
@app.route('/admin')
@require_login
@require_staff
def admin():
user = get_current_user()
conn = get_db()
matches_all = conn.execute(
'SELECT * FROM matches ORDER BY match_date DESC, match_time'
).fetchall()
matches_all = [enrich_match(m) for m in matches_all]
users_all = conn.execute(
'SELECT * FROM users ORDER BY points DESC'
).fetchall()
pending_results = conn.execute(
"SELECT * FROM matches WHERE status='completed' AND is_result_final=0"
).fetchall()
unsettled = conn.execute(
'''SELECT m.*, COUNT(p.id) as pred_count
FROM matches m
LEFT JOIN predictions p ON p.match_id=m.id AND p.is_settled=0
WHERE m.status IN ('completed','abandoned') AND m.is_result_final=1 AND p.id IS NOT NULL
GROUP BY m.id'''
).fetchall()
squads_by_match_id = {}
for m in matches_all:
sq = squad_for_match_teams(m['team1'], m['team2'])
squads_by_match_id[m['id']] = {m['team1']: sq['team1'], m['team2']: sq['team2']}
conn.close()
return render_template(
'admin.html',
user=user,
matches=matches_all,
users=users_all,
teams=IPL_TEAMS,
pending_results=pending_results,
unsettled=unsettled,
points_config=POINTS_CONFIG,
statuses=MATCH_STATUSES,
squads_by_match_id=squads_by_match_id,
)
@app.route('/admin/match/add', methods=['POST'])
@require_login
@require_staff
def admin_add_match():
team1 = request.form.get('team1')
team2 = request.form.get('team2')
match_date = request.form.get('match_date')
match_time = request.form.get('match_time', '19:30')
venue = request.form.get('venue', '')
city = request.form.get('city', '')
match_number = request.form.get('match_number', '')
if not team1 or not team2 or not match_date:
flash('Team 1, Team 2 and Date are required.', 'danger')
return redirect(url_for('admin'))
if team1 == team2:
flash('Both teams cannot be the same.', 'danger')
return redirect(url_for('admin'))
conn = get_db()
dup = conn.execute(
'SELECT id FROM matches WHERE match_date=? AND match_time=? AND team1=? AND team2=?',
(match_date, match_time, team1, team2),
).fetchone()
if dup:
flash('This match already exists.', 'warning')
conn.close()
return redirect(url_for('admin'))
conn.execute(
'''INSERT INTO matches
(match_number, team1, team2, match_date, match_time, venue, city, status)
VALUES (?,?,?,?,?,?,?,'upcoming')''',
(match_number or None, team1, team2, match_date, match_time, venue, city),
)
conn.commit()
conn.close()
schedule_hub_push()
flash(f'Match added: {team1} vs {team2} on {match_date}', 'success')
return redirect(url_for('admin'))
@app.route('/admin/match/status/<int:match_id>', methods=['POST'])
@require_login
@require_staff
def admin_update_status(match_id):
new_status = request.form.get('status')
if new_status not in MATCH_STATUSES:
flash('Invalid status.', 'danger')
return redirect(url_for('admin'))
conn = get_db()
match = conn.execute('SELECT * FROM matches WHERE id=?', (match_id,)).fetchone()
if not match:
flash('Match not found.', 'danger')
conn.close()
return redirect(url_for('admin'))
conn.execute(
'UPDATE matches SET status=?, updated_at=CURRENT_TIMESTAMP WHERE id=?',
(new_status, match_id),
)
conn.commit()
conn.close()
schedule_hub_push()
flash(f'Match status updated to {new_status}.', 'success')
if new_status == 'abandoned':
conn2 = get_db()
conn2.execute(
"UPDATE matches SET is_result_final=1, winner='ABANDONED' WHERE id=?",
(match_id,),
)
conn2.commit()
conn2.close()
ok, msg = settle_match(match_id)
flash(f'Abandoned match settled: {msg}', 'info')
return redirect(url_for('admin'))
@app.route('/admin/match/result/<int:match_id>', methods=['POST'])
@require_login
@require_staff
def admin_set_result(match_id):
winner = request.form.get('winner', '').strip()
man_of_match = request.form.get('man_of_match', '').strip()
result_notes = request.form.get('result_notes', '').strip()
recalculate = request.form.get('recalculate') == '1'
conn = get_db()
match = conn.execute('SELECT * FROM matches WHERE id=?', (match_id,)).fetchone()
if not match:
flash('Match not found.', 'danger')
conn.close()
return redirect(url_for('admin'))
if winner not in [match['team1'], match['team2']]:
flash('Winner must be one of the playing teams.', 'danger')
conn.close()
return redirect(url_for('admin'))
squads = squad_for_match_teams(match['team1'], match['team2'])
if man_of_match:
ws = squad_for_predicted_winner(match, winner, squads)
ok_motm, motm_err = validate_motm_for_winner(man_of_match, ws)
if motm_err:
flash(motm_err, 'danger')
conn.close()
return redirect(url_for('admin'))
man_of_match = ok_motm or ''
if match['is_result_final'] and recalculate:
unsettle_and_recalculate(match_id)
flash('Previous settlement reversed. Recalculating…', 'info')
conn.execute(
'''UPDATE matches SET
winner=?, man_of_match=?, result_notes=?, status='completed',
is_result_final=1, updated_at=CURRENT_TIMESTAMP WHERE id=?''',
(winner, man_of_match, result_notes, match_id),
)
conn.commit()
conn.close()
ok, msg = settle_match(match_id)
if ok:
flash(f'Result saved & points settled! {msg}', 'success')
else:
flash(f'Result saved but settlement issue: {msg}', 'warning')
push_state_to_hub()
return redirect(url_for('admin'))
@app.route('/admin/match/delete/<int:match_id>', methods=['POST'])
@require_login
@require_staff
def admin_delete_match(match_id):
conn = get_db()
pred_count = conn.execute(
'SELECT COUNT(*) as cnt FROM predictions WHERE match_id=?', (match_id,)
).fetchone()['cnt']
if pred_count > 0:
flash(f'Cannot delete – {pred_count} predictions exist for this match.', 'danger')
conn.close()
return redirect(url_for('admin'))
conn.execute('DELETE FROM matches WHERE id=?', (match_id,))
conn.commit()
conn.close()
schedule_hub_push()
flash('Match deleted.', 'success')
return redirect(url_for('admin'))
@app.route('/admin/user/add', methods=['POST'])
@require_login
@require_staff
def admin_add_user():
username = request.form.get('username', '').strip()
display_name = request.form.get('display_name', '').strip()
password = request.form.get('password', '').strip()
initial_pts = float(request.form.get('initial_points', POINTS_CONFIG['initial']))
if not username:
flash('Username (member id) is required.', 'danger')
return redirect(url_for('admin'))
member_key = re.sub(r'[^a-z0-9]+', '_', username.lower()).strip('_') or username.lower()
if password:
pw_hash = hash_password(password)
else:
pw_hash = NO_PASSWORD_PLACEHOLDER
conn = get_db()
try:
conn.execute(
'''INSERT INTO users (username, display_name, password_hash, member_key, points)
VALUES (?,?,?,?,?)''',
(username, display_name or username, pw_hash, member_key, initial_pts),
)
conn.commit()
schedule_hub_push()
flash(f'User "{username}" added with {initial_pts} points.', 'success')
except sqlite3.IntegrityError:
flash(f'Username "{username}" already exists.', 'danger')
finally:
conn.close()
return redirect(url_for('admin'))
@app.route('/admin/user/adjust/<int:user_id>', methods=['POST'])
@require_login
@require_staff
def admin_adjust_points(user_id):
amount = request.form.get('amount', '0').strip()
reason = request.form.get('reason', 'Manual adjustment').strip()
try:
amount = float(amount)
except ValueError:
flash('Invalid amount.', 'danger')
return redirect(url_for('admin'))
conn = get_db()
u = conn.execute('SELECT * FROM users WHERE id=?', (user_id,)).fetchone()
if not u:
flash('User not found.', 'danger')
conn.close()
return redirect(url_for('admin'))
new_bal = round(u['points'] + amount, 2)
conn.execute('UPDATE users SET points=? WHERE id=?', (new_bal, user_id))
conn.execute(
'''INSERT INTO points_history (user_id, change_amount, reason, balance_after)
VALUES (?,?,?,?)''',
(user_id, amount, reason, new_bal),
)
conn.commit()
conn.close()
schedule_hub_push()
flash(f'Points adjusted by {amount:+.0f} for {u["username"]}. New balance: {new_bal:.0f}', 'success')
return redirect(url_for('admin'))
@app.route('/admin/user/toggle/<int:user_id>', methods=['POST'])
@require_login
@require_staff
def admin_toggle_user(user_id):
conn = get_db()
u = conn.execute('SELECT * FROM users WHERE id=?', (user_id,)).fetchone()
if not u:
conn.close()
return redirect(url_for('admin'))
new_active = 0 if u['is_active'] else 1
conn.execute('UPDATE users SET is_active=? WHERE id=?', (new_active, user_id))
conn.commit()
conn.close()
schedule_hub_push()
flash(f'User {"enabled" if new_active else "disabled"}.', 'info')
return redirect(url_for('admin'))
@app.route('/admin/match/predictions/<int:match_id>')
@require_login
@require_staff
def admin_match_predictions(match_id):
user = get_current_user()
conn = get_db()
match = conn.execute('SELECT * FROM matches WHERE id=?', (match_id,)).fetchone()
if not match:
flash('Match not found.', 'danger')
conn.close()
return redirect(url_for('admin'))
preds = conn.execute(
'''SELECT p.*, u.username, u.display_name
FROM predictions p
JOIN users u ON u.id = p.user_id
WHERE p.match_id=?
ORDER BY p.bid_amount DESC''',
(match_id,),
).fetchall()
conn.close()
return render_template(
'admin_predictions.html',
user=user,
match=enrich_match(match),
predictions=preds,
)
# ─── API ──────────────────────────────────────────────────────────────────────
@app.route('/api/todays-matches')
@require_login
def api_todays_matches():
matches = get_todays_matches()
result = []
for m in matches:
me = enrich_match(m)
me['match_date'] = str(me['match_date'])
result.append(me)
return jsonify(result)
@app.route('/api/user-stats')
@require_login
def api_user_stats():
user = get_current_user()
conn = get_db()
stats = conn.execute(
'''SELECT COUNT(*) as total,
SUM(winner_correct) as correct_w,
SUM(motm_correct) as correct_m,
SUM(CASE WHEN is_settled=1 THEN 1 END) as settled
FROM predictions WHERE user_id=?''',
(user['id'],),
).fetchone()
conn.close()
return jsonify(
{
'points': user['points'],
'total_predictions': stats['total'] or 0,
'correct_winners': stats['correct_w'] or 0,
'correct_motms': stats['correct_m'] or 0,
'settled': stats['settled'] or 0,
}
)
@app.route('/api/match-players/<int:match_id>')
@require_login
def api_match_players(match_id):
conn = get_db()
match = conn.execute('SELECT team1, team2 FROM matches WHERE id=?', (match_id,)).fetchone()
conn.close()
if not match:
return jsonify({'error': 'not found'}), 404
squads = squad_for_match_teams(match['team1'], match['team2'])
return jsonify(squads)
APP_BRAND = 'DIS IPL 2026'
APP_TAGLINE = 'Predict, compete, and win — where every pick counts!'
@app.context_processor
def inject_globals():
return {
'current_user': get_current_user(),
'team_abbr': TEAM_ABBR,
'team_colors': TEAM_COLORS,
'today': app_today().isoformat(),
'points_config': POINTS_CONFIG,
'staff_session': bool(session.get('staff_ok')),
'admin_login_configured': bool(admin_password()),
'app_brand': APP_BRAND,
'app_tagline': APP_TAGLINE,
}
@app.template_filter('delta_class')
def delta_class(val):
if val is None:
return 'text-muted'
return 'text-green' if float(val) >= 0 else 'text-red'
@app.template_filter('delta_sign')
def delta_sign(val):
if val is None:
return '—'
v = float(val)
return f'+{v:.0f}' if v >= 0 else f'{v:.0f}'
@app.template_filter('format_date')
def format_date_filter(d):
try:
return datetime.strptime(str(d), '%Y-%m-%d').strftime('%d %b')
except Exception:
return str(d)
@app.template_filter('format_date_weekday')
def format_date_weekday_filter(d):
"""e.g. Sun, 29 Mar — for dashboard greetings."""
try:
return datetime.strptime(str(d), '%Y-%m-%d').strftime('%a, %d %b')
except Exception:
return str(d)
@app.template_filter('initials')
def initials_filter(val):
if not val:
return '?'
parts = str(val).split()
if len(parts) >= 2:
return (parts[0][0] + parts[-1][0]).upper()
return (parts[0][:2]).upper()
def _live_reload_watch_paths():
"""Files that trigger a dev-server restart when changed (beyond imported .py)."""
paths = []
for sub in ('templates', 'static'):
root = os.path.join(BASE_DIR, sub)
if not os.path.isdir(root):
continue
for dirpath, _, filenames in os.walk(root):
for fn in filenames:
if fn.endswith(('.html', '.css', '.js', '.json', '.svg', '.ico', '.woff2')):
paths.append(os.path.join(dirpath, fn))
for fn in ('users.json', 'matches.json', 'players.json'):
p = os.path.join(BASE_DIR, fn)
if os.path.isfile(p):
paths.append(p)
return paths
if __name__ == '__main__':
init_db()
sync_users_from_json()
sync_matches_from_json()
# Local default 5000 (common for Flask). Hugging Face Spaces uses gunicorn, not this block.
port = int(os.environ.get('PORT', 5000))
# Live reload: on by default for `python app.py`. Set FLASK_DEBUG=0 (or false/no/off) to disable.
_live = os.environ.get('FLASK_DEBUG', '1').strip().lower() not in ('0', 'false', 'no', 'off')
print(f'\n {APP_BRAND} → http://127.0.0.1:{port}/')
if _live:
print(' Live reload: ON — save any .py, template, or static file to refresh.\n')
print(' Set FLASK_DEBUG=0 to turn off the reloader.\n')
else:
print(' Live reload: OFF (FLASK_DEBUG=0).\n')
app.run(
debug=_live,
use_reloader=_live,
extra_files=_live_reload_watch_paths() if _live else None,
host='0.0.0.0',
port=port,
)