Though I do understand that

while(true){
}

generates an infinite loop, it is my understanding that

while(true){
blockingCall()
}

allows the loop to execute only x times (where x is anywhere from 0 up to whatever the machine's resources permit), because of the nature of blocking calls: if blockingCall() is called 3 times and the 3rd call never returns, the program should wait there. My implementation relies on this idea, but it is not working the way I expect.

I am implementing a client/server program using Java sockets. My client follows https://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html: it simply requests a connection to a server running on a specific port and sends a message; the server reverses that message and sends it back to the client.

I am trying to implement the server so that it limits the number of connections it allows. If the number of clients requesting a connection goes beyond this limit, additional requests are queued, up to a maximum queue size. Once that maximum is exceeded, the server simply writes a message to the log stating "no more connections will be accepted". Below is my server program:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.*;

public class MultithreadedServer {
    private static BlockingQueue<Socket> queuedSockets = new ArrayBlockingQueue<>(1);                  //max queued connections.
    private static Semaphore semaphoreForMaxConnectionsAllowed = new Semaphore(2);              //max active connections being served.

    private static void handleClientConnectionRequest(final Socket newSocketForClientConnection, final Semaphore maxConnectionSemaphore) {
        new Thread(new Runnable() {

            @Override
            public void run() {

                try (
                        BufferedReader socketReader = new BufferedReader(new InputStreamReader(newSocketForClientConnection.getInputStream()));
                        PrintWriter socketWriter = new PrintWriter(newSocketForClientConnection.getOutputStream(), true)
                ) {

                    maxConnectionSemaphore.acquire();

                    String serverMsg;
                    String clientMsg;

                    SocketAddress clientSocket = (InetSocketAddress) newSocketForClientConnection.getRemoteSocketAddress();

                    while ((clientMsg = socketReader.readLine()) != null) {
                        if (clientMsg.equalsIgnoreCase("quit")) {
                            maxConnectionSemaphore.release();
                            break;
                        }

                        System.out.println("client with socket " + clientSocket + " sent MSG : " + clientMsg);
                        serverMsg = reverseString(clientMsg);

                        socketWriter.println(serverMsg);
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Closing client upon client's request.");
                }
            }
        }).start();
    }

    private static String reverseString(String clientMsg) {
        synchronized (clientMsg) {
            StringBuffer stringBuffer = new StringBuffer();

            for (int i = clientMsg.length() - 1; i >= 0; i--) {
                stringBuffer.append(clientMsg.charAt(i));
            }

            return stringBuffer.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        boolean shouldContinue = true;

        if (args.length != 1) {
            System.out.println("Incorrect number of arguments at command line");
            System.exit(1);
        }

        ServerSocket serverSocket = null;

        try {
            Integer portNumber = Integer.parseInt(args[0]);
            serverSocket = new ServerSocket(portNumber);
            int connectionNumber = 0;

            System.out.println("Server listening on port# : " + args[0]);

            //main thread...
            while (shouldContinue) {
                Socket newServerSocketForClientConnection = null;
                newServerSocketForClientConnection = queuedSockets.poll();

                if (newServerSocketForClientConnection == null) {
                    newServerSocketForClientConnection = serverSocket.accept();

                    connectionNumber++;
                    System.out.println("Created new socket upon client request. ConnectionCOunt = " + connectionNumber);

                    processConnection(newServerSocketForClientConnection);
                } else {
                    //i.e. queue has a socket request pending.
                    System.out.println("Picking queued socket..");
                    processConnection(newServerSocketForClientConnection);
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                serverSocket.close();
            }
        }
    }

    private static void processConnection(Socket newServerSocketForClientConnection) {

        if (semaphoreForMaxConnectionsAllowed.availablePermits() > 0) {
            handleClientConnectionRequest(newServerSocketForClientConnection, semaphoreForMaxConnectionsAllowed);
        } else {
            //System.out.println("Since exceeded max connection limit, adding in queue.");
            if (queuedSockets.offer(newServerSocketForClientConnection)) {
                System.out.println("connectionRequest queued because no more space on server. QueuedSocketList size : " + queuedSockets.size());
            }else{
                System.out.println("No space available for client connections. Can not be queued too.");
            }

        }

    }
}

Output observed from this server when the number of client requests goes beyond the semaphore limit (I am required to use a Semaphore in my program and can't use an ExecutorService with a fixed thread pool):

(screenshot of the server console output)

My question is: queuedSockets.poll() doesn't seem to be removing the element from the BlockingQueue, which is why I'm getting this pseudo-infinite loop. Any clue why this is happening? I checked the BlockingQueue documentation, which says poll() "Retrieves and removes the head of this queue", but that doesn't seem to be happening in the program above.

1 Answer

Let's step through this loop:

//main thread...
        while (shouldContinue) {
            Socket newServerSocketForClientConnection = null;
            // poll for a pending connection in the queue
            newServerSocketForClientConnection = queuedSockets.poll();

            // if a pending connection exists, go to else...
            if (newServerSocketForClientConnection == null) {
                ...
            } else {
                // queue has a socket request pending, so we process the request...
                System.out.println("Picking queued socket..");
                processConnection(newServerSocketForClientConnection);
            }
        }

And then in processConnection():

    // if there are no permits available, go to else...
    if (semaphoreForMaxConnectionsAllowed.availablePermits() > 0) {
        handleClientConnectionRequest(newServerSocketForClientConnection, semaphoreForMaxConnectionsAllowed);
    } else {
        // BlockingQueue.offer() puts this connection immediately back into the queue,
        // then the method exits
        if (queuedSockets.offer(newServerSocketForClientConnection)) {
            System.out.println("connectionRequest queued because no more space on server. QueuedSocketList size : " + queuedSockets.size());
        }else{
            System.out.println("No space available for client connections. Can not be queued too.");
        }

    }

After that, on the next iteration of the loop:

//main thread...
        while (shouldContinue) {
            Socket newServerSocketForClientConnection = null;
            // poll() immediately returns the same request that was
            // put back into the queue at the end of the previous iteration
            newServerSocketForClientConnection = queuedSockets.poll();

            // While that request keeps cycling through the queue, this
            // condition is never true, so accept() is never reached and
            // no new incoming connections are accepted
            if (newServerSocketForClientConnection == null) {
                ...
            } else {
                // process the same request again, over and over, until
                // a connection is freed up. Meanwhile, all other
                // incoming requests are left waiting.
                System.out.println("Picking queued socket..");
                processConnection(newServerSocketForClientConnection);
            }
        }
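
The cycle traced above can be reproduced without any sockets. The following is only a minimal sketch: the class name RequeueDemo, the method countRepicks and the "client-3" string are hypothetical stand-ins, with a String taking the place of the queued Socket and a Semaphore created with zero permits modelling "all connections busy".

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;

public class RequeueDemo {
    // Runs the poll-then-offer cycle for a fixed number of iterations and
    // returns how many times an element was picked out of the queue.
    static int countRepicks(int iterations) {
        BlockingQueue<String> queued = new ArrayBlockingQueue<>(1);
        Semaphore permits = new Semaphore(0);   // assumption: every permit is already taken
        queued.offer("client-3");               // hypothetical queued connection

        int repicks = 0;
        for (int i = 0; i < iterations; i++) {
            String next = queued.poll();        // removes the head, as documented
            if (next == null) {
                break;                          // the real server would call accept() here
            }
            repicks++;
            if (permits.availablePermits() > 0) {
                break;                          // the real server would start a handler here
            }
            queued.offer(next);                 // no permit: the element goes straight back
        }
        return repicks;
    }

    public static void main(String[] args) {
        System.out.println("re-picked " + countRepicks(5) + " times");
    }
}
```

Every iteration polls the same element and offers it back, so the queue is never empty when the next iteration starts. The loop here is bounded only so the sketch terminates; in the server the same cycle continues until a permit is released.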

So it's not that the request is never removed from the queue; it just gets put back immediately afterwards, because no Semaphore permit is available.
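
One possible way to avoid the spin while still using a Semaphore is to take the permit in the thread that decides whether to start a handler, and to give it back in a finally block. In the question's code the availablePermits() check and the acquire() call happen on different threads, and release() runs only on the "quit" branch. The sketch below is an illustration under assumptions, not a drop-in fix: PermitGate, reserve and available are hypothetical names, and a plain Runnable stands in for the per-client handler.

```java
import java.util.concurrent.Semaphore;

public class PermitGate {
    private final Semaphore permits;

    public PermitGate(int maxActive) {
        permits = new Semaphore(maxActive);
    }

    // Returns the task wrapped so that it hands its permit back when it
    // finishes (normally or with an exception), or null if no permit is free.
    public Runnable reserve(Runnable task) {
        if (!permits.tryAcquire()) {
            return null;                 // caller decides whether to queue or reject
        }
        return () -> {
            try {
                task.run();
            } finally {
                permits.release();       // always returned, not only on "quit"
            }
        };
    }

    public int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        PermitGate gate = new PermitGate(2);
        for (int i = 1; i <= 3; i++) {
            int id = i;
            Runnable handler = gate.reserve(() -> System.out.println("serving client " + id));
            if (handler == null) {
                System.out.println("client " + id + " must wait or be queued");
            } else {
                new Thread(handler).start();
            }
        }
    }
}
```

Because tryAcquire() checks and takes the permit in one step, two requests cannot both pass the check for the last permit. A blocking variant would call acquire() in the accepting thread instead, so the main loop waits for a free permit rather than spinning; which one fits depends on whether the queue limit from the question still has to be enforced.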


1 Comment

Thanks for your suggestion. That was the issue. Although I was seeing a lot of messages logged on the server side (almost endlessly), as soon as I closed any existing client program, semaphore.release() made a permit available, and the server picked the next socket from the queue. Thanks for your prompt and descriptive answer.
