65 lines
2.6 KiB
Python
65 lines
2.6 KiB
Python
from utils.db.db import DbConnector
|
|
from models.model_mtr_network_nodes import MtrAdditionalNodesModel
|
|
|
|
from sqlalchemy import select
|
|
|
|
|
|
class MtrAdditionalNodeQuery:
|
|
def __init__(self):
|
|
db_con = DbConnector("spirent_mysql")
|
|
self.db = db_con.get_db()
|
|
|
|
async def get_all_nodes(self) -> list:
|
|
"""
|
|
Retrieve all entries from the mtr_network_nodes database table.
|
|
"""
|
|
async with self.db() as session:
|
|
return (await session.execute(select(MtrAdditionalNodesModel))).scalars().all()
|
|
|
|
|
|
async def get_all_nodes_for_testing(self) -> list:
|
|
"""
|
|
Retrieve all entries from the mtr_network_nodes database table, exclude rows used for storing parameters for
|
|
testing TAS servers.
|
|
"""
|
|
|
|
async with (self.db() as session):
|
|
stmt = select(MtrAdditionalNodesModel) \
|
|
.where(MtrAdditionalNodesModel.test_servers == False) \
|
|
.where(MtrAdditionalNodesModel.enabled == True)
|
|
return (await session.execute(stmt)).scalars().all()
|
|
|
|
async def get_node_by_id(self, node_id: int) -> MtrAdditionalNodesModel:
|
|
async with self.db() as session:
|
|
stmt = select(MtrAdditionalNodesModel).where(MtrAdditionalNodesModel.id == node_id)
|
|
return (await session.execute(stmt)).scalars().first()
|
|
|
|
# ADD example from Perplexity
|
|
# async def insert_city(session: AsyncSession, name: str, population: int):
|
|
# new_city = City(name=name, population=population)
|
|
# session.add(new_city)
|
|
# await session.commit() # Commits the transaction
|
|
# await session.refresh(new_city) # Refreshes to get generated fields (like id)
|
|
# return new_city
|
|
|
|
# DELETE
|
|
# async def delete_city_by_id(session: AsyncSession, city_id: int):
|
|
# # First, fetch the city you want to delete
|
|
# result = await session.execute(select(City).where(City.id == city_id))
|
|
# city = result.scalar_one_or_none()
|
|
# if city:
|
|
# await session.delete(city)
|
|
# await session.commit()
|
|
# return True # Indicate success
|
|
# return False # Not found
|
|
|
|
# UPDATE
|
|
# async def update_city_population(session: AsyncSession, city_id: int, new_population: int):
|
|
# result = await session.execute(select(City).where(City.id == city_id))
|
|
# city = result.scalar_one_or_none()
|
|
# if city:
|
|
# city.population = new_population
|
|
# await session.commit()
|
|
# await session.refresh(city) # Optional: refresh to get updated values
|
|
# return city
|
|
# return None |