From 521c474a171d2ecc55a6ae4adb791875da756a53 Mon Sep 17 00:00:00 2001 From: marys Date: Fri, 26 Sep 2025 11:34:12 +0200 Subject: [PATCH] Correct stop of loop, mysql and influx --- internal_loop.py | 16 ++++++++++++---- main.py | 32 ++++++++++++++++++++++---------- mysql_related.py | 15 +++++++++------ 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/internal_loop.py b/internal_loop.py index e1a5e5e..9a4f92e 100644 --- a/internal_loop.py +++ b/internal_loop.py @@ -1,17 +1,25 @@ import asyncio from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker +from sqlalchemy.ext.asyncio import AsyncSession from influx_related import get_influx_data from models.execution.model_execution import ExecutionModel from utils.db.db_execution import ExecutionQueryAsync +loop_stop_label: bool = True + +def internal_loop_stop(): + print("Stopping main Loop") + loop = asyncio.get_event_loop() + global loop_stop_label + loop_stop_label = False async def internal_loop(influxdb_client: InfluxDBClientAsync, mysql_client: AsyncSession): i = 0 - while True: - print("internal_loop", i) + global loop_stop_label + while loop_stop_label: + print("internal_loop", loop_stop_label, i) i += 1 try: records = await get_influx_data(influxdb_client) @@ -26,4 +34,4 @@ async def internal_loop(influxdb_client: InfluxDBClientAsync, mysql_client: Asyn execution_model: list[ExecutionModel] = await execution_query.get_execution_by_id(293) res = [i.asdict() for i in execution_model] print(res[0]['id']) - await asyncio.sleep(5) + await asyncio.sleep(2) diff --git a/main.py b/main.py index 94cc6e9..bf74e78 100644 --- a/main.py +++ b/main.py @@ -2,14 +2,14 @@ import asyncio from contextlib import asynccontextmanager import uvicorn -from fastapi import (FastAPI, Depends) +from fastapi import FastAPI from config import MYSQL_HOST, MYSQL_PORT, MYSQL_DB_NAME, MYSQL_USER, MYSQL_PASS -from internal_loop import internal_loop +from internal_loop import internal_loop, internal_loop_stop from api import testAPI from influx_related import init_influx -from mysql_related import mysql_init, get_mysql_db, get_session_local +from mysql_related import mysql_init, get_mysql_session_local, get_mysql_engine @asynccontextmanager @@ -24,14 +24,26 @@ async def startup(fast_api: FastAPI): MYSQL_USER, MYSQL_PASS ) - mysql_client = get_session_local() + mysql_client = get_mysql_session_local() + mysql_engine = get_mysql_engine() + print(f"{mysql_engine=}") print("mysql init done") - asyncio.create_task(internal_loop(influxdb_client, mysql_client)) - yield - # Shutdown logic - await influxdb_client.close() - #await mysql_engine.dispose() - + task = asyncio.create_task(internal_loop(influxdb_client, mysql_client)) + try: + yield + finally: + # Shutdown logic + internal_loop_stop() + try: + await task + print("Cancelled") + except asyncio.CancelledError: + print("Cancelled exception") + pass + await influxdb_client.close() + mysql_engine = get_mysql_engine() + await mysql_client.close() + await mysql_engine.dispose() app = FastAPI(lifespan=startup) diff --git a/mysql_related.py b/mysql_related.py index 73e5890..90379ed 100644 --- a/mysql_related.py +++ b/mysql_related.py @@ -1,9 +1,10 @@ -from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.ext.asyncio import (create_async_engine, AsyncSession, + async_sessionmaker, AsyncEngine) -mysql_engine = None + +mysql_engine: AsyncEngine mysql_SessionLocal: async_sessionmaker - def mysql_init(db_type: str, host: str, port: int, db_name: str, user: str, password: str): """Initialize DB engine + session factory""" global mysql_engine, mysql_SessionLocal @@ -37,11 +38,13 @@ def mysql_init(db_type: str, host: str, port: int, db_name: str, user: str, pass ) print(f"mysql_SessionLocal: {mysql_SessionLocal}") -def get_session_local() -> AsyncSession: - return mysql_SessionLocal() +def get_mysql_engine() -> AsyncEngine: + return mysql_engine +def get_mysql_session_local() -> AsyncSession: + return mysql_SessionLocal() async def get_mysql_db(): """FastAPI dependency for providing DB sessions""" - async with get_session_local() as session: + async with get_mysql_session_local() as session: yield session