Files
FastAPI_Kafka/main.py
2025-04-15 12:09:17 +02:00

119 lines
3.1 KiB
Python

import asyncio
import uuid
import json
import uvicorn
from fastapi import FastAPI
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from contextlib import asynccontextmanager
KAFKA_BOOTSTRAP = "localhost:19092"
REQUEST_TOPIC = "request-topic"
RESPONSE_TOPIC = "response-topic"
loop = asyncio.get_event_loop()
producer = None
consumer = None
ml_models = {}
pending_responses = {} # correlation_id -> asyncio.Future
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup logic
print("============== STARTING LIFESPAN ==============")
producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP, loop=loop)
await producer.start()
consumer = AIOKafkaConsumer(
RESPONSE_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP,
loop=loop,
group_id="fastapi-group",
auto_offset_reset="earliest"
)
ml_models['producer'] = producer
ml_models['consumer'] = consumer
await consumer.start()
app.state.producer = producer
app.state.consumer = consumer
async def consume_responses():
async for msg in consumer:
try:
data = json.loads(msg.value.decode())
correlation_id = data.get("correlation_id")
future = pending_responses.pop(correlation_id, None)
if future:
future.set_result(data.get("response"))
except Exception as e:
print("Error handling response:", e)
app.state.consumer_task = asyncio.create_task(consume_responses())
yield # ← here the app runs
# Shutdown logic
app.state.consumer_task.cancel()
await producer.stop()
await consumer.stop()
async def consume_responses():
async for msg in ml_models['consumer']:
try:
data = json.loads(msg.value.decode())
correlation_id = data.get("correlation_id")
future = pending_responses.pop(correlation_id, None)
if future:
future.set_result(data.get("response"))
except Exception as e:
print("Error handling response:", e)
app = FastAPI(lifespan=lifespan)
@app.post("/process/")
async def process(data: dict):
correlation_id = str(uuid.uuid4())
print(ml_models)
future = loop.create_future()
pending_responses[correlation_id] = future
message = {
"correlation_id": correlation_id,
"payload": data
}
print(ml_models['producer'])
await ml_models['producer'].send_and_wait(REQUEST_TOPIC, json.dumps(message).encode())
try:
response = await asyncio.wait_for(future, timeout=10)
print("+++", response)
print("===", dir(response))
return {"response": response}
except asyncio.TimeoutError:
pending_responses.pop(correlation_id, None)
return {"error": "Timeout waiting for response"}
def main():
logging_config = uvicorn.config.LOGGING_CONFIG
uvicorn.run(
"main:app",
host='0.0.0.0',
port=8085,
reload=True,
# debug=True,
# loglevel='debug',
log_config=logging_config,
)
if __name__ == '__main__':
main()