7

I have the following requirements for a server/client architecture:

  1. Write a server/client that works asynchronously.

  2. The communication needs to be a duplex, i.e., reads and writes on both ends.

  3. Multiple clients can connect to the server at any given time.

  4. Server/client should wait until they become available and finally make a connection.

  5. Once a client connects it should write to the stream.

  6. Then the server should read from the stream and write response back to the client.

  7. Finally, the client should read the response and the communication should end.

So with the following requirements in mind I've written the following code but I'm not too sure about it because the docs for pipes are somewhat lacking, unfortunately and the code doesn't seems to work correctly, it hangs at a certain point.

namespace PipesAsyncAwait471
{
    using System;
    using System.Collections.Generic;
    using System.IO.Pipes;
    using System.Linq;
    using System.Threading.Tasks;

    internal class Program
    {
        private static async Task Main()
        {
            List<Task> tasks = new List<Task> {
                HandleRequestAsync(),
            };

            tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));

            await Task.WhenAll(tasks);
        }

        private static async Task HandleRequestAsync()
        {
            using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
                                                                            PipeDirection.InOut,
                                                                            NamedPipeServerStream.MaxAllowedServerInstances,
                                                                            PipeTransmissionMode.Message,
                                                                            PipeOptions.Asynchronous))
            {
                Console.WriteLine("Waiting...");

                await server.WaitForConnectionAsync().ConfigureAwait(false);

                if (server.IsConnected)
                {
                    Console.WriteLine("Connected");

                    if (server.CanRead) {
                        // Read something...
                    }

                    if (server.CanWrite) {
                        // Write something... 

                        await server.FlushAsync().ConfigureAwait(false);

                        server.WaitForPipeDrain();
                    }

                    server.Disconnect();

                    await HandleRequestAsync().ConfigureAwait(false);
                }
            }
        }

        private static async Task SendRequestAsync(int index, int counter, int max)
        {
            using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
            {
                await client.ConnectAsync().ConfigureAwait(false);

                if (client.IsConnected)
                {
                    Console.WriteLine($"Index: {index} Counter: {counter}");

                    if (client.CanWrite) {
                        // Write something...

                        await client.FlushAsync().ConfigureAwait(false);

                        client.WaitForPipeDrain();
                    }

                    if (client.CanRead) {
                        // Read something...
                    }
                }

                if (counter <= max) {
                    await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
                }
                else {
                    Console.WriteLine($"{index} Done!");
                }
            }
        }
    }
}

Assumptions:

The way I expect it to work is for all the requests I make when I call SendRequestAsync to execute concurrently where each request then makes additional requests until it reaches 6 and finally, it should print "Done!".

Remarks:

  1. I've tested it on .NET Framework 4.7.1 and .NET Core 2.0 and I get the same results.

  2. The communication between clients and the server is always local to the machine where clients are web applications that can queue some jobs like launching 3rd-party processes and the server is going to be deployed as a Windows service on the same machine as the web server that these clients are deployed on.

15
  • Instead of using pipes you may want to use TCP. See msdn examples : learn.microsoft.com/en-us/dotnet/framework/network-programming/… Commented Jan 2, 2018 at 11:22
  • @jdweng The client and server are processes found on the same machine so TCP would be an overkill for this. Commented Jan 2, 2018 at 11:25
  • Absolutely wrong. Millions of applications use TCP on a local PC. Pipes are used for standard input and standard output but are rarely used to tunnel into an application. Using TCP you can use a sniffer like wireshark or fiddler to debug application. Commented Jan 2, 2018 at 11:28
  • @jdweng TCP is used on local PCs that generally needs to make a remote connection, pipes are used heavily in IPC and I'm using it exactly for that, besides the client and the server there's also 3rd party processes that the server launches where these process have their stdin/out redirected but this isn't related the issue at hand so I didn't bother to go into details about it. Commented Jan 2, 2018 at 11:38
  • 2
    The recursive call to SendRequestAsync() is very ugly. That constantly creates new client pipes, none get closed. The show is probably over when it has created too many connections. Throw this away and use the MSDN sample code as a guide to get this right. Commented Jan 2, 2018 at 12:05

2 Answers 2

7

Here is the complete code after some iterations:

PipeServer.cs:

namespace AsyncPipes;

using System.Diagnostics.CodeAnalysis;
using System.IO.Pipes;

public static class PipeServer
{
    public static void WaitForConnection()
        => WaitForConnectionInitializer();

    private static void WaitForConnectionInitializer()
    {
        var context = new ServerContext();
        var server = context.Server;
    
        try
        {
            Console.WriteLine($"Waiting a client...");

            server.BeginWaitForConnection(WaitForConnectionCallback, context);
        }
        catch
        {
            // We need to cleanup here only when something goes wrong.
            context.Dispose();

            throw;
        }

        static void WaitForConnectionCallback(IAsyncResult result)
        {
            var (context, server, _) = ServerContext.FromResult(result);

            server.EndWaitForConnection(result);

            WaitForConnectionInitializer();

            BeginRead(context);
        }

        static void BeginRead(ServerContext context)
        {
            var (_, server, requestBuffer) = context;
            
            server.BeginRead(requestBuffer, 0, requestBuffer.Length, ReadCallback, context);
        }

        static void BeginWrite(ServerContext context)
        {
            var (_, server, responseBuffer) = context;
            
            server.BeginWrite(responseBuffer, 0, responseBuffer.Length, WriteCallback, context);
        }

        static void ReadCallback(IAsyncResult result)
        {
            var (context, server, requestBuffer) = ServerContext.FromResult(result);

            var bytesRead = server.EndRead(result);

            if (bytesRead > 0)
            {
                if (!server.IsMessageComplete)
                {
                    BeginRead(context);
                }
                else
                {
                    var index = BitConverter.ToInt32(requestBuffer, 0);
                    Console.WriteLine($"{index} Request.");

                    BeginWrite(context);
                }
            }
        }

        static void WriteCallback(IAsyncResult result)
        {
            var (context, server, responseBuffer) = ServerContext.FromResult(result);
            var index = -1;
            
            try
            {
                server.EndWrite(result);
                server.WaitForPipeDrain();
                
                index = BitConverter.ToInt32(responseBuffer, 0);
                Console.WriteLine($"{index} Pong.");
            }
            finally
            {
                context.Dispose();
                Console.WriteLine($"{index} Disposed.");
            }
        }
    }

    private sealed class ServerContext : IDisposable
    {
        [NotNull]
        public byte[]? Buffer { get; private set; } = new byte[4];

        [NotNull]
        public NamedPipeServerStream? Server { get; private set; } = new ("PipesDemo",
                                                                        PipeDirection.InOut,
                                                                        NamedPipeServerStream.MaxAllowedServerInstances,
                                                                        PipeTransmissionMode.Message,
                                                                        PipeOptions.Asynchronous);

        public void Deconstruct(out ServerContext context, out NamedPipeServerStream server, out byte[] buffer)
            => (context, server, buffer) = (this, Server, Buffer);

        public static ServerContext FromResult(IAsyncResult result)
        {
            ArgumentNullException.ThrowIfNull(result.AsyncState);
            
            return (ServerContext)result.AsyncState;
        }
        
        public void Dispose()
        {
            if (Server is not null)
            {
                if (Server.IsConnected)
                {
                    Server.Disconnect();
                }
                
                Server.Dispose();
            }
            
            Server = null;
            Buffer = null;
        }
    }
}

PipeClient:

public static class PipeClient
{
    public static void CreateConnection(int index)
    {
        using var client = new NamedPipeClientStream(".", "PipesDemo", PipeDirection.InOut, PipeOptions.None);
        client.Connect();

        var requestBuffer = BitConverter.GetBytes(index);
        client.Write(requestBuffer, 0, requestBuffer.Length);
        client.Flush();
        client.WaitForPipeDrain();
        Console.WriteLine($"{index} Ping.");

        var responseBuffer = new byte[4];
        var bytesRead = client.Read(responseBuffer, 0, responseBuffer.Length);

        while (bytesRead > 0)
        {
            bytesRead = client.Read(responseBuffer, bytesRead - 1, responseBuffer.Length - bytesRead);
        }

        index = BitConverter.ToInt32(responseBuffer, 0);
        Console.WriteLine($"{index} Response.");
    }
}

Program.cs:

namespace AsyncPipes;

internal class Program
{
    private const int MaxRequests = 1000;

    private static void Main()
    {
        var tasks = new List<Task>
        {
            Task.Run(PipeServer.WaitForConnection)
        };

        tasks.AddRange(Enumerable.Range(0, MaxRequests - 1)
                                .Select(i => Task.Factory.StartNew(() => PipeClient.CreateConnection(i),
                                                                    TaskCreationOptions.LongRunning)));

        Task.WaitAll(tasks.ToArray());

        Console.ReadKey();
    }
}

You can sort the messages and observe the following:

  1. Connections are opened and closed correctly.

  2. Data is sent and received correctly.

  3. Finally, the server still waits for further connections.

Updates:

Changed PipeOptions.Asynchronous to PipeOptions.None otherwise it seems like it hangs for the duration of the requests and only then processing them at once.

PipeOptions.Asynchronous is simply causing a different order of execution than PipeOptions.None, and that's exposing a race condition / deadlock in your code. You can see the effect of it if you use Task Manager, for example, to monitor the thread count of your process... you should see it creeping up at a rate of appx 1 thread per second, until it gets to around 100 threads (maybe 110 or so), at which point your code runs to completion. Or if you add ThreadPool.SetMinThreads(200, 200) at the beginning. Your code has a problem where if the wrong ordering occurs (and that's made more likely by using Asynchronous), you create a cycle where it can't be satisfied until there are enough threads to run all of the concurrent ConnectAsyncs your main method has queued, which aren't truly async and instead just create a work item to invoke the synchronous Connect method (this is unfortunate, and it's issues like this that are one of the reasons I urge folks not to expose async APIs that simply queue works items to call sync methods). Source.

Revised and simplified the example:

  1. There's no true asynchronous Connect method for pipes, ConnectAsync uses Task.Factory.StartNew behind the scene so you might just as well use Connect and then pass the method (SendRequest in our example) that calls the synchronous Connect version to Task.Factory.StartNew.

  2. The server is completely asynchronous now and as far as I can tell it works with no issues.

  3. Fixed all of the BeginXXX/EndXXX methods.

  4. Removed unnecessary try/catch blocks.

  5. Removed unnecessary messages.

  6. Refactor the code a bit to make it more readable and concise.

  7. Removed the async/await version of the server as I refactored the code and didn't have time to update the async/await version but with the above version you can have an idea of how to do it and the new APIs are much more friendly and easy to deal with.

I hope it helps.

Sign up to request clarification or add additional context in comments.

21 Comments

You are not awaiting for BeginRead / BeginWrite to end.
@OwnageIsMagic I'm not sure to what you're referring to exactly but BeginXXX goes in pairs and I do call EndXXX for all of the methods inside the callbacks that are passed that's how this asynchronous pattern works and the example seems to work, a similar code that handles process scheduling still runs with no issues so please can you be more specific and tell me what exactly do you mean? :)
Reading from request buffer before EndRead is wrong. Also this code re-creates pipe handle for each request-response. Also using TaskCreationOptions.LongRunning is wrong because BeginWaitForConnection won't block and exit immediately. Also server.Flush() is blocking, but no-op for PipeStream and WaitForPipeDrain is blocking and can cause lock if one client is not reading from pipe.
@OwnageIsMagic Can you please provide an implementation that works based on your observations? or at least examples that demonstrate the problems? because with all due respect the things you say don't make much sense to me.
@OwnageIsMagic "Reading from request buffer before EndRead is wrong" it was wrong if I had more than 4 bytes in the stream and I had to asynchronously get more data, so in the callback I'd have something like this int bytesRead = server.EndRead(result); var buffer = new byte[bytesRead]; // do something with the buffer. but it's not the case I have a total of 4 bytes which makes an int and that's all the data.
|
4

When disconnecting, WaitForPipeDrain() can throw an IOException due to a broken pipe.

If this happens in your server Task, then it will never listen for the next connection, and all of the remaining client connections hang on ConnectAsync().

If this happens in one of the client Tasks, then it will not continue to recurse and increment the counter for that index.

If you wrap the call to WaitForPipeDrain() in a try/catch, the program will continue running forever, because your function HandleRequestAsync() is infinitely recursive.

In short, to get this to work:

  1. Handle IOException from WaitForPipeDrain()
  2. HandleRequestAsync() has to finish at some point.

2 Comments

I tried to wrap WaitForPipeDrain with a try/catch block but I still get the same results and HandleRequestAsync should run forever so it shouldn't end in order to listen for further requests and if it entered an infinite loop I should have seen the GC doing much work due to allocations made by WaitForConnectionAsync or get StackOverflowException and none of them happens so I dunno. :)
Okay, I do get an exception, I've wrapped the calls with a try/catch and now I can see it.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.