From 824b97ce57639cb57ce49880806af903707f36d8 Mon Sep 17 00:00:00 2001 From: marys Date: Mon, 1 Jun 2026 12:31:47 +0200 Subject: [PATCH] add example of jetstream --- faststream_app_jetstream.py | 37 +++++++++++++++++++++++++++ faststream_publisher_jetstream.py | 42 +++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 faststream_app_jetstream.py create mode 100644 faststream_publisher_jetstream.py diff --git a/faststream_app_jetstream.py b/faststream_app_jetstream.py new file mode 100644 index 0000000..5db5c1e --- /dev/null +++ b/faststream_app_jetstream.py @@ -0,0 +1,37 @@ +# faststream run faststream_app:app + +import asyncio +import os +import time +from cProfile import run + +from faststream import FastStream +from faststream.specification.asyncapi.v3_0_0.schema import servers +from pydantic import BaseModel, Field +from faststream.nats import NatsBroker, JStream +from urllib3.util import url + +NATS_SERVERS = os.getenv('NATS_URL', 'nats://localhost:4222').split(',') +NATS_TOKEN = os.getenv('NATS_TOKEN', '') + +broker = NatsBroker(servers=NATS_SERVERS, token=NATS_TOKEN) +app = FastStream(broker) + +stream = JStream(name="orders", subjects=["orders.*"]) + +@broker.subscriber("orders.created", stream=stream, durable="orders-worker") +async def handle_order(message: dict): + print("received:", message) + return {"status": "ok"} + +@broker.publisher("orders.created", stream="orders") +async def publish_order(): + return {"order_id": 123, "item": "book"} + + +async def main(): + await app.run() + +if __name__ == "__main__": + asyncio.run(main()) + broker.stop() \ No newline at end of file diff --git a/faststream_publisher_jetstream.py b/faststream_publisher_jetstream.py new file mode 100644 index 0000000..57c962d --- /dev/null +++ b/faststream_publisher_jetstream.py @@ -0,0 +1,42 @@ +# faststream run faststream_app:app + +import asyncio +import os + +from faststream.specification.asyncapi.v3_0_0.schema import servers +from pydantic import BaseModel, Field +from faststream.nats import NatsBroker, NatsMessage +from urllib3.util import url + +NATS_SERVERS = os.getenv('NATS_URL', 'nats://localhost:4222').split(',') +NATS_TOKEN = os.getenv('NATS_TOKEN', '') + +broker = NatsBroker(servers=NATS_SERVERS, token=NATS_TOKEN) + +class JobCreated(BaseModel): + job_id: str + name: str + subject: str = "jobs.created" + +class JobResponse(BaseModel): + job_id: str = Field(..., description="Job identifier") + status: str = Field(..., description="Job status") + +async def main(): + await broker.connect() + # await broker.publish( + # JobCreated(job_id="123", name="build-image").model_dump(), + # subject="orders.created", + # ) + + async with broker: + response: NatsMessage = await broker.request( + JobCreated(job_id="128", name="build-image").model_dump(), + subject="orders.created", + timeout=3, + ) + print(f"{await response.decode()=}") + await broker.stop() + +asyncio.run(main()) +