Influx related functionality finished

This commit is contained in:
marys
2025-09-25 10:40:29 +02:00
commit 795a494950
6 changed files with 128 additions and 0 deletions

0
api/__init__.py Normal file
View File

33
api/testAPI.py Normal file
View File

@@ -0,0 +1,33 @@
from fastapi import (
APIRouter,
status,
Body, Depends)
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influx_related import get_influxdb_client, get_influx_data, write_influx_data
router = APIRouter()
@router.get(
path="/get_influx/",
name='get data from influxdb, last one hour',
#response_model=InfluxDBClientAsync,
responses={
status.HTTP_401_UNAUTHORIZED: {},
}
)
async def get_influx(client: InfluxDBClientAsync = Depends(get_influxdb_client)):
return {'results': await get_influx_data(client)}
@router.post(
path="/write_influx/",
name='write data to influxdb',
#response_model=InfluxDBClientAsync,
responses={
status.HTTP_401_UNAUTHORIZED: {},
}
)
async def write_influx(temp: float, client: InfluxDBClientAsync = Depends(get_influxdb_client)):
await write_influx_data(client, temp)
return {"status": "OK"}

6
config.py Normal file
View File

@@ -0,0 +1,6 @@
INFLUX_URL='http://localhost:8086'
INFLUX_BUCKET='test_bucket'
INFLUX_TOKEN='Os1nJwYP1TyPRF0iQhaft677Qh87uPPMoROFtmlh_8uXbc5q1xQDguCjtWZFj6nN69KJylXSupBVt6M5CQZueA=='
INFLUX_ORG='my_org'

34
influx_related.py Normal file
View File

@@ -0,0 +1,34 @@
from fastapi import FastAPI
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client import Point
from config import *
influxdb_client: InfluxDBClientAsync
def init_influx(app: FastAPI):
global influxdb_client
influxdb_client = InfluxDBClientAsync(
url=INFLUX_URL,
token=INFLUX_TOKEN,
org=INFLUX_ORG,
)
return influxdb_client
async def get_influxdb_client() -> InfluxDBClientAsync:
global influxdb_client
return influxdb_client
async def get_influx_data(client: InfluxDBClientAsync) -> list[dict]:
query_api = client.query_api()
query = f'from(bucket: "{INFLUX_BUCKET}") |> range(start: -1h)'
tables = await query_api.query(query=query, org=INFLUX_ORG)
records = [
{"time": record.get_time(), "value": record.get_value(), "field": record.get_field()}
for table in tables for record in table.records
]
return records
async def write_influx_data(client: InfluxDBClientAsync, temp:float):
point = Point("measurement").field("temperature", temp)
return await client.write_api().write(bucket=INFLUX_BUCKET, record=point)

19
internal_loop.py Normal file
View File

@@ -0,0 +1,19 @@
import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from config import INFLUX_BUCKET, INFLUX_ORG
from influx_related import get_influx_data
async def internal_loop(influxdb_client: InfluxDBClientAsync):
i = 0
while True:
print("internal_loop", i)
i += 1
try:
records = await get_influx_data(influxdb_client)
print("influx records:", len(records))
[print(" -", i) for i in records]
except Exception as e:
print(e)
await asyncio.sleep(5)

36
main.py Normal file
View File

@@ -0,0 +1,36 @@
import asyncio
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from internal_loop import internal_loop
from api import testAPI
from influx_related import init_influx
@asynccontextmanager
async def startup(fast_api: FastAPI):
# Startup logic
print("startup")
influxdb_client = init_influx(fast_api)
asyncio.create_task(internal_loop(influxdb_client))
yield
# Shutdown logic
app = FastAPI(lifespan=startup)
app.include_router(
testAPI.router,
prefix='/api',
tags=['testApi'],
)
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
uvicorn.run(
"main:app",
host='0.0.0.0',
port=8082,
log_level='debug',
)