"""FastAPI gateway implementing a request/response exchange over Kafka.

Each POST to ``/process/`` is published to ``REQUEST_TOPIC`` together with a
freshly generated correlation id. A background task consumes
``RESPONSE_TOPIC`` and resolves the future registered under the matching
correlation id, which unblocks the waiting HTTP handler.
"""
import asyncio
import json
import logging
import uuid
from contextlib import AsyncExitStack, asynccontextmanager, suppress

import uvicorn
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI

KAFKA_BOOTSTRAP = "localhost:19092"
REQUEST_TOPIC = "request-topic"
RESPONSE_TOPIC = "response-topic"

# Seconds a request waits for its correlated Kafka response before giving up.
RESPONSE_TIMEOUT_SECONDS = 10

logger = logging.getLogger(__name__)

# Populated by ``lifespan`` at startup. They are deliberately NOT created at
# import time: the event loop that exists during import is not guaranteed to
# be the loop the ASGI server actually runs the application on.
loop = None
producer = None
consumer = None

ml_models = {}  # "producer" / "consumer" -> started aiokafka clients
pending_responses = {}  # correlation_id -> asyncio.Future


async def _cancel_and_wait(task):
    """Cancel *task* and wait until the cancellation has been processed."""
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the Kafka clients and the response consumer task; stop them on exit.

    The started clients are published through the module globals,
    ``ml_models`` and ``app.state``. Teardown runs in reverse order of setup
    and also runs when startup fails part-way or the application exits with
    an error, so an already started client is never left running.
    """
    global loop, producer, consumer

    print("============== STARTING LIFESPAN ==============")
    loop = asyncio.get_running_loop()

    async with AsyncExitStack() as cleanup:
        # No ``loop=`` argument: it is deprecated in aiokafka, and the clients
        # bind to the running loop when constructed inside a coroutine.
        producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
        await producer.start()
        cleanup.push_async_callback(producer.stop)

        # NOTE(review): every instance joins the same "fastapi-group", so with
        # more than one instance a response could presumably be delivered to
        # a process that is not waiting for it -- confirm against deployment.
        consumer = AIOKafkaConsumer(
            RESPONSE_TOPIC,
            bootstrap_servers=KAFKA_BOOTSTRAP,
            group_id="fastapi-group",
            auto_offset_reset="earliest",
        )
        await consumer.start()
        cleanup.push_async_callback(consumer.stop)

        ml_models["producer"] = producer
        ml_models["consumer"] = consumer
        app.state.producer = producer
        app.state.consumer = consumer

        app.state.consumer_task = asyncio.create_task(consume_responses())
        cleanup.push_async_callback(_cancel_and_wait, app.state.consumer_task)

        yield  # the application serves requests here


async def consume_responses():
    """Resolve pending futures from messages arriving on the response topic.

    Each message is decoded as JSON; its ``correlation_id`` selects the
    future in ``pending_responses`` and its ``response`` value becomes that
    future's result. Messages with no matching pending future are ignored.
    A failure while handling one message is logged and does not stop the loop.
    """
    async for msg in ml_models["consumer"]:
        try:
            data = json.loads(msg.value.decode())
            correlation_id = data.get("correlation_id")
            future = pending_responses.pop(correlation_id, None)
            # The waiter may already have timed out or been cancelled;
            # set_result() on a finished future raises InvalidStateError.
            if future is not None and not future.done():
                future.set_result(data.get("response"))
        except Exception:
            logger.exception("Error handling response")


app = FastAPI(lifespan=lifespan)


@app.post("/process/")
async def process(data: dict):
    """Publish *data* to Kafka and wait for the correlated response.

    Returns ``{"response": ...}`` with the value delivered on the response
    topic, or ``{"error": "Timeout waiting for response"}`` when nothing
    arrives within ``RESPONSE_TIMEOUT_SECONDS``.
    """
    correlation_id = str(uuid.uuid4())

    # The future must belong to the loop that runs this handler, otherwise
    # awaiting it fails with "attached to a different loop".
    future = asyncio.get_running_loop().create_future()
    pending_responses[correlation_id] = future

    message = {"correlation_id": correlation_id, "payload": data}
    try:
        await ml_models["producer"].send_and_wait(
            REQUEST_TOPIC, json.dumps(message).encode()
        )
        response = await asyncio.wait_for(
            future, timeout=RESPONSE_TIMEOUT_SECONDS
        )
    except asyncio.TimeoutError:
        return {"error": "Timeout waiting for response"}
    finally:
        # Always drop the registration (send failure, timeout, cancellation)
        # so abandoned futures do not accumulate in pending_responses.
        pending_responses.pop(correlation_id, None)

    logger.debug("Response for %s: %r", correlation_id, response)
    return {"response": response}


def main():
    """Run the application under uvicorn with auto-reload on port 8085."""
    logging_config = uvicorn.config.LOGGING_CONFIG
    uvicorn.run(
        "main:app",
        host='0.0.0.0',
        port=8085,
        reload=True,
        log_config=logging_config,
    )


if __name__ == '__main__':
    main()