my use-case (an home task to practice abstraction, exception handling, unitests) is regarding a message-queue with small-fixed-size client requests from stdin of such:
- [ENQ] data (insert json data to queue)
- [DEQ] (get latest in queue)
- [STATUS] (status of queue)
- [STOP] (stop server)
- [EXIT]
Without any scalability demands, except that there could be multi-clients and command might be extended in future.
I know there could be multiple approaches to handle multi-client-server model - multi-threaded or select one and that there are existent frameworks (Twisted, gevant). I have chosen to implement a simple socket-per-command approach with BLOCKING threads. one main server thread will serve each coming socket with a timeout. every-time a new request arrives - new socket_connection is open - server reads whole data, sends a response and socket is closed. Then server moves to the next socket_connection, eventually holding ONE socket-a-time in a while() loop.
Pros for the approach are:
- Intention to practice on abstraction, exception handling, tests and not on 'the right' way to work with sockets.
- Easy to code and handle command-per-connection and verify buffer is fully 'read' by server.
Cons:
- Sending huge JSON data on ENQ might block server main thread for long period of time.
- Not easy to extend / scale.
- Overhead of open-close connection socket.
- NO Concurrency.
My questions are:
- Are best practices/design patterns used for this use-case? in terms of time of coding-time, complexity, performance, etc.
- If so - how is the flow?
- Are all cases taken care?
- Exception handling?
- Is this approach better than select? server now holds 2x sockets (master & client) and only takes care of one socket, client- creates and terminates it socket per each stdin-request. rather than Select() which holds multiple sockets per client and DOES NOT TERMINATE socket for the whole std-in connection.
Server-side: accept a socket connection and send response:
def serve_forever(self):
# init socket
self.socket = self.init_socket(self.connection_details)
while not self._stop:
# connect new client:
conn, addr = self.socket.accept()
conn.settimeout(Utils.TIMEOUT)
self.logger.info("New client {} has arrived".format(conn.getpeername()))
response = {}
try:
# read from socket:
data = self.read(conn)
self.logger.info("Read successfully data from client")
if not data:
break # client disconnected while writing to socket
# write response to socket:
response = self.handle_request(Utils.json_decode(data))
self.write(conn, Utils.json_encode(response))
self.logger.info("Sent successfully {}".format(response))
except (socket.timeout, SocketReadError, SocketWriteError) as e:
self._handle_socket_err(response, e)
finally:
# close connection:
self._close_connection(conn)
self.logger.info("Closed client connection successfully")
self.stop_server()
Client-side: create a socket-per-command(request):
def run(self):
while True:
# parse std_input
in_read = input("Enter wanted command\n").split(" ", 1)
if not Utils.validate_input(in_read):
print("Wrong stdin command entered")
continue
# init new socket connection:
self.connect()
try:
# send request to server:
request = self.parse_command(in_read)
self.write(self.socket, Utils.json_encode(request))
self.logger.info("Client {} sent successfully command: {}".format(self._get_name(), request))
# read response from server:
response = Utils.json_decode(self.read(self.socket))
self.logger.info("Client {} received successfully response {}".format(self._get_name(), response))
self.close_connection()
except (socket.timeout, SocketReadError, SocketWriteError) as e:
self.logger.error("Exception occurred".format(e))
else:
if request.get("type") == "EXIT" or request.get("type") == "STOP":
self.logger.info("Closing server and existing client stdin")
exit()
And simple handle_request (command) on server side:
def handle_request(self, request: dict) -> dict:
self.logger.debug("Handling request {}".format(request))
command_type, payload, status_code = request.get("type"), request.get("payload"), StatusCode.OK
if command_type == "ENQ":
status_code, payload = self._handle_enq(payload)
elif command_type == "DEQ":
status_code, payload = self._handle_deq(payload)
elif command_type == "DEBUG":
status_code, payload = self._handle_debug(payload)
elif command_type == "STAT":
status_code, payload = self._handle_stat(payload)
elif command_type == "STOP" or command_type == "EXIT":
status_code, payload = self._handle_stop(payload)
self.logger.debug("Finished handling request")
response = self._create_response(status_code, command_type, payload)
return response