I opened recently one of my asynchronous socket snippets, and I was amazed by the buggy behavior of the following client code, it is a basic PoC client using AsynchronousSocketChannel.
This snippet ideally, it should never reach the "I am freaking out", but it does. Basically the problem is I use a ByteBuffer which at the end of the loop I set its position to 0, at the beginning of the loop I expect it to be 0, but SOMETIMES it is not.
The video showing of the bug is:
https://www.youtube.com/watch?v=bV08SjYutRw&feature=youtu.be
I can solve the problem calling .clear() just after .nextLine() but, I feel curious, what is going on in this innocent snippet?
package com.melardev.sockets.clients;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Scanner;
public class AsyncTcpClientCallbacks {
// Horrible demo, I would never write asynchronous sockets this way, I would use attachments
// which allows our code to be cleaner and isolate everything into their own classes
// This is only here to show you how you could do it without attachments, but you have
// to expose the socketChannel so it can be accessible from everywhere, my recommendation is not to bother
// learning, this, go to the demo where I use attachments, it is a lot more readable
static CompletionHandler<Integer, ByteBuffer> readHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer bytesReaded, ByteBuffer buffer) {
buffer.flip();
byte[] receivedBytes = new byte[buffer.limit()];
// Get into receivedBytes
buffer.get(receivedBytes);
String message = new String(receivedBytes);
System.out.println(message);
buffer.clear();
socketChannel.read(buffer, buffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
System.err.println("Error reading message");
System.exit(1);
}
};
static private CompletionHandler<Integer, Void> writeHandler = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesWritten, Void attachment) {
}
@Override
public void failed(Throwable exc, Void attachment) {
System.err.println("Something went wrong");
System.exit(-1);
}
};
private static AsynchronousSocketChannel socketChannel;
public static void main(String[] args) {
try {
socketChannel = AsynchronousSocketChannel.open();
//try to connect to the server side
socketChannel.connect(new InetSocketAddress("localhost", 3002), null
, new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
ByteBuffer receivedBuffer = ByteBuffer.allocate(1024);
socketChannel.read(receivedBuffer, receivedBuffer, readHandler);
Scanner scanner = new Scanner(System.in);
System.out.println("Write messages to send to server");
ByteBuffer bufferToSend = ByteBuffer.allocate(1024);
String line = "";
while (!line.equals("exit")) {
if (bufferToSend.position() != 0) {
System.err.println("I am freaking out 1");
}
line = scanner.nextLine();
if (bufferToSend.position() != 0) {
System.err.println("I am freaking out 2");
}
byte[] bytesToWrite = line.getBytes();
// bufferToSend.clear();
bufferToSend.put(bytesToWrite);
System.out.println(bufferToSend.limit());
bufferToSend.flip();
System.out.println(bufferToSend.limit());
if (bufferToSend.position() != 0) {
System.err.println("I am freaking out 3");
}
if (bufferToSend.limit() != line.length()) {
System.err.println("I am freaking out 4");
}
socketChannel.write(bufferToSend, null, writeHandler);
bufferToSend.limit(bufferToSend.capacity());
bufferToSend.position(0);
// The two lines above are the same as
// bufferToSend.clear(); // Reuse the same buffer, so set pos=0
// limit to the capacity which is 1024
if (bufferToSend.position() != 0) {
System.err.println("I am freaking out 5");
}
}
}
@Override
public void failed(Throwable exc, Void nothing) {
System.out.println("Error connection to host");
}
});
while (true) {
try {
Thread.sleep(60 * 1000);
// Sleep 1 min ... who cares ?
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
How on earth the "I am freaking out" statements are executed? I see no way it could be, the conditions should never be evaluated to true, I made the video where the bug is clearly shown, sometimes at the beginning of the while() loop the position of the buffer is different than 0, but it should not, because at the end of the loop I set it to 0.
SURPRISINGLY ENOUGH, this behavior DOES NOT occur, when I make a breakpoint before launching the app, and I trace line by line ... how could it be?
I recorded it, I began with tracing through the debugger, everything worked fine, but once I removed the breaking and let the debugger run, the same code that worked before, now it does not. What am I missing?
The video showing when it worked with tracing is here:
https://www.youtube.com/watch?v=0H1OJdZO6AY&feature=youtu.be
If you wanna a server to play with, then this is the one used in the video
package com.melardev.sockets.servers;
import com.melardev.sockets.Constants;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class AsyncTcpEchoServerKey {
public static void main(String[] args) {
try {
// Create new selector
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
// By default this is true, so set it to false for nio sockets
serverSocketChannel.configureBlocking(false);
InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
// Bind to localhost and specified port
serverSocketChannel.socket().bind(new InetSocketAddress(loopbackAddress, Constants.SOCKET_PORT));
// ServerSocketChannel only supports OP_ACCEPT (see ServerSocketChannel::validOps())
// it makes sense, server can only accept sockets
int operations = SelectionKey.OP_ACCEPT;
serverSocketChannel.register(selector, operations);
while (true) {
if (selector.select() <= 0) {
continue;
}
try {
processReadySet(selector.selectedKeys());
} catch (IOException e) {
continue;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void processReadySet(Set readySet) throws IOException {
Iterator iterator = readySet.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
// After processing a key, it still persists in the Set, we wanna remove it
// otherwise we will get it back the next time processReadySet is called
// We would end up processing the same "event" as many times this method is called
iterator.remove();
System.out.printf("isAcceptable %b isConnectable %b isReadable %b isWritable %b\n"
, key.isAcceptable(), key.isConnectable(), key.isReadable(), key.isWritable());
if (key.isAcceptable()) {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
// Get the client socket channel
SocketChannel clientSocketChannel = (SocketChannel) ssChannel.accept();
// Configure it as non-blocking socket
clientSocketChannel.configureBlocking(false);
// Register the socket with the key selector, we want to get notified when we have
// something to read from socket(OP_READ)
clientSocketChannel.register(key.selector(), SelectionKey.OP_READ);
} else if (key.isReadable()) {
// A Remote client has send us a message
String message = "nothing";
// Get the socket who sent the message
SocketChannel sender = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesCount = 0;
try {
bytesCount = sender.read(buffer);
if (bytesCount > 0) {
// 1. Get manually
message = new String(buffer.array(), 0, bytesCount);
// 2. Or, use flip
// set buffer.position =0 and buffer.limit = bytesCount
buffer.flip();
byte[] receivedMessageBytes = new byte[bytesCount];
buffer.get(receivedMessageBytes);
message = new String(receivedMessageBytes);
System.out.println("Receive " + message);
// Writing
// 1. Easy approach, create a new ByteBuffer and send it
// ByteBuffer outputBuffer = ByteBuffer.wrap(message.getBytes());
// sender.write(outputBuffer);
// 2. Or to reuse the same buffer we could
// buffer.limit(buffer.position());
// buffer.position(0);
// 3. Or the same as point 2, but one line
buffer.flip();
sender.write(buffer);
} else {
SocketChannel ssChannel = (SocketChannel) key.channel();
ssChannel.close();
System.out.println("Client disconnected");
break;
}
} catch (IOException e) {
e.printStackTrace();
try {
sender.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
}
PD: The first video shows even more than what I have said before, notice that before setting the breaking the console showed I am freaking out 2 and 4, when the breaking was set, I also triggered I am freaking out 1, which at the beginning it wasn't, not only that, but when I resumed the process, this time, I am freaking out 1 was not triggered!!!