From c850a07ff05f5dafe624206c84f37c5925ff804e Mon Sep 17 00:00:00 2001 From: marys Date: Fri, 26 Sep 2025 10:47:57 +0200 Subject: [PATCH] Mysql related functionality finished --- api/testAPI.py | 34 ++++++++++++++++++++- config.py | 6 ++++ internal_loop.py | 14 +++++++-- main.py | 19 ++++++++++-- models/__init__.py | 0 models/execution/__init__.py | 0 models/execution/model_execution.py | 47 +++++++++++++++++++++++++++++ mysql_related.py | 47 +++++++++++++++++++++++++++++ utils/__init__.py | 0 utils/db/__init__.py | 0 utils/db/db_execution.py | 22 ++++++++++++++ 11 files changed, 184 insertions(+), 5 deletions(-) create mode 100644 models/__init__.py create mode 100644 models/execution/__init__.py create mode 100644 models/execution/model_execution.py create mode 100644 mysql_related.py create mode 100644 utils/__init__.py create mode 100644 utils/db/__init__.py create mode 100644 utils/db/db_execution.py diff --git a/api/testAPI.py b/api/testAPI.py index 532e761..0027e51 100644 --- a/api/testAPI.py +++ b/api/testAPI.py @@ -3,8 +3,12 @@ from fastapi import ( status, Body, Depends) from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync -from influx_related import get_influxdb_client, get_influx_data, write_influx_data +from sqlalchemy.ext.asyncio import AsyncSession +from influx_related import get_influxdb_client, get_influx_data, write_influx_data +from models.execution.model_execution import ExecutionModel +from mysql_related import get_mysql_db +from utils.db.db_execution import ExecutionQueryAsync router = APIRouter() @@ -31,3 +35,31 @@ async def write_influx(temp: float, client: InfluxDBClientAsync = Depends(get_in await write_influx_data(client, temp) return {"status": "OK"} +@router.get( + path="/get_mysql/", + name='get data from mysql', + #response_model=InfluxDBClientAsync, + responses={ + status.HTTP_401_UNAUTHORIZED: {}, + } +) +async def get_mysql(db: AsyncSession = Depends(get_mysql_db)): + execution_query = ExecutionQueryAsync(db) + execution_model: list[ExecutionModel] = await execution_query.get_execution() + res = [i.asdict() for i in execution_model] + return {"results": res} + +@router.get( + path="/get_mysql_by_id/", + name='get data from mysql', + #response_model=InfluxDBClientAsync, + responses={ + status.HTTP_401_UNAUTHORIZED: {}, + } +) +async def get_mysql_by_id(id: int, db: AsyncSession = Depends(get_mysql_db)): + print(f"{db=}") + execution_query = ExecutionQueryAsync(db) + execution_model: list[ExecutionModel] = await execution_query.get_execution_by_id(id) + res = [i.asdict() for i in execution_model] + return {"results": res} \ No newline at end of file diff --git a/config.py b/config.py index 650abed..850c0e9 100644 --- a/config.py +++ b/config.py @@ -4,3 +4,9 @@ INFLUX_URL='http://localhost:8086' INFLUX_BUCKET='test_bucket' INFLUX_TOKEN='Os1nJwYP1TyPRF0iQhaft677Qh87uPPMoROFtmlh_8uXbc5q1xQDguCjtWZFj6nN69KJylXSupBVt6M5CQZueA==' INFLUX_ORG='my_org' + +MYSQL_DB_NAME='spirentservice' +MYSQL_HOST='localhost' +MYSQL_PASS='desneTajneHeslo' +MYSQL_PORT=3306 +MYSQL_USER='marys' diff --git a/internal_loop.py b/internal_loop.py index ef5351c..e1a5e5e 100644 --- a/internal_loop.py +++ b/internal_loop.py @@ -1,11 +1,14 @@ import asyncio from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync -from config import INFLUX_BUCKET, INFLUX_ORG +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + from influx_related import get_influx_data +from models.execution.model_execution import ExecutionModel +from utils.db.db_execution import ExecutionQueryAsync -async def internal_loop(influxdb_client: InfluxDBClientAsync): +async def internal_loop(influxdb_client: InfluxDBClientAsync, mysql_client: AsyncSession): i = 0 while True: print("internal_loop", i) @@ -16,4 +19,11 @@ async def internal_loop(influxdb_client: InfluxDBClientAsync): [print(" -", i) for i in records] except Exception as e: print(e) + + print(f"{type(mysql_client)=}") +# print(f"{type(mysql_client())=}") + execution_query = ExecutionQueryAsync(mysql_client) + execution_model: list[ExecutionModel] = await execution_query.get_execution_by_id(293) + res = [i.asdict() for i in execution_model] + print(res[0]['id']) await asyncio.sleep(5) diff --git a/main.py b/main.py index 2524822..94cc6e9 100644 --- a/main.py +++ b/main.py @@ -2,20 +2,35 @@ import asyncio from contextlib import asynccontextmanager import uvicorn -from fastapi import FastAPI +from fastapi import (FastAPI, Depends) + +from config import MYSQL_HOST, MYSQL_PORT, MYSQL_DB_NAME, MYSQL_USER, MYSQL_PASS from internal_loop import internal_loop from api import testAPI from influx_related import init_influx +from mysql_related import mysql_init, get_mysql_db, get_session_local + @asynccontextmanager async def startup(fast_api: FastAPI): # Startup logic print("startup") influxdb_client = init_influx(fast_api) - asyncio.create_task(internal_loop(influxdb_client)) + mysql_init('mysql', + MYSQL_HOST, + MYSQL_PORT, + MYSQL_DB_NAME, + MYSQL_USER, + MYSQL_PASS + ) + mysql_client = get_session_local() + print("mysql init done") + asyncio.create_task(internal_loop(influxdb_client, mysql_client)) yield # Shutdown logic + await influxdb_client.close() + #await mysql_engine.dispose() app = FastAPI(lifespan=startup) diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/models/execution/__init__.py b/models/execution/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/models/execution/model_execution.py b/models/execution/model_execution.py new file mode 100644 index 0000000..f46725a --- /dev/null +++ b/models/execution/model_execution.py @@ -0,0 +1,47 @@ +from sqlalchemy import Integer, String, DateTime, Column, JSON, Boolean +from sqlalchemy.orm import declarative_base + +base_execution = declarative_base() + + +class ExecutionModel(base_execution): + __tablename__ = 'execution' + + id = Column(Integer, primary_key=True, index=True) + started_at = Column(DateTime, nullable=True) + stopped_at = Column(DateTime, nullable=True) + execution_status = Column(String(50), nullable=True) + grafana_url = Column(String(1000), nullable=True) + squash_url = Column(String(1000), nullable=True) + test_name = Column(String(500), nullable=True) + hurricane_project_id = Column(String(100), nullable=True) + landslide_execution_id = Column(String(50), nullable=True) + hurricane_test_id = Column(String(100), nullable=True) + action_properties = Column(JSON, nullable=True) + parameters = Column(JSON, nullable=True) + parameters_update = Column(JSON, nullable=True) + queue_uuid = Column(String(36), nullable=True) + max_duration = Column(Integer, nullable=True) + str_id = Column(Integer, nullable=True) + + class Config: + orm_mode = True + + def asdict(self): + return {"id": self.id, + "started_at": self.started_at, + "stopped_at": self.stopped_at, + "execution_status": self.execution_status, + "grafana_url": self.grafana_url, + "squash_url": self.squash_url, + "test_name": self.test_name, + "hurricane_project_id": self.hurricane_project_id, + "landslide_execution_id": self.landslide_execution_id, + "hurricane_test_id": self.hurricane_test_id, + "action_properties": self.action_properties, + "parameters": self.parameters, + "parameters_update": self.parameters_update, + "queue_uuid": self.queue_uuid, + "max_duration": self.max_duration, + "str_id": self.str_id + } diff --git a/mysql_related.py b/mysql_related.py new file mode 100644 index 0000000..73e5890 --- /dev/null +++ b/mysql_related.py @@ -0,0 +1,47 @@ +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker + +mysql_engine = None +mysql_SessionLocal: async_sessionmaker + + +def mysql_init(db_type: str, host: str, port: int, db_name: str, user: str, password: str): + """Initialize DB engine + session factory""" + global mysql_engine, mysql_SessionLocal + + db_type = db_type + host = host + port = port + name = db_name + user = user + password = password + + protocol = { + 'mysql': 'mysql+aiomysql', + 'postgresql': 'postgresql+asyncpg', + } + + sqlalchemy_database_url = f"{protocol[db_type]}://{user}:{password}@{host}:{port}/{db_name}" + print(f"SQLAlchemy: {sqlalchemy_database_url}") + mysql_engine = create_async_engine( + sqlalchemy_database_url, + echo=False, + pool_size=5, + max_overflow=10 + ) + print(f"mysql_engine: {mysql_engine}") + mysql_SessionLocal = async_sessionmaker( + bind=mysql_engine, + expire_on_commit=False, + class_=AsyncSession, + autoflush=False + ) + print(f"mysql_SessionLocal: {mysql_SessionLocal}") + +def get_session_local() -> AsyncSession: + return mysql_SessionLocal() + + +async def get_mysql_db(): + """FastAPI dependency for providing DB sessions""" + async with get_session_local() as session: + yield session diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/db/__init__.py b/utils/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/db/db_execution.py b/utils/db/db_execution.py new file mode 100644 index 0000000..20bf03e --- /dev/null +++ b/utils/db/db_execution.py @@ -0,0 +1,22 @@ +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select + +from models.execution.model_execution import ExecutionModel + + +class ExecutionQueryAsync: + + def __init__(self, db: AsyncSession): + self.db = db + + async def get_execution(self): + result = await self.db.execute(select(ExecutionModel)) + rows = result.scalars().all() + return rows + + async def get_execution_by_id(self, id): + stmt = select(ExecutionModel).where(ExecutionModel.id == id) + result = await self.db.execute(stmt) + rows = result.scalars().all() + return rows + \ No newline at end of file