Files
FastAPI_Kafka/consumer.py
2025-04-15 12:09:17 +02:00

43 lines
1.1 KiB
Python

import asyncio
import json
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
# Broker address handed to both the consumer and the producer.
KAFKA_BOOTSTRAP: str = "localhost:19092"
# Topic the worker reads incoming requests from.
REQUEST_TOPIC: str = "request-topic"
# Topic the worker publishes its replies to.
RESPONSE_TOPIC: str = "response-topic"
async def main():
    """Run the request/response worker until it is cancelled or fails.

    Reads JSON messages from ``REQUEST_TOPIC`` and, for each one, publishes
    a JSON reply to ``RESPONSE_TOPIC`` that carries the request's
    ``correlation_id`` and a ``response`` string derived from its
    ``payload``.

    A record that cannot be decoded, is not valid JSON, or lacks the
    ``payload`` / ``correlation_id`` keys is reported and skipped, so a
    single bad message does not take the worker down.

    Every client that was successfully started is stopped on the way out,
    including when starting the producer fails or when stopping one of the
    clients raises.
    """
    consumer = AIOKafkaConsumer(
        REQUEST_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP,
        group_id="worker-group",
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
        # Started inside the consumer's try so a producer start-up failure
        # still stops the consumer.
        await producer.start()
        try:
            async for msg in consumer:
                try:
                    data = json.loads(msg.value.decode())
                    print(f"Received request: {data}")
                    payload = data["payload"]
                    correlation_id = data["correlation_id"]
                except (AttributeError, ValueError, KeyError, TypeError) as exc:
                    # AttributeError: record with a null value.
                    # ValueError: bad UTF-8 or invalid JSON.
                    # KeyError / TypeError: required key missing, or the
                    # JSON document is not an object.
                    print(
                        f"Skipping malformed request at offset "
                        f"{msg.offset}: {exc!r}"
                    )
                    continue

                # Simulate processing
                await asyncio.sleep(1)

                response_data = {
                    "correlation_id": correlation_id,
                    "response": f"Processed: {payload}",
                }
                await producer.send_and_wait(
                    RESPONSE_TOPIC, json.dumps(response_data).encode()
                )
        finally:
            await producer.stop()
    finally:
        # Runs even if producer.stop() raised, so the consumer is always
        # stopped.
        await consumer.stop()
# Start the worker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())