diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..3228387
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.idea
+.venv
diff --git a/README.md b/README.md
index e69de29..4a2d772 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,4 @@
+
+
+
+pip install -r requirements.txt
\ No newline at end of file
diff --git a/consumer.py b/consumer.py
new file mode 100644
index 0000000..354ba3e
--- /dev/null
+++ b/consumer.py
@@ -0,0 +1,42 @@
+import asyncio
+import json
+from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+
+KAFKA_BOOTSTRAP = "localhost:19092"
+REQUEST_TOPIC = "request-topic"
+RESPONSE_TOPIC = "response-topic"
+
+async def main():
+    consumer = AIOKafkaConsumer(
+        REQUEST_TOPIC,
+        bootstrap_servers=KAFKA_BOOTSTRAP,
+        group_id="worker-group",
+        auto_offset_reset="earliest"
+    )
+    await consumer.start()
+
+    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
+    await producer.start()
+
+    try:
+        async for msg in consumer:
+            data = json.loads(msg.value.decode())
+            print(f"Received request: {data}")
+            payload = data["payload"]
+            correlation_id = data["correlation_id"]
+
+            # Simulate processing
+            await asyncio.sleep(1)
+
+            response_data = {
+                "correlation_id": correlation_id,
+                "response": f"Processed: {payload}"
+            }
+            await producer.send_and_wait(RESPONSE_TOPIC, json.dumps(response_data).encode())
+    finally:
+        await consumer.stop()
+        await producer.stop()
+
+if __name__ == "__main__":
+    asyncio.run(main())
+
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..a964b7b
--- /dev/null
+++ b/main.py
@@ -0,0 +1,83 @@
+import asyncio
+import json
+import uuid
+
+from contextlib import asynccontextmanager
+
+import uvicorn
+from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+from fastapi import FastAPI
+
+KAFKA_BOOTSTRAP = "localhost:19092"
+REQUEST_TOPIC = "request-topic"
+RESPONSE_TOPIC = "response-topic"
+
+# correlation_id -> asyncio.Future awaiting the worker's reply
+pending_responses: dict[str, asyncio.Future] = {}
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    """Start the Kafka producer/consumer and the response-dispatch task.
+
+    NOTE: aiokafka removed the ``loop=`` constructor argument (requirements
+    pin aiokafka~=0.12.0); the clients pick up the running loop themselves.
+    """
+    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
+    await producer.start()
+
+    consumer = AIOKafkaConsumer(
+        RESPONSE_TOPIC,
+        bootstrap_servers=KAFKA_BOOTSTRAP,
+        group_id="fastapi-group",
+        auto_offset_reset="earliest",
+    )
+    await consumer.start()
+
+    app.state.producer = producer
+    app.state.consumer = consumer
+
+    async def consume_responses():
+        # Resolve the waiting future for each correlated response.
+        async for msg in consumer:
+            try:
+                data = json.loads(msg.value.decode())
+                future = pending_responses.pop(data.get("correlation_id"), None)
+                if future is not None and not future.done():
+                    future.set_result(data.get("response"))
+            except Exception as e:
+                print("Error handling response:", e)
+
+    consumer_task = asyncio.create_task(consume_responses())
+
+    try:
+        yield
+    finally:
+        # Always tear down Kafka clients, even if the app body raised.
+        consumer_task.cancel()
+        await producer.stop()
+        await consumer.stop()
+
+
+app = FastAPI(lifespan=lifespan)
+
+
+@app.post("/process/")
+async def process(data: dict):
+    """Publish the request to Kafka and await the correlated response."""
+    correlation_id = str(uuid.uuid4())
+    # Create the future on the loop actually serving this request.
+    future = asyncio.get_running_loop().create_future()
+    pending_responses[correlation_id] = future
+
+    message = {"correlation_id": correlation_id, "payload": data}
+    await app.state.producer.send_and_wait(REQUEST_TOPIC, json.dumps(message).encode())
+
+    try:
+        response = await asyncio.wait_for(future, timeout=10)
+        return {"response": response}
+    except asyncio.TimeoutError:
+        # Drop the abandoned future so the dict does not grow unboundedly.
+        pending_responses.pop(correlation_id, None)
+        return {"error": "Timeout waiting for response"}
+
+
+def main():
+    uvicorn.run("main:app", host="0.0.0.0", port=8085, reload=True)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..8f08249
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+uvicorn~=0.34.1
+fastapi~=0.115.12
+aiokafka~=0.12.0
\ No newline at end of file