solution using zmq PUB/SUB,

working locally


Former-commit-id: c0c46a1b9d
This commit is contained in:
Arthur Lu 2021-10-15 06:12:50 +00:00
parent 8540642bef
commit 6cd092da37
2 changed files with 21 additions and 21 deletions

View File

@ -1,19 +1,15 @@
import socket
import signal
import zmq
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # UDP
# Enable port reusage so we will be able to run multiple clients and servers on single (host, port).
# Do not use socket.SO_REUSEADDR except you using linux(kernel<3.9): goto https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ for more information.
# For linux hosts all sockets that want to share the same address and port combination must belong to processes that share the same effective user ID!
# So, on linux(kernel>=3.9) you have to run multiple servers and clients under one user to share the same (host, port).
# Thanks to @stevenreddie
client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
signal.signal(signal.SIGINT, signal.SIG_DFL)
# Enable broadcasting mode
client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5678')
socket.setsockopt(zmq.SUBSCRIBE, b'status')
client.bind(("", 5678))
while True:
# Thanks @seym45 for a fix
data, addr = client.recvfrom(1024)
print("received message: %s"%data)
message = socket.recv_multipart()
print(f'Received: {message}')

View File

@ -156,7 +156,6 @@ import math
from multiprocessing import Pool, freeze_support
import os
import pymongo
import socket
import sys
import time
import traceback
@ -314,6 +313,8 @@ def main(send, verbose = False, profile = False, debug = False):
send(stdout, INF, "closed threads and database client")
send(stdout, INF, "finished all tasks in " + str(time.time() - loop_start) + " seconds, looping")
raise Exception("boop")
if profile:
return 0 # return instead of break to avoid sys.exit
@ -547,16 +548,19 @@ def start(pid_path, verbose = False, profile = False, debug = False):
stderr = f
):
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
server.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server.settimeout(0.2)
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5678")
socket.send(b'status')
def send(target, level, message, code = 0):
server.sendto(bytes(message, 'utf-8'), ('<broadcast>', 5678))
socket.send(bytes("status: " + message, 'utf-8'))
exit_code = main(send)
server.close()
socket.close()
f.close()
sys.exit()