1

I opened recently one of my asynchronous socket snippets, and I was amazed by the buggy behavior of the following client code, it is a basic PoC client using AsynchronousSocketChannel.

This snippet ideally, it should never reach the "I am freaking out", but it does. Basically the problem is I use a ByteBuffer which at the end of the loop I set its position to 0, at the beginning of the loop I expect it to be 0, but SOMETIMES it is not.

The video showing of the bug is:
https://www.youtube.com/watch?v=bV08SjYutRw&feature=youtu.be
I can solve the problem calling .clear() just after .nextLine() but, I feel curious, what is going on in this innocent snippet?

package com.melardev.sockets.clients;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Scanner;

public class AsyncTcpClientCallbacks {

    // Horrible demo, I would never write asynchronous sockets this way, I would use attachments
    // which allows our code to be cleaner and isolate everything into their own classes
    // This is only here to show you how you could do it without attachments, but you have
    // to expose the socketChannel so it can be accessible from everywhere, my recommendation is not to bother
    // learning, this, go to the demo where I use attachments, it is a lot more readable


    static CompletionHandler<Integer, ByteBuffer> readHandler = new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer bytesReaded, ByteBuffer buffer) {
            buffer.flip();
            byte[] receivedBytes = new byte[buffer.limit()];
            // Get into receivedBytes
            buffer.get(receivedBytes);
            String message = new String(receivedBytes);
            System.out.println(message);

            buffer.clear();
            socketChannel.read(buffer, buffer, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            System.err.println("Error reading message");
            System.exit(1);
        }

    };

    static private CompletionHandler<Integer, Void> writeHandler = new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer bytesWritten, Void attachment) {

        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            System.err.println("Something went wrong");
            System.exit(-1);
        }
    };

    private static AsynchronousSocketChannel socketChannel;

    public static void main(String[] args) {

        try {
            socketChannel = AsynchronousSocketChannel.open();


            //try to connect to the server side
            socketChannel.connect(new InetSocketAddress("localhost", 3002), null
                    , new CompletionHandler<Void, Void>() {
                        @Override
                        public void completed(Void result, Void attachment) {


                            ByteBuffer receivedBuffer = ByteBuffer.allocate(1024);
                            socketChannel.read(receivedBuffer, receivedBuffer, readHandler);


                            Scanner scanner = new Scanner(System.in);
                            System.out.println("Write messages to send to server");
                            ByteBuffer bufferToSend = ByteBuffer.allocate(1024);
                            String line = "";
                            while (!line.equals("exit")) {
                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 1");
                                }
                                line = scanner.nextLine();
                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 2");
                                }
                                byte[] bytesToWrite = line.getBytes();
                                // bufferToSend.clear();
                                bufferToSend.put(bytesToWrite);
                                System.out.println(bufferToSend.limit());
                                bufferToSend.flip();
                                System.out.println(bufferToSend.limit());

                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 3");
                                }
                                if (bufferToSend.limit() != line.length()) {
                                    System.err.println("I am freaking out 4");
                                }

                                socketChannel.write(bufferToSend, null, writeHandler);


                                bufferToSend.limit(bufferToSend.capacity());
                                bufferToSend.position(0);
                                // The two lines above are the same as
                                // bufferToSend.clear(); // Reuse the same buffer, so set pos=0
                                // limit to the capacity which is 1024

                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 5");
                                }


                            }
                        }

                        @Override
                        public void failed(Throwable exc, Void nothing) {
                            System.out.println("Error connection to host");
                        }

                    });

            while (true) {
                try {
                    Thread.sleep(60 * 1000);
                    // Sleep 1 min ... who cares ?
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

How on earth the "I am freaking out" statements are executed? I see no way it could be, the conditions should never be evaluated to true, I made the video where the bug is clearly shown, sometimes at the beginning of the while() loop the position of the buffer is different than 0, but it should not, because at the end of the loop I set it to 0. SURPRISINGLY ENOUGH, this behavior DOES NOT occur, when I make a breakpoint before launching the app, and I trace line by line ... how could it be?
I recorded it, I began with tracing through the debugger, everything worked fine, but once I removed the breaking and let the debugger run, the same code that worked before, now it does not. What am I missing?
The video showing when it worked with tracing is here: https://www.youtube.com/watch?v=0H1OJdZO6AY&feature=youtu.be
If you wanna a server to play with, then this is the one used in the video

package com.melardev.sockets.servers;

import com.melardev.sockets.Constants;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class AsyncTcpEchoServerKey {

    public static void main(String[] args) {

        try {
            // Create new selector
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().setReuseAddress(true);
            // By default this is true, so set it to false for nio sockets
            serverSocketChannel.configureBlocking(false);
            InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
            // Bind to localhost and specified port
            serverSocketChannel.socket().bind(new InetSocketAddress(loopbackAddress, Constants.SOCKET_PORT));


            // ServerSocketChannel only supports OP_ACCEPT (see ServerSocketChannel::validOps())
            // it makes sense, server can only accept sockets
            int operations = SelectionKey.OP_ACCEPT;

            serverSocketChannel.register(selector, operations);
            while (true) {
                if (selector.select() <= 0) {
                    continue;
                }

                try {
                    processReadySet(selector.selectedKeys());
                } catch (IOException e) {
                    continue;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public static void processReadySet(Set readySet) throws IOException {
        Iterator iterator = readySet.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = (SelectionKey) iterator.next();
            // After processing a key, it still persists in the Set, we wanna remove it
            // otherwise we will get it back the next time processReadySet is called
            // We would end up processing the same "event" as many times this method is called
            iterator.remove();
            System.out.printf("isAcceptable %b isConnectable %b isReadable %b isWritable %b\n"
                    , key.isAcceptable(), key.isConnectable(), key.isReadable(), key.isWritable());
            if (key.isAcceptable()) {
                ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();

                // Get the client socket channel
                SocketChannel clientSocketChannel = (SocketChannel) ssChannel.accept();
                // Configure it as non-blocking socket
                clientSocketChannel.configureBlocking(false);
                // Register the socket with the key selector, we want to get notified when we have
                // something to read from socket(OP_READ)
                clientSocketChannel.register(key.selector(), SelectionKey.OP_READ);
            } else if (key.isReadable()) {
                // A Remote client has send us a message
                String message = "nothing";
                // Get the socket who sent the message
                SocketChannel sender = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int bytesCount = 0;
                try {
                    bytesCount = sender.read(buffer);

                    if (bytesCount > 0) {

                        // 1. Get manually
                        message = new String(buffer.array(), 0, bytesCount);

                        // 2. Or, use flip
                        // set buffer.position =0 and buffer.limit = bytesCount
                        buffer.flip();
                        byte[] receivedMessageBytes = new byte[bytesCount];
                        buffer.get(receivedMessageBytes);
                        message = new String(receivedMessageBytes);
                        System.out.println("Receive " + message);
                        // Writing
                        // 1. Easy approach, create a new ByteBuffer and send it
                        // ByteBuffer outputBuffer = ByteBuffer.wrap(message.getBytes());
                        // sender.write(outputBuffer);

                        // 2. Or to reuse the same buffer we could
                        // buffer.limit(buffer.position());
                        // buffer.position(0);

                        // 3. Or the same as point 2, but one line
                        buffer.flip();

                        sender.write(buffer);


                    } else {
                        SocketChannel ssChannel = (SocketChannel) key.channel();
                        ssChannel.close();
                        System.out.println("Client disconnected");
                        break;
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    try {
                        sender.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }

}

PD: The first video shows even more than what I have said before, notice that before setting the breaking the console showed I am freaking out 2 and 4, when the breaking was set, I also triggered I am freaking out 1, which at the beginning it wasn't, not only that, but when I resumed the process, this time, I am freaking out 1 was not triggered!!!

2
  • Please - no "youtube videos" :( Commented May 5, 2019 at 23:59
  • Is it forbidden? I just tried to make it easy for you to see the error. Obviously the videos are not monetized ... Commented May 6, 2019 at 0:26

1 Answer 1

2

The documentation of AsynchronousSocketChannel.write() says:

Buffers are not safe for use by multiple concurrent threads so care should be taken to not access the buffer until the operation has completed.

You don't take such care, rather you set bufferToSend's limit and position regardless of the write completion, so the unexpected behavior may be due to this carelessness.

Sign up to request clarification or add additional context in comments.

1 Comment

I was sure the only way this could happen is through a race condition but I did not see how. Well, thanks, now I understand, let me elaborate more your answer for other people. AsynchronousSocketChannel uses its own thread for writing, this means there is one thread(where we read System.in) and other managed by the async socket framework, in our thread we cleared the buffer, but it was not sent yet, when it was sent(in other thread), then the position was updated correspondingly. this is why in our thread at the beginning of the loop we find out position is not 0. Thanks

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.