| import time
|
| from datetime import datetime, timedelta, timezone
|
| import pandas as pd
|
| import numpy as np
|
| from PyQt6.QtCore import QThread, pyqtSignal, QObject
|
|
|
| from src.core.mt5_interface import MT5Interface
|
| from src.core.market_profile import MarketProfile
|
|
|
| class DataWorker(QThread):
|
|
|
| data_signal = pyqtSignal(object, object)
|
| levels_signal = pyqtSignal(object, object, object, object)
|
| status_signal = pyqtSignal(str)
|
| finished_signal = pyqtSignal()
|
|
|
| def __init__(self, symbol, date_obj, multiplier=1.0):
|
| super().__init__()
|
| self.symbol = symbol
|
| self.date_obj = date_obj
|
| self.multiplier = multiplier
|
| self.running = True
|
| self.mt5_interface = MT5Interface()
|
| self.market_profile = MarketProfile(multiplier=self.multiplier)
|
|
|
| def run(self):
|
| self.status_signal.emit(f"Connecting to MT5... (Multiplier: {self.multiplier}x)")
|
| if not self.mt5_interface.initialize():
|
| self.status_signal.emit("Failed to connect to MT5.")
|
| self.finished_signal.emit()
|
| return
|
|
|
|
|
|
|
| target_date_utc = datetime(self.date_obj.year, self.date_obj.month, self.date_obj.day, tzinfo=timezone.utc)
|
|
|
|
|
| start_establishment = target_date_utc - timedelta(days=1) + timedelta(hours=22)
|
|
|
| end_establishment = target_date_utc
|
|
|
| end_session = target_date_utc + timedelta(days=1)
|
|
|
|
|
| now_utc = datetime.now(timezone.utc)
|
|
|
|
|
| is_historical = end_session < now_utc
|
| fetch_end = end_session if is_historical else now_utc
|
|
|
| self.status_signal.emit(f"Fetch Range: {start_establishment} to {fetch_end} ...")
|
|
|
|
|
| ticks_df = self.mt5_interface.get_ticks(self.symbol, start_establishment, fetch_end)
|
|
|
| if not ticks_df.empty:
|
|
|
| mask_est = (ticks_df['datetime'] >= start_establishment) & (ticks_df['datetime'] < end_establishment)
|
| df_est = ticks_df.loc[mask_est]
|
|
|
| mask_dev = (ticks_df['datetime'] >= end_establishment)
|
| df_dev = ticks_df.loc[mask_dev]
|
|
|
| self.status_signal.emit(f"Data: {len(ticks_df)} total. Est: {len(df_est)}, Dev: {len(df_dev)}")
|
|
|
|
|
| if not df_est.empty:
|
| self.market_profile.update(df_est)
|
| self.status_signal.emit(f"Profile Established. Ticks: {self.market_profile.total_ticks}")
|
| else:
|
| self.status_signal.emit("Warning: No Establishment Data (22:00-00:00). Starting empty.")
|
|
|
|
|
| dev_times = []
|
| dev_vah = []
|
| dev_val = []
|
| dev_poc = []
|
|
|
| if not df_dev.empty:
|
|
|
| df_dev_indexed = df_dev.set_index('datetime')
|
| grouped = df_dev_indexed.resample('1min')
|
|
|
| count_steps = 0
|
| for time_idx, group in grouped:
|
| if group.empty:
|
| continue
|
|
|
|
|
| group_reset = group.reset_index()
|
| self.market_profile.update(group_reset)
|
|
|
|
|
| v, l, p = self.market_profile.get_vah_val_poc()
|
| if v is not None:
|
|
|
| ts_float = time_idx.timestamp()
|
| dev_times.append(ts_float)
|
| dev_vah.append(v)
|
| dev_val.append(l)
|
| dev_poc.append(p)
|
| count_steps += 1
|
|
|
| self.status_signal.emit(f"Calculated {count_steps} developing points.")
|
|
|
|
|
|
|
| print(f"DEBUG: Worker Emitting Ticks: {len(ticks_df)}")
|
| self.data_signal.emit(ticks_df, self.market_profile.counts)
|
|
|
|
|
| if dev_times:
|
| print(f"DEBUG: Worker Emitting Levels: {len(dev_times)} pts. Times: {dev_times[0]} -> {dev_times[-1]}")
|
| self.levels_signal.emit(
|
| np.array(dev_times),
|
| np.array(dev_vah),
|
| np.array(dev_val),
|
| np.array(dev_poc)
|
| )
|
| else:
|
| print("DEBUG: No developing levels calculated to emit.")
|
| self.status_signal.emit("No developing levels calculated (insufficient dev data?).")
|
|
|
| else:
|
| self.status_signal.emit("No ticks returned from MT5.")
|
|
|
|
|
| if not is_historical:
|
| self.status_signal.emit("Live streaming active...")
|
|
|
| last_time = now_utc
|
| if not ticks_df.empty:
|
| last_time = ticks_df['datetime'].iloc[-1].to_pydatetime()
|
|
|
| while self.running:
|
| time.sleep(1.0)
|
|
|
| cur_time = datetime.now(timezone.utc)
|
| new_ticks = self.mt5_interface.get_ticks(self.symbol, last_time, cur_time + timedelta(seconds=1))
|
|
|
| if not new_ticks.empty:
|
| new_ticks = new_ticks[new_ticks['datetime'] > last_time]
|
|
|
| if not new_ticks.empty:
|
| self.market_profile.update(new_ticks)
|
| last_time = new_ticks['datetime'].iloc[-1].to_pydatetime()
|
|
|
|
|
|
|
| self.data_signal.emit(new_ticks, self.market_profile.counts)
|
|
|
|
|
| v, l, p = self.market_profile.get_vah_val_poc()
|
| if v is not None:
|
| ts_now = cur_time.timestamp()
|
|
|
| self.levels_signal.emit([ts_now], [v], [l], [p])
|
| else:
|
| self.status_signal.emit("Historical view loaded. Live stream inactive.")
|
|
|
|
|
| def stop(self):
|
| self.running = False
|
| self.mt5_interface.shutdown()
|
| self.wait()
|
|
|