2

I am doing a client-server project for my college project, we have to allocate the login to the client.

Client system will request its status for every 2 seconds(to check whether the client is locked or unlocked). and server will accept the client request and reply the client status to the system.

But the problem is server thread is not responding to the client request.

CLIENT THREAD:

def checkPort():
  while True:
    try:
        s = socket.socket()     
        s.connect((host, port))     
        s.send('pc1')                 # send PC name to the server
        status = s.recv(1024)         # receive the status from the server

        if status == "unlock":
            disableIntrrupts()        # enable all the functions of system
        else:
            enableInterrupts()        # enable all the functions of system

        time.sleep(5)                
        s.close()

    except Exception:
        pass

SERVER THREAD:

def check_port():
    while True:
      try:
        print "hello loop is repeating"
        conn, addr = s.accept()
        data = conn.recv(1024)

        if exit_on_click == 1:
            break
        if (any(sublist[0] == data for sublist in available_sys)):
            print "locked"
            conn.send("lock")
        elif (any(sublist[0] == data for sublist in occupied_sys)):
            conn.send("unlock")
            print "unlocked"
        else:
            print "added to gui for first time"
            available_sys.append([data,addr[0],nameText,usnText,branchText])
            availSysList.insert('end',data)
      except Exception:
            pass

But my problem is server thread is not executing more than 2 time, So its unable to accept client request more than one time. can't we handle multiple client sockets using single server socket? How to handle multiple client request from server ?

Thanks for any help !!

5 Answers 5

6

Its because your server, will block waiting for a new connection on this line

conn, addr = s.accept()

This is because calls like .accept and .read are blocking calls that hold the process

You need to consider an alternative design, where in you either.

  • Have one process per connection (this idea is stupid)
  • One thread per connection (this idea is less stupid than the first but still mostly foolish)
  • Have a non blocking design that allows multiple clients and read/write without blocking execution.

To achieve the first, look at multiprocessing, the second is threading the third is slightly more complicated to get your head around but will yield the best results, the go to library for event driven code in Python is twisted but there are others like

And so so many more that I haven't listed here.

Sign up to request clarification or add additional context in comments.

4 Comments

Thanks for the reply, I think its not feasible to handle each connection with different thread. but what can i modify my above code to meet the requirement(bcz i've already implemented threading)?
My answer already contains suggestions, look at twisted or gevent or another async framework
how to handle thing with multi processing?
multiprocessing modukle
2

here's an full example of implementing a threaded server. it's fully functional and comes with the benefit of using SSL as well. further, i use threaded event objects to signal another class object after storing my received data in a database.

please note, _sni and _cams_db are additional modules purely of my own. if you want to see the _sni module (provides SNI support for pyOpenSSL), let me know.

what follows this, is a snippet from camsbot.py, there's a whole lot more that far exceeds the scope of this question. what i've built is a centralized message relay system. it listens to tcp/2345 and accepts SSL connections. each connection passes messages into the system. short lived connections will connect, pass message, and disconnect. long lived connections will pass numerous messages after connecting. messages are stored in a database and a threading.Event() object (attached to the DB class) is set to tell the bot to poll the database for new messages and relay them.

the below example shows

  • how to set up a threaded tcp server
  • how to pass information from the listener to the accept handler such as config data and etc

in addition, this example also shows

  • how to employ an SSL socket
  • how to do some basic certificate validations
  • how to cleanly wrap and unwrap SSL from a tcp socket
  • how to use poll() on the socket instead of select()

db.pending is a threading.Event() object in _cams_db.py

in the main process we start another thread that waits on the pending object with db.pending.wait(). this makes that thread wait until another thread does db.pending.set(). once it is set, our waiting thread immediately wakes up and continues to work. when our waiting thread is done, it calls db.pending.clear() and goes back to the beginning of the loop and starts waiting again with db.pending.wait()

while True:
    db.pending.wait()
    # after waking up, do code. for example, we wait for incoming messages to
    # be stored in the database. the threaded server will call db.pending.set()
    # which will wake us up. we'll poll the DB for new messages, relay them, clear
    # our event flag and go back to waiting.
    # ...
    db.pending.clear()

snippet from camsbot.py:

import sys, os, sys, time, datetime, threading, select, logging, logging.handlers
import configparser, traceback, re, socket, hashlib
# local .py
sys.path.append('/var/vse/python')
import _util, _webby, _sni, _cams_db, _cams_threaded_server, _cams_bot
# ...

def start_courier(config):
    # default values
    host = '::'
    port = 2345

    configp = config['configp']

    host = configp.get('main', 'relay msp hostport')

    # require ipv6 addresses be specified in [xx:xx:xx] notation, therefore
    # it is safe to look for :nnnn at the end
    if ':' in host and not host.endswith(']'):
        port = host.split(':')[-1]
        try:
            port = int(port, 10)
        except:
            port = 2345
        host = host.split(':')[:-1][0]

    server = _cams_threaded_server.ThreadedTCPServer((host, port), _cams_threaded_server.ThreadedTCPRequestHandler, config)
    t = threading.Thread(target=server.serve_forever, name='courier')
    t.start()

_cams_threaded_server.py:

import socket, socketserver, select, datetime, time, threading
import sys, struct

from OpenSSL.SSL import SSLv23_METHOD, SSLv3_METHOD, TLSv1_METHOD, OP_NO_SSLv2
from OpenSSL.SSL import VERIFY_NONE, VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, Context, Connection
from OpenSSL.SSL import FILETYPE_PEM
from OpenSSL.SSL import WantWriteError, WantReadError, WantX509LookupError, ZeroReturnError, SysCallError
from OpenSSL.crypto import load_certificate
from OpenSSL import SSL

# see note at beginning of answer
import _sni, _cams_db

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    def __init__(self, server_address, HandlerClass, config):
        socketserver.BaseServer.__init__(self, server_address, HandlerClass)
        self.address_family  = socket.AF_INET6
        self.connected       = []
        self.logger          = config['logger']
        self.config          = config

        self.socket = socket.socket(self.address_family, self.socket_type)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        sc = Context(TLSv1_METHOD)
        sc.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT, _sni.verify_cb)
        sc.set_tlsext_servername_callback(_sni.pick_certificate)
        self.sc = sc

        self.server_bind()
        self.server_activate()

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        config         = self.server.config
        logger         = self.server.logger
        connected      = self.server.connected
        sc             = self.server.sc

        try:
           self.peer_hostname = socket.gethostbyaddr(socket.gethostbyname(self.request.getpeername()[0]))[0]
        except:
           self.peer_hostname = '!'+self.request.getpeername()[0]

        logger.info('peer: {}'.format(self.peer_hostname))

        ssl_s = Connection(sc, self.request)
        ssl_s.set_accept_state()

        try:
            ssl_s.do_handshake()
        except:
            t,v,tb = sys.exc_info()
            logger.warn('handshake failed {}'.format(v))

        ssl_s.setblocking(True)
        self.ssl_s = ssl_s

        try:
            peercert = ssl_s.get_peer_certificate()
        except:
            peercert = False
            t,v,tb = sys.exc_info()
            logger.warn('SSL get peer cert failed: {}'.format(v))                   

        if not peercert:
            logger.warn('No peer certificate')

        else:
            acl = config['configp']['main'].get('client cn acl', '').split(' ')
            cert_subject = peercert.get_subject().CN
            logger.info('Looking for {} in acl: {}'.format(cert_subject,acl))

            if cert_subject in acl:
                logger.info('{} is permitted'.format(cert_subject))
            else:
                logger.warn('''client CN not approved''')

        # it's ok to block here, every socket has its own thread
        ssl_s.setblocking(True)

        self.db  = config['db']
        msgcount = 0

        p = select.poll()
        # don't want writable, just readable

        p.register(self.request, select.POLLIN|select.POLLPRI|select.POLLERR|select.POLLHUP|select.POLLNVAL)
        peername = ssl_s.getpeername()

        x = peername[0]
        if x.startswith('::ffff:'):
            x = x[7:]
        peer_ip = x

        try:
            host = socket.gethostbyaddr(x)[0]
        except:
            host = peer_ip

        logger.info('{}/{}:{} connected'.format(host, peer_ip, peername[1]))
        connected.append( [host, peername[1]] )

        if peercert:
            threading.current_thread().setName('{}/port={}/CN={}'.format(host, peername[1], peercert.get_subject().CN))
        else:
            threading.current_thread().setName('{}/port={}'.format(host, peername[1]))

        sockclosed = False
        while not sockclosed:
            keepreading = True

            #logger.debug('starting 30 second timeout for poll')
            pe = p.poll(30.0)
            if not pe:
                # empty list means poll timeout
                # for SSL sockets it means WTF. we get an EAGAIN like return even if the socket is blocking
                continue

            logger.debug('poll indicates: {}'.format(pe))

            #define SSL_NOTHING     1
            #define SSL_WRITING     2
            #define SSL_READING     3
            #define SSL_X509_LOOKUP 4

            while keepreading and not sockclosed:
                data,sockclosed,keepreading = self._read_ssl_data(2, head=True)
                if sockclosed or not keepreading:
                    time.sleep(5)
                    continue

                plen = struct.unpack('H', data)[0]
                data,sockclosed,keepreading = self._read_ssl_data(plen)

                if sockclosed or not keepreading:
                    time.sleep(5)
                    continue

                # send thank you, ignore any errors since we appear to have gotten
                # the message
                try:
                    self.ssl_s.sendall(b'ty')
                except:
                    pass

                # extract the timestamp

                message_ts = data[0:8]
                msgtype    = chr(data[8])
                message    = data[9:].decode()

                message_ts = struct.unpack('d', message_ts)[0]
                message_ts = datetime.datetime.utcfromtimestamp(message_ts).replace(tzinfo=datetime.timezone.utc)

                self.db.enqueue(config['group'], peer_ip, msgtype, message, message_ts)
                self.db.pending.set()

        # we're recommended to use the return socket object for any future operations rather than the original
        try:
            s = ssl_s.unwrap()
            s.close()
        except:
            pass

        connected.remove( [host, peername[1]] )
        t_name = threading.current_thread().getName()
        logger.debug('disconnect: {}'.format(t_name))

    def _read_ssl_data(self, wantsize=16384, head=False):
        _w = ['WANT_NOTHING','WANT_READ','WANT_WRITE','WANT_X509_LOOKUP']

        logger = self.server.logger

        data        = b''
        sockclosed  = False
        keepreading = True

        while len(data) < wantsize and keepreading and not sockclosed:
            rlen = wantsize - len(data)
            try:
                w,wr = self.ssl_s.want(),self.ssl_s.want_read()

                #logger.debug('  want({}) want_read({})'.format(_w[w],wr))
                x = self.ssl_s.recv(rlen)
                #logger.debug('  recv(): {}'.format(x))

                if not ( x or len(x) ):
                    raise ZeroReturnError

                data += x
                if not (len(x) == len(data) == wantsize):
                    logger.info('  read={}, len(data)={}, plen={}'.format(len(x),len(data),wantsize))

            except WantReadError:
                # poll(), when ready, read more
                keepreading = False
                logger.info('  got WantReadError')
                continue

            except WantWriteError:
                # poll(), when ready, write more
                keepreading = False
                logger.info('  got WantWriteError')
                continue

            except ZeroReturnError:
                # socket got closed, a '0' bytes read also means the same thing
                keepreading = False
                sockclosed  = True
                logger.info('  ZRE, socket closed normally')
                continue

            except SysCallError:
                keepreading = False
                sockclosed  = True

                t,v,tb = sys.exc_info()

                if v.args[0] == -1: # normal EOF
                    logger.info('  EOF found, keepreading=False')

                else:
                    logger.info('{} terminated session abruptly while reading plen'.format(self.peer_hostname))
                    logger.info('t: {}'.format(t))
                    logger.info('v: {}'.format(v))

                continue

            except:
                t,v,tb = sys.exc_info()
                logger.warning('  fucked? {}'.format(v))
                raise

        if not head and not len(data) == wantsize:
            logger.warn('  short read {} of {}'.format(len(data), wantsize))

        return data,sockclosed,keepreading

4 Comments

is your current server implementation actually threaded? because you're not handling threaded access to data properly, you're losing track of which socket connection you're operating on. let's assume for simplicity that 10 connections have been made. look at your code, which of the 10 socket connections is your .recv() or .send() operating on? with concurrent access, you've no idea because there's no distinction. a thread does not run a separate copy of code, it runs the same code.
so if you have two nearly back to back connections at socket.accept(), they'll both arrive at socket.read() but you don't know which one arrives first, so the first read() swallows all the data and the second one may be sitting waiting for data that won't ever arrive because you've lost the distinction of which connection you have. your code is only aware of one but you're walking through your path twice.
But i have a double, in the end of the while loop, I am closing the socket. so that i can receive a new connection in the next iteration of the loop. But how the conflict got raised?
you aren't closing it on the server
1

let's start with a bare bones threaded tcp server.

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    def __init__(self, server_address, HandlerClass):
        socketserver.BaseServer.__init__(self, server_address, HandlerClass)
        self.address_family  = socket.AF_INET
        self.socket = socket.socket(self.address_family, self.socket_type)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_bind()
        self.server_activate()

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # self.request is your accepted socket, do all your .read() and .wirte() on it
        s = self.request

        request = s.read(1024)
        # decide locked or unlocked. this example arbitrarily writes back 'locked'
        s.write('locked')

        # we're done, close the socket and exit with a default return of None
        s.close()

ok, start your threaded server with this in your main() function:

server = threading.ThreadedTCPServer(('127.0.0.1', 1234), ThreadedTCPRequestHandler)
t = threading.Thread(target=server.serve_forever, name='optional_name')
t.start()

now you can let the threading module handle the semantics of concurrency and not worry about it.

1 Comment

thank you very much for the answer. but what are the additional packages need to be imported? I have imported socket module but i am still getting the error
0

You might want to take a look at 0MQ and concurrent.futures. 0MQ has a Tornado event loop in the library and it reduces the complexity of socket programming. concurrent.futures is a high level interface over threading or multiprocessing.

Comments

0

You can see different concurrent server approaches at

https://bitbucket.org/arco_group/upper/src

These will help you to choose the better way for you.

Cheers

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.