commit 795a4949505358a81b0e38c4b60e58ec8e5e9d9c Author: marys Date: Thu Sep 25 10:40:29 2025 +0200 Influx related functionality finished diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/testAPI.py b/api/testAPI.py new file mode 100644 index 0000000..532e761 --- /dev/null +++ b/api/testAPI.py @@ -0,0 +1,33 @@ +from fastapi import ( + APIRouter, + status, + Body, Depends) +from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync +from influx_related import get_influxdb_client, get_influx_data, write_influx_data + + +router = APIRouter() + +@router.get( + path="/get_influx/", + name='get data from influxdb, last one hour', + #response_model=InfluxDBClientAsync, + responses={ + status.HTTP_401_UNAUTHORIZED: {}, + } +) +async def get_influx(client: InfluxDBClientAsync = Depends(get_influxdb_client)): + return {'results': await get_influx_data(client)} + +@router.post( + path="/write_influx/", + name='write data to influxdb', + #response_model=InfluxDBClientAsync, + responses={ + status.HTTP_401_UNAUTHORIZED: {}, + } +) +async def write_influx(temp: float, client: InfluxDBClientAsync = Depends(get_influxdb_client)): + await write_influx_data(client, temp) + return {"status": "OK"} + diff --git a/config.py b/config.py new file mode 100644 index 0000000..650abed --- /dev/null +++ b/config.py @@ -0,0 +1,6 @@ + + +INFLUX_URL='http://localhost:8086' +INFLUX_BUCKET='test_bucket' +INFLUX_TOKEN='Os1nJwYP1TyPRF0iQhaft677Qh87uPPMoROFtmlh_8uXbc5q1xQDguCjtWZFj6nN69KJylXSupBVt6M5CQZueA==' +INFLUX_ORG='my_org' diff --git a/influx_related.py b/influx_related.py new file mode 100644 index 0000000..abd3489 --- /dev/null +++ b/influx_related.py @@ -0,0 +1,34 @@ +from fastapi import FastAPI +from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync +from influxdb_client import Point +from config import * + +influxdb_client: InfluxDBClientAsync + +def init_influx(app: FastAPI): + global influxdb_client + influxdb_client = InfluxDBClientAsync( + url=INFLUX_URL, + token=INFLUX_TOKEN, + org=INFLUX_ORG, + ) + return influxdb_client + +async def get_influxdb_client() -> InfluxDBClientAsync: + global influxdb_client + return influxdb_client + + +async def get_influx_data(client: InfluxDBClientAsync) -> list[dict]: + query_api = client.query_api() + query = f'from(bucket: "{INFLUX_BUCKET}") |> range(start: -1h)' + tables = await query_api.query(query=query, org=INFLUX_ORG) + records = [ + {"time": record.get_time(), "value": record.get_value(), "field": record.get_field()} + for table in tables for record in table.records + ] + return records + +async def write_influx_data(client: InfluxDBClientAsync, temp:float): + point = Point("measurement").field("temperature", temp) + return await client.write_api().write(bucket=INFLUX_BUCKET, record=point) \ No newline at end of file diff --git a/internal_loop.py b/internal_loop.py new file mode 100644 index 0000000..ef5351c --- /dev/null +++ b/internal_loop.py @@ -0,0 +1,19 @@ +import asyncio + +from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync +from config import INFLUX_BUCKET, INFLUX_ORG +from influx_related import get_influx_data + + +async def internal_loop(influxdb_client: InfluxDBClientAsync): + i = 0 + while True: + print("internal_loop", i) + i += 1 + try: + records = await get_influx_data(influxdb_client) + print("influx records:", len(records)) + [print(" -", i) for i in records] + except Exception as e: + print(e) + await asyncio.sleep(5) diff --git a/main.py b/main.py new file mode 100644 index 0000000..2524822 --- /dev/null +++ b/main.py @@ -0,0 +1,36 @@ +import asyncio +from contextlib import asynccontextmanager + +import uvicorn +from fastapi import FastAPI +from internal_loop import internal_loop + +from api import testAPI +from influx_related import init_influx + +@asynccontextmanager +async def startup(fast_api: FastAPI): + # Startup logic + print("startup") + influxdb_client = init_influx(fast_api) + asyncio.create_task(internal_loop(influxdb_client)) + yield + # Shutdown logic + + +app = FastAPI(lifespan=startup) + +app.include_router( + testAPI.router, + prefix='/api', + tags=['testApi'], +) + +# Press the green button in the gutter to run the script. +if __name__ == '__main__': + uvicorn.run( + "main:app", + host='0.0.0.0', + port=8082, + log_level='debug', + )