add example of jetstream

This commit is contained in:
marys
2026-06-01 12:31:47 +02:00
parent 5e673fc5e9
commit 824b97ce57
2 changed files with 79 additions and 0 deletions
+37
View File
@@ -0,0 +1,37 @@
# faststream run faststream_app:app
import asyncio
import os
import time
from cProfile import run
from faststream import FastStream
from faststream.specification.asyncapi.v3_0_0.schema import servers
from pydantic import BaseModel, Field
from faststream.nats import NatsBroker, JStream
from urllib3.util import url
NATS_SERVERS = os.getenv('NATS_URL', 'nats://localhost:4222').split(',')
NATS_TOKEN = os.getenv('NATS_TOKEN', '')
broker = NatsBroker(servers=NATS_SERVERS, token=NATS_TOKEN)
app = FastStream(broker)
stream = JStream(name="orders", subjects=["orders.*"])
@broker.subscriber("orders.created", stream=stream, durable="orders-worker")
async def handle_order(message: dict):
print("received:", message)
return {"status": "ok"}
@broker.publisher("orders.created", stream="orders")
async def publish_order():
return {"order_id": 123, "item": "book"}
async def main():
await app.run()
if __name__ == "__main__":
asyncio.run(main())
broker.stop()
+42
View File
@@ -0,0 +1,42 @@
# faststream run faststream_app:app
import asyncio
import os
from faststream.specification.asyncapi.v3_0_0.schema import servers
from pydantic import BaseModel, Field
from faststream.nats import NatsBroker, NatsMessage
from urllib3.util import url
NATS_SERVERS = os.getenv('NATS_URL', 'nats://localhost:4222').split(',')
NATS_TOKEN = os.getenv('NATS_TOKEN', '')
broker = NatsBroker(servers=NATS_SERVERS, token=NATS_TOKEN)
class JobCreated(BaseModel):
job_id: str
name: str
subject: str = "jobs.created"
class JobResponse(BaseModel):
job_id: str = Field(..., description="Job identifier")
status: str = Field(..., description="Job status")
async def main():
await broker.connect()
# await broker.publish(
# JobCreated(job_id="123", name="build-image").model_dump(),
# subject="orders.created",
# )
async with broker:
response: NatsMessage = await broker.request(
JobCreated(job_id="128", name="build-image").model_dump(),
subject="orders.created",
timeout=3,
)
print(f"{await response.decode()=}")
await broker.stop()
asyncio.run(main())