From 6cd092da37bd624213d8566568976692eb1ea1c8 Mon Sep 17 00:00:00 2001 From: Arthur Lu Date: Fri, 15 Oct 2021 06:12:50 +0000 Subject: [PATCH] solution using zmq PUB/SUB, working locally Former-commit-id: c0c46a1b9deb5e43d6f3714a40440092ceb06e36 --- src/cli/client.py | 24 ++++++++++-------------- src/cli/superscript.py | 18 +++++++++++------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/cli/client.py b/src/cli/client.py index e96620a..bc4a15b 100644 --- a/src/cli/client.py +++ b/src/cli/client.py @@ -1,19 +1,15 @@ -import socket +import signal +import zmq -client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # UDP -# Enable port reusage so we will be able to run multiple clients and servers on single (host, port). -# Do not use socket.SO_REUSEADDR except you using linux(kernel<3.9): goto https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ for more information. -# For linux hosts all sockets that want to share the same address and port combination must belong to processes that share the same effective user ID! -# So, on linux(kernel>=3.9) you have to run multiple servers and clients under one user to share the same (host, port). -# Thanks to @stevenreddie -client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) +signal.signal(signal.SIGINT, signal.SIG_DFL) -# Enable broadcasting mode -client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) +context = zmq.Context() + +socket = context.socket(zmq.SUB) +socket.connect('tcp://localhost:5678') +socket.setsockopt(zmq.SUBSCRIBE, b'status') -client.bind(("", 5678)) while True: - # Thanks @seym45 for a fix - data, addr = client.recvfrom(1024) - print("received message: %s"%data) \ No newline at end of file + message = socket.recv_multipart() + print(f'Received: {message}') \ No newline at end of file diff --git a/src/cli/superscript.py b/src/cli/superscript.py index 36432cb..371d18f 100644 --- a/src/cli/superscript.py +++ b/src/cli/superscript.py @@ -156,7 +156,6 @@ import math from multiprocessing import Pool, freeze_support import os import pymongo -import socket import sys import time import traceback @@ -314,6 +313,8 @@ def main(send, verbose = False, profile = False, debug = False): send(stdout, INF, "closed threads and database client") send(stdout, INF, "finished all tasks in " + str(time.time() - loop_start) + " seconds, looping") + raise Exception("boop") + if profile: return 0 # return instead of break to avoid sys.exit @@ -547,16 +548,19 @@ def start(pid_path, verbose = False, profile = False, debug = False): stderr = f ): - server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - server.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - server.settimeout(0.2) + import zmq + + context = zmq.Context() + socket = context.socket(zmq.PUB) + socket.bind("tcp://*:5678") + + socket.send(b'status') def send(target, level, message, code = 0): - server.sendto(bytes(message, 'utf-8'), ('', 5678)) + socket.send(bytes("status: " + message, 'utf-8')) exit_code = main(send) - server.close() + socket.close() f.close() sys.exit()