Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| import torch | |
| from chronos import Chronos2Pipeline | |
| from utils import validateData, preProcessData, predictData, computeDiscreteScores, assembleResults, plotResults | |
# Device used for inference: the GPU when CUDA is available, the CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Path of the CSV file that holds the latest anomaly-detection results.
OUT_PATH = "./savedPredictions/results.csv"
def _buildUiUpdates(processing=False, plot=None, download=None, error=None):
    """Build the four component updates yielded by ``dataProcessing``.

    The tuple is ordered as ``(processing_msg, plot_output, download_output,
    errorHandler)``, i.e. the order of the ``outputs=`` list wired to the
    *Detect Anomalies* button.

    Args:
        processing: Whether the "processing" message is visible.
        plot: Figure to display; ``None`` hides the plot and clears its value.
        download: Path of the file to offer; ``None`` hides and clears it.
        error: Markdown error text; ``None`` hides the error component
            without touching its value.

    Returns:
        A 4-tuple of ``gr.update`` objects.
    """
    if error is None:
        error_update = gr.update(visible=False)
    else:
        error_update = gr.update(visible=True, value=error)
    return (
        gr.update(visible=processing),  # processing_msg
        gr.update(visible=plot is not None, value=plot),  # plot_output
        gr.update(visible=download is not None, value=download),  # download_output
        error_update,  # errorHandler
    )


def _removeResultsFile(ignore_errors=False):
    """Delete ``OUT_PATH`` if it exists.

    A missing file is never an error. Other ``OSError`` failures are re-raised
    unless ``ignore_errors`` is true.
    """
    # EAFP: avoids the window between an exists() check and the removal.
    try:
        os.remove(OUT_PATH)
    except FileNotFoundError:
        pass
    except OSError:
        if not ignore_errors:
            raise


def dataProcessing(file, timestamp_column: str = None):
    """Run the anomaly-detection pipeline on an uploaded file, streaming UI updates.

    This is a generator. It first yields updates that show the "processing"
    message and hide the previous outputs, then yields exactly one more set of
    updates carrying either the results or an error message.

    Args:
        file: Value of the file-upload component; passed unchanged to
            ``validateData`` and ``preProcessData``.
        timestamp_column: Name of the timestamp column. ``None`` or an empty
            string means no timestamp column was specified.

    Yields:
        A 4-tuple of ``gr.update`` objects for ``processing_msg``,
        ``plot_output``, ``download_output`` and ``errorHandler``, in that
        order.

    Any exception raised by the pipeline is logged with its traceback and
    reported to the user through the error component; it is not re-raised.
    """
    import logging  # local import: the module header does not import logging

    # Show the processing message and hide any previous outputs.
    yield _buildUiUpdates(processing=True)

    # NOTE(review): OUT_PATH is a single path shared by every request, so
    # concurrent runs may overwrite each other's CSV - consider a per-request
    # file if the app is expected to serve several users at once.
    try:
        # Remove the CSV left by a previous run before producing a new one.
        _removeResultsFile()

        # Convert an empty string into None.
        if timestamp_column == "":
            timestamp_column = None

        validateData(file, timestamp_column)
        preProcessedData, timestamp_old, target_cols = preProcessData(file, timestamp_column)
        # chronos2 is the module-level pipeline; reading it needs no `global`.
        predictions, indexes = predictData(chronos2, preProcessedData, target_cols)
        scores = computeDiscreteScores(predictions, preProcessedData, target_cols, indexes=indexes)
        df = assembleResults(preProcessedData, timestamp_old, target_cols, scores)
        fig = plotResults(df, target_cols)
        df.to_csv(OUT_PATH, index=False)
    except Exception as e:
        # UI boundary: keep the traceback in the server log, show only the
        # message to the user.
        logging.getLogger(__name__).exception("Anomaly detection failed")
        # Best-effort cleanup so a partially written CSV is not left behind;
        # a cleanup failure must not mask the original error.
        _removeResultsFile(ignore_errors=True)
        # Show the error.
        yield _buildUiUpdates(error=f"❌ **Error:** {e}")
    else:
        # Hide the processing message and show the results. The plot stays
        # hidden when plotResults returned None; the CSV is always offered.
        yield _buildUiUpdates(plot=fig, download=OUT_PATH)
# --- One-time start-up work, executed when the module is loaded ---

# Discard a results file left on disk by an earlier session.
if os.path.exists(OUT_PATH):
    os.remove(OUT_PATH)

# Make sure the directory that receives the results CSV exists.
os.makedirs(os.path.split(OUT_PATH)[0], exist_ok=True)

# Load the pretrained "amazon/chronos-2" pipeline once, on the selected device;
# dataProcessing reuses this instance for every request.
chronos2 = Chronos2Pipeline.from_pretrained(
    "amazon/chronos-2",
    device_map=device,
)
# Markdown shown at the top of the page: title, usage instructions and notes.
_APP_DESCRIPTION = """
# Time series anomaly detection with Chronos2
Welcome to the Chronos2 time series anomaly detection demo! This application allows you to upload your own time series data
and visualize the detected anomalies using the Chronos2 pipeline.
## Instructions
1. Click on the *Upload Time Series Data* button to upload your time series data in CSV format. The CSV file should have as columns only:
- **"timestamp":** the timestamp column of your data (e.g., "2023-01-01 00:00:00"). It is optional.
- **"values":** the columns containing the values of the time series data. They can be named as you wish. At least one column of values is required.
2. Answer the question about the timestamp in your data to help the model understand the temporal structure of your data.
- if present, you will need to specify the column name of the timestamp in your data.
- Otherwise, no need to do anything, just mark No.
3. Click on the *Detect Anomalies* button to run the Chronos2 pipeline and visualize the detected anomalies.
4. If the number of series is reasonably small, we will plot the original time series along with the detected anomalies.
5. We will provide a downloadable CSV file containing the original time series data along with an additional column indicating whether each point is an anomaly or not. We will label as 1 anomalies, as 0 normal points and as -1 the points for which we don't have a prediction because they are before the minimum length required by the model.
## Note
- The Chronos2 pipeline is designed to handle multivariate time series data, so you can upload datasets with multiple columns of values.
- For a correct prediction, a minimum length of 64 data points is required. If your time series is shorter than this, the pipeline will not be executed.
"""


def _toggleTimestampInput(answer):
    """Return the textbox update matching the Yes/No timestamp answer.

    "Yes" shows the textbox, makes it editable and pre-fills it with
    "timestamp"; any other answer hides it, locks it and clears its value.
    """
    has_timestamp = answer == "Yes"
    return gr.update(
        visible=has_timestamp,
        interactive=has_timestamp,
        value="timestamp" if has_timestamp else "",
    )


with gr.Blocks(title="Time series anomaly detection with Chronos2") as demo:
    gr.Markdown(_APP_DESCRIPTION)

    with gr.Row():
        # Left column: inputs and the action button.
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload Time Series Data (CSV)",
                file_types=[".csv"],
                file_count="single",
            )
            timestamp_question = gr.Radio(
                choices=["Yes", "No"],
                value="No",
                label="Does your data contain a timestamp column?",
                interactive=True,
            )
            # Hidden and read-only until the user answers "Yes" above.
            timestamp_column_input = gr.Textbox(
                value="",
                label="Please specify the column name of the timestamp:",
                visible=False,
                interactive=False,
            )
            timestamp_question.change(
                _toggleTimestampInput,
                inputs=timestamp_question,
                outputs=timestamp_column_input,
            )
            detect_button = gr.Button("Detect Anomalies", variant="primary")

        # Right column: status, error, plot and download outputs, all hidden
        # until dataProcessing reveals them.
        with gr.Column(scale=4):
            processing_msg = gr.Markdown("⏳ Processing file, please wait...", visible=False)
            errorHandler = gr.Markdown(visible=False)
            plot_output = gr.Plot(label="Time Series with Detected Anomalies", visible=False)
            download_output = gr.File(label="Download Anomaly Detection Results (CSV)", visible=False)

    # The outputs order must match the tuples yielded by dataProcessing.
    detect_button.click(
        fn=dataProcessing,
        inputs=[file_input, timestamp_column_input],
        outputs=[processing_msg, plot_output, download_output, errorHandler],
    )

# Start the web server, requesting a public share link.
demo.launch(share=True)