diff --git a/SimplSockets/ProtocolHelper.cs b/SimplSockets/ProtocolHelper.cs index 558950c..9f889d4 100644 --- a/SimplSockets/ProtocolHelper.cs +++ b/SimplSockets/ProtocolHelper.cs @@ -1,7 +1,4 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; namespace SimplSockets { @@ -22,6 +19,21 @@ public static byte[] AppendControlBytesToMessage(byte[] message, int threadId) return messageWithControlBytes; } + public static int AppendControlBytesToMessage(byte[] message, int threadId, byte[] messageWithControlBytes) + { + int iResult = -1; + // Copy control bytes if enought room + if (messageWithControlBytes.Length >= ControlBytesPlaceholder.Length + message.Length) + { + Buffer.BlockCopy(message, 0, messageWithControlBytes, ControlBytesPlaceholder.Length, message.Length); + // Set the control bytes on the message + SetControlBytes(messageWithControlBytes, message.Length, threadId); + iResult = ControlBytesPlaceholder.Length + message.Length; + } + + return iResult; + } + public static void SetControlBytes(byte[] buffer, int length, int threadId) { // Set little endian message length diff --git a/SimplSockets/SimplSocketClient.cs b/SimplSockets/SimplSocketClient.cs index 48a9152..1bfb159 100644 --- a/SimplSockets/SimplSocketClient.cs +++ b/SimplSockets/SimplSocketClient.cs @@ -3,7 +3,6 @@ using System.IO; using System.Net; using System.Net.Sockets; -using System.Runtime.Caching; using System.Threading; using System.Threading.Tasks; @@ -108,6 +107,7 @@ public SimplSocketClient(Func socketFunc, int messageBufferSize = 65536, _socketAsyncEventArgsSendPool = new Pool(PREDICTED_THREAD_COUNT, () => { var poolItem = new SocketAsyncEventArgs(); + poolItem.SetBuffer(new byte[messageBufferSize], 0, messageBufferSize); poolItem.Completed += OperationCallback; return poolItem; }); @@ -207,12 +207,18 @@ public void Send(byte[] message) // Get the current thread ID int threadId = Thread.CurrentThread.ManagedThreadId; - - var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, threadId); + int size; + var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop(); - socketAsyncEventArgs.SetBuffer(messageWithControlBytes, 0, messageWithControlBytes.Length); - _sendBufferQueue.Enqueue(socketAsyncEventArgs); + if ((size = ProtocolHelper.AppendControlBytesToMessage(message, threadId, socketAsyncEventArgs.Buffer)) > 0) + { + socketAsyncEventArgs.SetBuffer(0, size); + _sendBufferQueue.Enqueue(socketAsyncEventArgs); + } + else + _socketAsyncEventArgsSendPool.Push(socketAsyncEventArgs); + } /// @@ -230,37 +236,45 @@ public byte[] SendReceive(byte[] message) // Get the current thread ID int threadId = Thread.CurrentThread.ManagedThreadId; + int size; + var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop(); - // Enroll in the multiplexer - var multiplexerData = EnrollMultiplexer(threadId); + if ((size = ProtocolHelper.AppendControlBytesToMessage(message, threadId, socketAsyncEventArgs.Buffer)) > 0) + { + socketAsyncEventArgs.SetBuffer(0, size); - var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, threadId); + // Enroll in the multiplexer + var multiplexerData = EnrollMultiplexer(threadId); - var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop(); - socketAsyncEventArgs.SetBuffer(messageWithControlBytes, 0, messageWithControlBytes.Length); + // Prioritize sends that have receives to the front of the queue + _sendBufferQueue.EnqueueFront(socketAsyncEventArgs); - // Prioritize sends that have receives to the front of the queue - _sendBufferQueue.EnqueueFront(socketAsyncEventArgs); + // Wait for our message to go ahead from the receive callback, or until the timeout is reached + if (!multiplexerData.ManualResetEvent.WaitOne(_communicationTimeout)) + { + HandleCommunicationError(_socket, new TimeoutException("The connection timed out before the response message was received")); - // Wait for our message to go ahead from the receive callback, or until the timeout is reached - if (!multiplexerData.ManualResetEvent.WaitOne(_communicationTimeout)) - { - HandleCommunicationError(_socket, new TimeoutException("The connection timed out before the response message was received")); + // Unenroll from the multiplexer + UnenrollMultiplexer(threadId); + + // No signal + return null; + } + + // Now get the command string + var result = multiplexerData.Message; // Unenroll from the multiplexer UnenrollMultiplexer(threadId); - // No signal + return result; + } + else + { + _socketAsyncEventArgsSendPool.Push(socketAsyncEventArgs); return null; } - - // Now get the command string - var result = multiplexerData.Message; - - // Unenroll from the multiplexer - UnenrollMultiplexer(threadId); - - return result; + } /// diff --git a/SimplSockets/SimplSocketServer.cs b/SimplSockets/SimplSocketServer.cs index ebe5b97..f638b43 100644 --- a/SimplSockets/SimplSocketServer.cs +++ b/SimplSockets/SimplSocketServer.cs @@ -92,6 +92,7 @@ public SimplSocketServer(Func socketFunc, int messageBufferSize = 65536, _socketAsyncEventArgsSendPool = new Pool(PREDICTED_CONNECTION_COUNT, () => { var poolItem = new SocketAsyncEventArgs(); + poolItem.SetBuffer(new byte[messageBufferSize], 0, messageBufferSize); poolItem.Completed += OperationCallback; return poolItem; }); @@ -178,8 +179,8 @@ public void Broadcast(byte[] message) // Get the current thread ID int threadId = Thread.CurrentThread.ManagedThreadId; - - var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, threadId); + int size; + //var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, threadId); List bustedClients = null; @@ -190,19 +191,24 @@ public void Broadcast(byte[] message) foreach (var client in _currentlyConnectedClients) { var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop(); - socketAsyncEventArgs.SetBuffer(messageWithControlBytes, 0, messageWithControlBytes.Length); - // Post send on the listening socket - if (!TryUnsafeSocketOperation(client.Socket, SocketAsyncOperation.Send, socketAsyncEventArgs)) + if ((size = ProtocolHelper.AppendControlBytesToMessage(message, threadId, socketAsyncEventArgs.Buffer)) > 0) { - // Mark for disconnection - if (bustedClients == null) + socketAsyncEventArgs.SetBuffer(0, size); + // Post send on the listening socket + if (!TryUnsafeSocketOperation(client.Socket, SocketAsyncOperation.Send, socketAsyncEventArgs)) { - bustedClients = new List(); - } + // Mark for disconnection + if (bustedClients == null) + { + bustedClients = new List(); + } - bustedClients.Add(client.Socket); + bustedClients.Add(client.Socket); + } } + else + _socketAsyncEventArgsSendPool.Push(socketAsyncEventArgs); } } finally @@ -236,13 +242,17 @@ public void Reply(byte[] message, ReceivedMessage receivedMessage) throw new ArgumentException("contains corrupted data", "receivedMessageState"); } - var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, receivedMessage.ThreadId); + int size = 0; var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop(); - socketAsyncEventArgs.SetBuffer(messageWithControlBytes, 0, messageWithControlBytes.Length); - - // Do the send to the appropriate client - TryUnsafeSocketOperation(receivedMessage.Socket, SocketAsyncOperation.Send, socketAsyncEventArgs); + if ((size = ProtocolHelper.AppendControlBytesToMessage(message, receivedMessage.ThreadId, socketAsyncEventArgs.Buffer)) > 0) + { + socketAsyncEventArgs.SetBuffer(0, size); + // Do the send to the appropriate client + TryUnsafeSocketOperation(receivedMessage.Socket, SocketAsyncOperation.Send, socketAsyncEventArgs); + } + else + _socketAsyncEventArgsSendPool.Push(socketAsyncEventArgs); } ///