41 lines
1.2 KiB
Python
41 lines
1.2 KiB
Python
# python fastsream_publisher.py
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
|
|
from pydantic import BaseModel, Field
|
|
from faststream import FastStream
|
|
from faststream.nats import NatsBroker
|
|
|
|
SUBJECT="jobs"
|
|
|
|
NATS_SERVERS = os.getenv('NATS_URL', 'nats://localhost:4222').split(',')
|
|
NATS_TOKEN = os.getenv('NATS_TOKEN', '')
|
|
|
|
broker = NatsBroker(servers=NATS_SERVERS, token=NATS_TOKEN)
|
|
app = FastStream(broker)
|
|
|
|
class JobCreated(BaseModel):
|
|
job_id: str = Field(..., description="Job identifier")
|
|
name: str = Field(..., description="Job name")
|
|
|
|
|
|
class JobResponse(BaseModel):
|
|
job_id: str = Field(..., description="Job identifier")
|
|
status: str = Field(..., description="Job status")
|
|
|
|
@broker.subscriber(f"{SUBJECT}")
|
|
async def handle_job(event: JobCreated):
|
|
print(f"{SUBJECT} received job_id={event.job_id} name={event.name}")
|
|
print(f"{event.__dict__=}")
|
|
response_msg = JobCreated(job_id=event.job_id, name="created1").model_dump()
|
|
print(f"{response_msg=}")
|
|
print("===", await broker.publish(response_msg, subject=f"{SUBJECT}.response"))
|
|
return f"Hello {event.name}"
|
|
|
|
async def main():
|
|
await app.run()
|
|
|
|
if __name__ == "__main__":
|
|
#SUBJECT=sys.argv[1]
|
|
asyncio.run(main()) |