Files
nats-python/faststream_publisher.py
2026-06-01 09:46:32 +02:00

39 lines
1.0 KiB
Python

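# Standalone publisher script: run it directly with `python faststream_publisher.py`.
# (`faststream run faststream_app:app` presumably starts the companion subscriber app.)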
import asyncio
import os
from faststream.specification.asyncapi.v3_0_0.schema import servers
from pydantic import BaseModel, Field
from faststream.nats import NatsBroker
from urllib3.util import url
def parse_servers(raw: str) -> list[str]:
    """Split a comma-separated server list into clean URLs.

    Whitespace around each entry is removed, and empty entries (from a
    trailing comma, a doubled comma, or an empty string) are dropped.

    Args:
        raw: Comma-separated server URLs, e.g. ``"nats://a:4222, nats://b:4222"``.

    Returns:
        The non-empty, stripped entries in their original order.
    """
    return [url_ for part in raw.split(",") if (url_ := part.strip())]


# Server URLs come from NATS_URL (comma-separated). Fall back to the local
# default when the variable is unset or contains no usable entry.
NATS_SERVERS = parse_servers(os.getenv("NATS_URL", "")) or ["nats://localhost:4222"]
# Auth token from NATS_TOKEN; an empty string when the variable is unset.
NATS_TOKEN = os.getenv("NATS_TOKEN", "")
# Shared broker instance used by this script.
# NATS_TOKEN is "" when the environment variable is unset; pass None in that
# case so no empty token is handed to the client as if it were a configured
# credential.
broker = NatsBroker(servers=NATS_SERVERS, token=NATS_TOKEN or None)
class JobCreated(BaseModel):
    """Pydantic model for a job-created message.

    ``job_id`` and ``name`` are required strings; ``subject`` is a string
    that defaults to ``"jobs.created"``.
    """

    # Required identifier of the job.
    job_id: str
    # Required name of the job.
    name: str
    # Carried inside the payload; presumably the subject the event belongs
    # to — verify against the publisher.
    subject: str = Field(default="jobs.created")
class JobResponse(BaseModel):
    """Pydantic model with two required string fields: ``job_id`` and ``status``."""

    # `default=...` (Ellipsis) marks each field as required.
    job_id: str = Field(default=..., description="Job identifier")
    status: str = Field(default=..., description="Job status")
async def main() -> None:
    """Connect to NATS, publish one sample ``JobCreated`` payload, and disconnect.

    The payload is the ``model_dump()`` dict of a hard-coded ``JobCreated``
    instance, sent on the ``"jobs"`` subject.
    """
    event = JobCreated(job_id="123", name="build-image")
    # The broker's async context manager connects on entry and closes the
    # connection on exit, so the connection is released even if publishing
    # raises.
    async with broker:
        # NOTE(review): the message goes out on "jobs", while
        # JobCreated.subject defaults to "jobs.created" and the commented-out
        # subscriber at the bottom of this file listens on "jobs.create".
        # Confirm the intended subject.
        await broker.publish(event.model_dump(), subject="jobs")
# Run the publisher only when this file is executed as a script, so that
# importing the module does not connect to NATS and publish as a side effect.
if __name__ == "__main__":
    asyncio.run(main())
# Example subscriber (disabled). It listens on "jobs.create":
# @broker.subscriber("jobs.create")
# async def create_job(msg: JobCreated) -> JobResponse:
#     return JobResponse(job_id="123", status=f"created:{msg.name}")