Files
nats-python/main_test.py
T
2026-06-01 09:46:32 +02:00

64 lines
1.6 KiB
Python

import os
import asyncio
import nats
from nats.errors import TimeoutError as NATSTimeoutError
NATS_SERVERS = os.getenv('NATS_URL', 'nats://localhost:4222').split(',')
NATS_TOKEN = os.getenv('NATS_TOKEN', '')
async def nats_subscribe(nc, subject):
sub = await nc.subscribe(subject)
while True:
try:
msg = await sub.next_msg(timeout=0.1)
print(f"Received a message on '{msg.subject}': {msg.data}")
except NATSTimeoutError:
continue
except KeyboardInterrupt:
await sub.unsubscribe()
return True
async def nats_publish(nc, subject, data):
count = 0
while True:
try:
print(f"Publishing a message on '{subject}': {data}_{count}")
await nc.publish(subject, (f"{data}_{count}").encode())
count += 1
await asyncio.sleep(5)
except KeyboardInterrupt:
return True
async def main():
print(f'Startting ...')
print(f'NATS_SERVER: {NATS_SERVERS}')
print(f'NATS_TOKEN: {NATS_TOKEN}')
nc = await nats.connect(servers=NATS_SERVERS,
token=NATS_TOKEN)
asyncio.create_task(nats_subscribe(nc, "greet.*"))
asyncio.create_task(nats_publish(nc, "greet.bob", "hello_bob"))
await asyncio.sleep(0.5)
asyncio.create_task(nats_publish(nc, "greet.alice", "hello_alice"))
while True:
await asyncio.sleep(1)
await nc.drain()
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
# See PyCharm help at https://www.jetbrains.com/help/pycharm/