# FastAPI service that bridges HTTP requests to a Kafka request/response round-trip.
import asyncio
|
|
import uuid
|
|
import json
|
|
|
|
import uvicorn
|
|
from fastapi import FastAPI
|
|
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
|
|
from contextlib import asynccontextmanager
|
|
|
|
# Kafka connection settings: broker address and the two topics used for the
# request/response round-trip (requests out, correlated replies back in).
KAFKA_BOOTSTRAP = "localhost:19092"
REQUEST_TOPIC = "request-topic"
RESPONSE_TOPIC = "response-topic"

# NOTE(review): asyncio.get_event_loop() at import time is deprecated outside
# a running loop (Python 3.10+) and may not be the loop uvicorn actually runs
# handlers on — futures created on it can fail with "attached to a different
# loop". Prefer asyncio.get_running_loop() inside the request handler; confirm.
loop = asyncio.get_event_loop()

# Module-level handles, populated during the FastAPI lifespan startup.
producer = None  # AIOKafkaProducer once started (None until lifespan runs)
consumer = None  # AIOKafkaConsumer once started (None until lifespan runs)
ml_models = {}  # shared registry holding the started 'producer'/'consumer'
pending_responses = {}  # correlation_id -> asyncio.Future awaiting a Kafka reply
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: start the Kafka clients on startup, stop them on shutdown.

    Creates the producer and the response-topic consumer, publishes them via
    ``ml_models`` and ``app.state``, and launches the background task that
    resolves pending request futures from incoming response messages.
    """
    global producer, consumer  # also populate the declared module-level handles

    print("============== STARTING LIFESPAN ==============")

    # The deprecated ``loop=`` kwarg was dropped: aiokafka >= 0.8 removed it
    # and always uses the currently running event loop.
    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
    await producer.start()

    consumer = AIOKafkaConsumer(
        RESPONSE_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP,
        group_id="fastapi-group",
        auto_offset_reset="earliest",
    )

    # Register the clients before spawning the dispatcher task so that
    # consume_responses() can look the consumer up in ``ml_models``.
    ml_models['producer'] = producer
    ml_models['consumer'] = consumer
    await consumer.start()

    app.state.producer = producer
    app.state.consumer = consumer

    # Reuse the module-level consume_responses() instead of duplicating its
    # body here; it reads the consumer from ``ml_models`` (set above).
    app.state.consumer_task = asyncio.create_task(consume_responses())

    yield  # the application serves requests while suspended here

    # Shutdown: cancel the dispatcher and wait for the cancellation to
    # complete before stopping the Kafka clients it iterates over.
    app.state.consumer_task.cancel()
    try:
        await app.state.consumer_task
    except asyncio.CancelledError:
        pass
    await producer.stop()
    await consumer.stop()
|
async def consume_responses():
    """Dispatch Kafka response messages to the futures awaiting them.

    Iterates forever over the consumer registered in ``ml_models``: each
    record is decoded as JSON, matched to a pending request by its
    ``correlation_id``, and its ``response`` payload is delivered to the
    waiting future. Any per-message failure is logged and skipped.
    """
    response_consumer = ml_models['consumer']
    async for record in response_consumer:
        try:
            payload = json.loads(record.value.decode())
            waiter = pending_responses.pop(payload.get("correlation_id"), None)
            if waiter:
                waiter.set_result(payload.get("response"))
        except Exception as exc:
            print("Error handling response:", exc)
|
# Wire in the lifespan handler so the Kafka clients start/stop with the app.
app = FastAPI(lifespan=lifespan)
|
@app.post("/process/")
async def process(data: dict):
    """Publish *data* to Kafka and await the correlated response.

    The request is tagged with a fresh correlation id, sent to
    ``REQUEST_TOPIC``, and the handler then waits (up to 10 s) for the
    background consume_responses() task to resolve the future with the
    reply payload.

    Returns ``{"response": ...}`` on success or ``{"error": ...}`` on timeout.
    """
    correlation_id = str(uuid.uuid4())

    # Create the future on the loop this coroutine is actually running on;
    # the module-level import-time loop can be a different (non-running)
    # loop under uvicorn, which makes awaiting the future fail.
    future = asyncio.get_running_loop().create_future()
    pending_responses[correlation_id] = future

    message = {
        "correlation_id": correlation_id,
        "payload": data,
    }

    await ml_models['producer'].send_and_wait(REQUEST_TOPIC, json.dumps(message).encode())

    try:
        response = await asyncio.wait_for(future, timeout=10)
        return {"response": response}
    except asyncio.TimeoutError:
        # Drop the stale future so pending_responses does not leak entries.
        pending_responses.pop(correlation_id, None)
        return {"error": "Timeout waiting for response"}
|
def main():
    """Start the development server: uvicorn with auto-reload on port 8085."""
    uvicorn.run(
        "main:app",
        host='0.0.0.0',
        port=8085,
        reload=True,
        log_config=uvicorn.config.LOGGING_CONFIG,
    )
|
# Allow `python main.py` to launch the dev server directly.
if __name__ == '__main__':
    main()