3
\$\begingroup\$

my use-case (an home task to practice abstraction, exception handling, unitests) is regarding a message-queue with small-fixed-size client requests from stdin of such:

  1. [ENQ] data (insert json data to queue)
  2. [DEQ] (get latest in queue)
  3. [STATUS] (status of queue)
  4. [STOP] (stop server)
  5. [EXIT]

Without any scalability demands, except that there could be multi-clients and command might be extended in future.

I know there could be multiple approaches to handle multi-client-server model - multi-threaded or select one and that there are existent frameworks (Twisted, gevant). I have chosen to implement a simple socket-per-command approach with BLOCKING threads. one main server thread will serve each coming socket with a timeout. every-time a new request arrives - new socket_connection is open - server reads whole data, sends a response and socket is closed. Then server moves to the next socket_connection, eventually holding ONE socket-a-time in a while() loop.

Pros for the approach are:

  1. Intention to practice on abstraction, exception handling, tests and not on 'the right' way to work with sockets.
  2. Easy to code and handle command-per-connection and verify buffer is fully 'read' by server.

Cons:

  1. Sending huge JSON data on ENQ might block server main thread for long period of time.
  2. Not easy to extend / scale.
  3. Overhead of open-close connection socket.
  4. NO Concurrency.

My questions are:

  1. Are best practices/design patterns used for this use-case? in terms of time of coding-time, complexity, performance, etc.
  2. If so - how is the flow?
  3. Are all cases taken care?
  4. Exception handling?
  5. Is this approach better than select? server now holds 2x sockets (master & client) and only takes care of one socket, client- creates and terminates it socket per each stdin-request. rather than Select() which holds multiple sockets per client and DOES NOT TERMINATE socket for the whole std-in connection.

Server-side: accept a socket connection and send response:

def serve_forever(self):
    # init socket
    self.socket = self.init_socket(self.connection_details)
    while not self._stop:
        # connect new client:
        conn, addr = self.socket.accept()
        conn.settimeout(Utils.TIMEOUT)
        self.logger.info("New client {} has arrived".format(conn.getpeername()))
        response = {}
        try:
            # read from socket:
            data = self.read(conn)
            self.logger.info("Read successfully data from client")
            if not data:
                break  # client disconnected while writing to socket
            # write response to socket:
            response = self.handle_request(Utils.json_decode(data))
            self.write(conn, Utils.json_encode(response))
            self.logger.info("Sent successfully {}".format(response))
        except (socket.timeout, SocketReadError, SocketWriteError) as e:
            self._handle_socket_err(response, e)
        finally:
            # close connection:
            self._close_connection(conn)
            self.logger.info("Closed client connection successfully")
    self.stop_server()

Client-side: create a socket-per-command(request):

def run(self):
    while True:
        # parse std_input
        in_read = input("Enter wanted command\n").split(" ", 1)
        if not Utils.validate_input(in_read):
            print("Wrong stdin command entered")
            continue
        # init new socket connection:
        self.connect()

        try:
            # send request to server:
            request = self.parse_command(in_read)
            self.write(self.socket, Utils.json_encode(request))
            self.logger.info("Client {} sent successfully command: {}".format(self._get_name(), request))

            # read response from server:
            response = Utils.json_decode(self.read(self.socket))
            self.logger.info("Client {} received successfully response {}".format(self._get_name(), response))
            self.close_connection()
        except (socket.timeout, SocketReadError, SocketWriteError) as e:
            self.logger.error("Exception occurred".format(e))
        else:
            if request.get("type") == "EXIT" or request.get("type") == "STOP":
                self.logger.info("Closing server and existing client stdin")
                exit()

And simple handle_request (command) on server side:

def handle_request(self, request: dict) -> dict:
    self.logger.debug("Handling request {}".format(request))

    command_type, payload, status_code = request.get("type"), request.get("payload"), StatusCode.OK
    if command_type == "ENQ":
        status_code, payload = self._handle_enq(payload)
    elif command_type == "DEQ":
        status_code, payload = self._handle_deq(payload)
    elif command_type == "DEBUG":
        status_code, payload = self._handle_debug(payload)
    elif command_type == "STAT":
        status_code, payload = self._handle_stat(payload)
    elif command_type == "STOP" or command_type == "EXIT":
        status_code, payload = self._handle_stop(payload)

    self.logger.debug("Finished handling request")
    response = self._create_response(status_code, command_type, payload)
    return response
\$\endgroup\$

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.