# Demo NATS client: subscribes to "greet.*" and periodically publishes
# greeting messages to "greet.bob" and "greet.alice".
import os
|
|
import asyncio
|
|
|
|
import nats
|
|
from nats.errors import TimeoutError as NATSTimeoutError
|
|
|
|
# Comma-separated list of server URLs taken from NATS_URL. Whitespace around
# each entry and empty entries (e.g. from a trailing comma) are dropped so
# that every element handed to the client is a usable URL.
NATS_SERVERS = [
    url.strip()
    for url in os.getenv('NATS_URL', 'nats://localhost:4222').split(',')
    if url.strip()
]
# Auth token taken from NATS_TOKEN; empty string when the variable is unset.
NATS_TOKEN = os.getenv('NATS_TOKEN', '')
|
|
|
|
async def nats_subscribe(nc, subject):
    """Print every message received on *subject* until stopped.

    Args:
        nc: Client object providing an awaitable ``subscribe(subject)`` whose
            result offers ``next_msg(timeout=...)`` and ``unsubscribe()``.
        subject: Subject to subscribe to.

    Returns:
        True when the loop is ended by a KeyboardInterrupt raised inside it.
        Any other exception (including task cancellation) propagates after
        the subscription has been released.
    """
    sub = await nc.subscribe(subject)
    try:
        while True:
            try:
                msg = await sub.next_msg(timeout=0.1)
            except NATSTimeoutError:
                # Nothing arrived within the poll window; poll again.
                continue
            print(f"Received a message on '{msg.subject}': {msg.data}")
    except KeyboardInterrupt:
        return True
    finally:
        # Release the subscription on every exit path. Previously this only
        # happened on KeyboardInterrupt, so a cancelled task never
        # unsubscribed.
        await sub.unsubscribe()
|
|
|
|
async def nats_publish(nc, subject, data):
    """Publish ``"{data}_{n}"`` on *subject* every 5 seconds, n = 0, 1, 2, ...

    Args:
        nc: Client object providing an awaitable ``publish(subject, payload)``.
        subject: Subject to publish on.
        data: Message prefix; the running counter is appended after ``_``.

    Returns:
        True when the loop is ended by a KeyboardInterrupt raised inside it.
        Any other exception propagates to the caller.
    """
    count = 0
    try:
        while True:
            # Build the text once so the logged message and the published
            # payload can never diverge.
            message = f"{data}_{count}"
            print(f"Publishing a message on '{subject}': {message}")
            await nc.publish(subject, message.encode())
            count += 1
            await asyncio.sleep(5)
    except KeyboardInterrupt:
        return True
|
|
|
|
async def main():
    """Connect to NATS, run one subscriber and two publishers, then clean up.

    Subscribes to ``greet.*`` and publishes to ``greet.bob`` and, half a
    second later, ``greet.alice``. Runs until a worker fails or this
    coroutine is cancelled (e.g. Ctrl+C under ``asyncio.run``); the workers
    are then cancelled and the connection is drained.

    Raises:
        Whatever ``nats.connect`` or any of the worker tasks raises.
    """
    print('Starting ...')
    print(f'NATS_SERVER: {NATS_SERVERS}')
    # Never echo the secret itself; only report whether one is configured.
    print(f"NATS_TOKEN: {'<set>' if NATS_TOKEN else '<not set>'}")

    nc = await nats.connect(servers=NATS_SERVERS,
                            token=NATS_TOKEN)

    # Keep strong references to the tasks: the event loop only holds weak
    # ones, and we need them later for cancellation and error propagation.
    tasks = [
        asyncio.create_task(nats_subscribe(nc, "greet.*")),
        asyncio.create_task(nats_publish(nc, "greet.bob", "hello_bob")),
    ]
    try:
        # Stagger the second publisher so the two do not fire together.
        await asyncio.sleep(0.5)
        tasks.append(
            asyncio.create_task(nats_publish(nc, "greet.alice", "hello_alice"))
        )
        # Wait on the workers instead of sleeping forever so that a failure
        # in any of them surfaces here rather than being silently lost.
        await asyncio.gather(*tasks)
    finally:
        # Stop the workers first (the subscriber may still need the open
        # connection for its own cleanup), then drain the connection. The
        # drain used to sit after an infinite loop and was unreachable.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        await nc.drain()
|
|
|
|
|
|
|
|
|
|
# Script entry point: run the demo until interrupted.
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the demo; exit without a
        # traceback.
        pass
|
|
|
|
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
|