# Chronos2AD_AF / app.py
# (Hugging Face Space header: author Nuzz23, commit "robust handler", 30e1cf5)
import gradio as gr
import os
import torch
from chronos import Chronos2Pipeline
from utils import validateData, preProcessData, predictData, computeDiscreteScores, assembleResults, plotResults
# Select GPU when available, otherwise fall back to CPU for inference.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Path where the anomaly-detection results CSV is written for download.
OUT_PATH ="./savedPredictions/results.csv"
def dataProcessing(file, timestamp_column: str | None = None):
    """Run the full anomaly-detection pipeline on an uploaded CSV file.

    Generator used as a Gradio event handler: it first yields a
    "processing" state, then yields either the results (plot and/or
    downloadable CSV) or an error message.  Every yielded tuple updates,
    in order: (processing_msg, plot_output, download_output, errorHandler).

    Parameters
    ----------
    file : uploaded file object/path coming from the ``gr.File`` input.
    timestamp_column : name of the timestamp column in the CSV, or
        ``None``/``""`` when the data has no timestamp column.
    """
    # Show the "processing" message and hide any previous results.
    yield (
        gr.update(visible=True),               # processing_msg
        gr.update(visible=False, value=None),  # plot_output
        gr.update(visible=False, value=None),  # download_output
        gr.update(visible=False)               # errorHandler
    )
    try:
        # Drop any stale results file left over from a previous run.
        if os.path.exists(OUT_PATH):
            os.remove(OUT_PATH)
        # The textbox yields "" when hidden; normalize to None.
        if timestamp_column == "":
            timestamp_column = None
        validateData(file, timestamp_column)
        preProcessedData, timestamp_old, target_cols = preProcessData(file, timestamp_column)
        # `chronos2` is the module-level pipeline loaded once at startup
        # (read-only here, so no `global` declaration is needed).
        predictions, indexes = predictData(chronos2, preProcessedData, target_cols)
        scores = computeDiscreteScores(predictions, preProcessedData, target_cols, indexes=indexes)
        df = assembleResults(preProcessedData, timestamp_old, target_cols, scores)
        fig = plotResults(df, target_cols)
        df.to_csv(OUT_PATH, index=False)
        # Hide the processing message and show results.  plotResults may
        # return None (e.g. too many series to plot); in that case only the
        # downloadable CSV is offered.
        if fig is not None:
            yield (
                gr.update(visible=False),             # processing_msg
                gr.update(visible=True, value=fig),   # plot_output
                gr.update(visible=True, value=OUT_PATH),  # download_output
                gr.update(visible=False)              # errorHandler
            )
        else:
            yield (
                gr.update(visible=False),             # processing_msg
                gr.update(visible=False, value=None), # plot_output
                gr.update(visible=True, value=OUT_PATH),  # download_output
                gr.update(visible=False)              # errorHandler
            )
    except Exception as e:
        # UI boundary handler: surface any pipeline failure to the user
        # instead of crashing the app.
        yield (
            gr.update(visible=False),             # processing_msg
            gr.update(visible=False, value=None), # plot_output
            gr.update(visible=False, value=None), # download_output
            gr.update(visible=True, value=f"❌ **Error:** {str(e)}") # errorHandler
        )
        # Ensure no partial results file is left behind after a failure.
        if os.path.exists(OUT_PATH):
            os.remove(OUT_PATH)
# Make sure the directory for the results CSV exists before any run.
os.makedirs(os.path.dirname(OUT_PATH), exist_ok=True)
# Load the Chronos-2 pipeline once at startup; reused by every request.
chronos2 = Chronos2Pipeline.from_pretrained("amazon/chronos-2", device_map=device)
# --- Gradio UI layout and event wiring ---
with gr.Blocks(title="Time series anomaly detection with Chronos2") as demo:
    # Static instructions shown at the top of the page.
    gr.Markdown(
        """
        # Time series anomaly detection with Chronos2
        Welcome to the Chronos2 time series anomaly detection demo! This application allows you to upload your own time series data
        and visualize the detected anomalies using the Chronos2 pipeline.
        ## Instructions
        1. Click on the *Upload Time Series Data* button to upload your time series data in CSV format. The CSV file should have as columns only:
        - **"timestamp":** the timestamp column of your data (e.g., "2023-01-01 00:00:00"). It is optional.
        - **"values":** the columns containing the values of the time series data. They can be named as you wish. At least one column of values is required.
        2. Answer the question about the timestamp in your data to help the model understand the temporal structure of your data.
        - if present, you will need to specify the column name of the timestamp in your data.
        - Otherwise, no need to do anything, just mark No.
        3. Click on the *Detect Anomalies* button to run the Chronos2 pipeline and visualize the detected anomalies.
        4. If the number of series is reasonably small, we will plot the original time series along with the detected anomalies.
        5. We will provide a downloadable CSV file containing the original time series data along with an additional column indicating whether each point is an anomaly or not. We will label as 1 anomalies, as 0 normal points and as -1 the points for which we don't have a prediction because they are before the minimum length required by the model.
        ## Note
        - The Chronos2 pipeline is designed to handle multivariate time series data, so you can upload datasets with multiple columns of values.
        - For a correct prediction, a minimum length of 64 data points is required. If your time series is shorter than this, the pipeline will not be executed.
        """
    )
    with gr.Row():
        # Left column: inputs (file upload, timestamp question, run button).
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload Time Series Data (CSV)", file_types=[".csv"], file_count="single")
            timestamp_question = gr.Radio(
                label="Does your data contain a timestamp column?",
                choices=["Yes", "No"],
                value="No",
                interactive=True,
            )
            # Only shown/enabled when the user answers "Yes" above.
            timestamp_column_input = gr.Textbox(
                label="Please specify the column name of the timestamp:",
                visible=False,
                value="",
                interactive=False
            )
            # Toggle the textbox: visible and pre-filled with "timestamp"
            # when the answer is "Yes", hidden and cleared otherwise.
            timestamp_question.change(
                lambda x: gr.update(
                    visible=(x == "Yes"),
                    interactive=(x == "Yes"),
                    value="timestamp" if x == "Yes" else ""
                ),
                inputs=timestamp_question,
                outputs=timestamp_column_input
            )
            detect_button = gr.Button("Detect Anomalies", variant="primary")
        # Right column: outputs (status, errors, plot, CSV download).
        with gr.Column(scale=4):
            processing_msg = gr.Markdown("⏳ Processing file, please wait...", visible=False)
            errorHandler = gr.Markdown(visible=False)
            plot_output = gr.Plot(label="Time Series with Detected Anomalies", visible=False)
            download_output = gr.File(label="Download Anomaly Detection Results (CSV)", visible=False)
    # Wire the button to the generator handler; output order must match
    # the tuples yielded by dataProcessing.
    detect_button.click(
        fn=dataProcessing,
        inputs=[file_input, timestamp_column_input],
        outputs=[processing_msg, plot_output, download_output, errorHandler]
    )
# share=True creates a public gradio.live tunnel in addition to localhost.
demo.launch(share=True)