Correct stop of loop, mysql and influx

This commit is contained in:
marys
2025-09-26 11:34:12 +02:00
parent c850a07ff0
commit 521c474a17
3 changed files with 43 additions and 20 deletions

View File

@@ -1,17 +1,25 @@
import asyncio import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from sqlalchemy.ext.asyncio import AsyncSession
from influx_related import get_influx_data from influx_related import get_influx_data
from models.execution.model_execution import ExecutionModel from models.execution.model_execution import ExecutionModel
from utils.db.db_execution import ExecutionQueryAsync from utils.db.db_execution import ExecutionQueryAsync
loop_stop_label: bool = True
def internal_loop_stop():
print("Stopping main Loop")
loop = asyncio.get_event_loop()
global loop_stop_label
loop_stop_label = False
async def internal_loop(influxdb_client: InfluxDBClientAsync, mysql_client: AsyncSession): async def internal_loop(influxdb_client: InfluxDBClientAsync, mysql_client: AsyncSession):
i = 0 i = 0
while True: global loop_stop_label
print("internal_loop", i) while loop_stop_label:
print("internal_loop", loop_stop_label, i)
i += 1 i += 1
try: try:
records = await get_influx_data(influxdb_client) records = await get_influx_data(influxdb_client)
@@ -26,4 +34,4 @@ async def internal_loop(influxdb_client: InfluxDBClientAsync, mysql_client: Asyn
execution_model: list[ExecutionModel] = await execution_query.get_execution_by_id(293) execution_model: list[ExecutionModel] = await execution_query.get_execution_by_id(293)
res = [i.asdict() for i in execution_model] res = [i.asdict() for i in execution_model]
print(res[0]['id']) print(res[0]['id'])
await asyncio.sleep(5) await asyncio.sleep(2)

26
main.py
View File

@@ -2,14 +2,14 @@ import asyncio
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import uvicorn import uvicorn
from fastapi import (FastAPI, Depends) from fastapi import FastAPI
from config import MYSQL_HOST, MYSQL_PORT, MYSQL_DB_NAME, MYSQL_USER, MYSQL_PASS from config import MYSQL_HOST, MYSQL_PORT, MYSQL_DB_NAME, MYSQL_USER, MYSQL_PASS
from internal_loop import internal_loop from internal_loop import internal_loop, internal_loop_stop
from api import testAPI from api import testAPI
from influx_related import init_influx from influx_related import init_influx
from mysql_related import mysql_init, get_mysql_db, get_session_local from mysql_related import mysql_init, get_mysql_session_local, get_mysql_engine
@asynccontextmanager @asynccontextmanager
@@ -24,14 +24,26 @@ async def startup(fast_api: FastAPI):
MYSQL_USER, MYSQL_USER,
MYSQL_PASS MYSQL_PASS
) )
mysql_client = get_session_local() mysql_client = get_mysql_session_local()
mysql_engine = get_mysql_engine()
print(f"{mysql_engine=}")
print("mysql init done") print("mysql init done")
asyncio.create_task(internal_loop(influxdb_client, mysql_client)) task = asyncio.create_task(internal_loop(influxdb_client, mysql_client))
try:
yield yield
finally:
# Shutdown logic # Shutdown logic
internal_loop_stop()
try:
await task
print("Cancelled")
except asyncio.CancelledError:
print("Cancelled exception")
pass
await influxdb_client.close() await influxdb_client.close()
#await mysql_engine.dispose() mysql_engine = get_mysql_engine()
await mysql_client.close()
await mysql_engine.dispose()
app = FastAPI(lifespan=startup) app = FastAPI(lifespan=startup)

View File

@@ -1,9 +1,10 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.ext.asyncio import (create_async_engine, AsyncSession,
async_sessionmaker, AsyncEngine)
mysql_engine = None
mysql_engine: AsyncEngine
mysql_SessionLocal: async_sessionmaker mysql_SessionLocal: async_sessionmaker
def mysql_init(db_type: str, host: str, port: int, db_name: str, user: str, password: str): def mysql_init(db_type: str, host: str, port: int, db_name: str, user: str, password: str):
"""Initialize DB engine + session factory""" """Initialize DB engine + session factory"""
global mysql_engine, mysql_SessionLocal global mysql_engine, mysql_SessionLocal
@@ -37,11 +38,13 @@ def mysql_init(db_type: str, host: str, port: int, db_name: str, user: str, pass
) )
print(f"mysql_SessionLocal: {mysql_SessionLocal}") print(f"mysql_SessionLocal: {mysql_SessionLocal}")
def get_session_local() -> AsyncSession: def get_mysql_engine() -> AsyncEngine:
return mysql_SessionLocal() return mysql_engine
def get_mysql_session_local() -> AsyncSession:
return mysql_SessionLocal()
async def get_mysql_db(): async def get_mysql_db():
"""FastAPI dependency for providing DB sessions""" """FastAPI dependency for providing DB sessions"""
async with get_session_local() as session: async with get_mysql_session_local() as session:
yield session yield session