Mysql related functionality finished

This commit is contained in:
marys
2025-09-26 10:47:57 +02:00
parent 795a494950
commit c850a07ff0
11 changed files with 184 additions and 5 deletions

View File

@@ -3,8 +3,12 @@ from fastapi import (
status,
Body, Depends)
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influx_related import get_influxdb_client, get_influx_data, write_influx_data
from sqlalchemy.ext.asyncio import AsyncSession
from influx_related import get_influxdb_client, get_influx_data, write_influx_data
from models.execution.model_execution import ExecutionModel
from mysql_related import get_mysql_db
from utils.db.db_execution import ExecutionQueryAsync
router = APIRouter()
@@ -31,3 +35,31 @@ async def write_influx(temp: float, client: InfluxDBClientAsync = Depends(get_in
await write_influx_data(client, temp)
return {"status": "OK"}
@router.get(
path="/get_mysql/",
name='get data from mysql',
#response_model=InfluxDBClientAsync,
responses={
status.HTTP_401_UNAUTHORIZED: {},
}
)
async def get_mysql(db: AsyncSession = Depends(get_mysql_db)):
execution_query = ExecutionQueryAsync(db)
execution_model: list[ExecutionModel] = await execution_query.get_execution()
res = [i.asdict() for i in execution_model]
return {"results": res}
@router.get(
path="/get_mysql_by_id/",
name='get data from mysql',
#response_model=InfluxDBClientAsync,
responses={
status.HTTP_401_UNAUTHORIZED: {},
}
)
async def get_mysql_by_id(id: int, db: AsyncSession = Depends(get_mysql_db)):
print(f"{db=}")
execution_query = ExecutionQueryAsync(db)
execution_model: list[ExecutionModel] = await execution_query.get_execution_by_id(id)
res = [i.asdict() for i in execution_model]
return {"results": res}

View File

@@ -4,3 +4,9 @@ INFLUX_URL='http://localhost:8086'
INFLUX_BUCKET='test_bucket'
INFLUX_TOKEN='Os1nJwYP1TyPRF0iQhaft677Qh87uPPMoROFtmlh_8uXbc5q1xQDguCjtWZFj6nN69KJylXSupBVt6M5CQZueA=='
INFLUX_ORG='my_org'
MYSQL_DB_NAME='spirentservice'
MYSQL_HOST='localhost'
MYSQL_PASS='desneTajneHeslo'
MYSQL_PORT=3306
MYSQL_USER='marys'

View File

@@ -1,11 +1,14 @@
import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from config import INFLUX_BUCKET, INFLUX_ORG
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from influx_related import get_influx_data
from models.execution.model_execution import ExecutionModel
from utils.db.db_execution import ExecutionQueryAsync
async def internal_loop(influxdb_client: InfluxDBClientAsync):
async def internal_loop(influxdb_client: InfluxDBClientAsync, mysql_client: AsyncSession):
i = 0
while True:
print("internal_loop", i)
@@ -16,4 +19,11 @@ async def internal_loop(influxdb_client: InfluxDBClientAsync):
[print(" -", i) for i in records]
except Exception as e:
print(e)
print(f"{type(mysql_client)=}")
# print(f"{type(mysql_client())=}")
execution_query = ExecutionQueryAsync(mysql_client)
execution_model: list[ExecutionModel] = await execution_query.get_execution_by_id(293)
res = [i.asdict() for i in execution_model]
print(res[0]['id'])
await asyncio.sleep(5)

19
main.py
View File

@@ -2,20 +2,35 @@ import asyncio
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from fastapi import (FastAPI, Depends)
from config import MYSQL_HOST, MYSQL_PORT, MYSQL_DB_NAME, MYSQL_USER, MYSQL_PASS
from internal_loop import internal_loop
from api import testAPI
from influx_related import init_influx
from mysql_related import mysql_init, get_mysql_db, get_session_local
@asynccontextmanager
async def startup(fast_api: FastAPI):
# Startup logic
print("startup")
influxdb_client = init_influx(fast_api)
asyncio.create_task(internal_loop(influxdb_client))
mysql_init('mysql',
MYSQL_HOST,
MYSQL_PORT,
MYSQL_DB_NAME,
MYSQL_USER,
MYSQL_PASS
)
mysql_client = get_session_local()
print("mysql init done")
asyncio.create_task(internal_loop(influxdb_client, mysql_client))
yield
# Shutdown logic
await influxdb_client.close()
#await mysql_engine.dispose()
app = FastAPI(lifespan=startup)

0
models/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,47 @@
from sqlalchemy import Integer, String, DateTime, Column, JSON, Boolean
from sqlalchemy.orm import declarative_base
base_execution = declarative_base()
class ExecutionModel(base_execution):
__tablename__ = 'execution'
id = Column(Integer, primary_key=True, index=True)
started_at = Column(DateTime, nullable=True)
stopped_at = Column(DateTime, nullable=True)
execution_status = Column(String(50), nullable=True)
grafana_url = Column(String(1000), nullable=True)
squash_url = Column(String(1000), nullable=True)
test_name = Column(String(500), nullable=True)
hurricane_project_id = Column(String(100), nullable=True)
landslide_execution_id = Column(String(50), nullable=True)
hurricane_test_id = Column(String(100), nullable=True)
action_properties = Column(JSON, nullable=True)
parameters = Column(JSON, nullable=True)
parameters_update = Column(JSON, nullable=True)
queue_uuid = Column(String(36), nullable=True)
max_duration = Column(Integer, nullable=True)
str_id = Column(Integer, nullable=True)
class Config:
orm_mode = True
def asdict(self):
return {"id": self.id,
"started_at": self.started_at,
"stopped_at": self.stopped_at,
"execution_status": self.execution_status,
"grafana_url": self.grafana_url,
"squash_url": self.squash_url,
"test_name": self.test_name,
"hurricane_project_id": self.hurricane_project_id,
"landslide_execution_id": self.landslide_execution_id,
"hurricane_test_id": self.hurricane_test_id,
"action_properties": self.action_properties,
"parameters": self.parameters,
"parameters_update": self.parameters_update,
"queue_uuid": self.queue_uuid,
"max_duration": self.max_duration,
"str_id": self.str_id
}

47
mysql_related.py Normal file
View File

@@ -0,0 +1,47 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
mysql_engine = None
mysql_SessionLocal: async_sessionmaker
def mysql_init(db_type: str, host: str, port: int, db_name: str, user: str, password: str):
"""Initialize DB engine + session factory"""
global mysql_engine, mysql_SessionLocal
db_type = db_type
host = host
port = port
name = db_name
user = user
password = password
protocol = {
'mysql': 'mysql+aiomysql',
'postgresql': 'postgresql+asyncpg',
}
sqlalchemy_database_url = f"{protocol[db_type]}://{user}:{password}@{host}:{port}/{db_name}"
print(f"SQLAlchemy: {sqlalchemy_database_url}")
mysql_engine = create_async_engine(
sqlalchemy_database_url,
echo=False,
pool_size=5,
max_overflow=10
)
print(f"mysql_engine: {mysql_engine}")
mysql_SessionLocal = async_sessionmaker(
bind=mysql_engine,
expire_on_commit=False,
class_=AsyncSession,
autoflush=False
)
print(f"mysql_SessionLocal: {mysql_SessionLocal}")
def get_session_local() -> AsyncSession:
return mysql_SessionLocal()
async def get_mysql_db():
"""FastAPI dependency for providing DB sessions"""
async with get_session_local() as session:
yield session

0
utils/__init__.py Normal file
View File

0
utils/db/__init__.py Normal file
View File

22
utils/db/db_execution.py Normal file
View File

@@ -0,0 +1,22 @@
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from models.execution.model_execution import ExecutionModel
class ExecutionQueryAsync:
def __init__(self, db: AsyncSession):
self.db = db
async def get_execution(self):
result = await self.db.execute(select(ExecutionModel))
rows = result.scalars().all()
return rows
async def get_execution_by_id(self, id):
stmt = select(ExecutionModel).where(ExecutionModel.id == id)
result = await self.db.execute(stmt)
rows = result.scalars().all()
return rows