Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid allocations in every Send by using preallocated byte[] #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions SimplSockets/ProtocolHelper.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace SimplSockets
{
Expand All @@ -22,6 +19,21 @@ public static byte[] AppendControlBytesToMessage(byte[] message, int threadId)
return messageWithControlBytes;
}

public static int AppendControlBytesToMessage(byte[] message, int threadId, byte[] messageWithControlBytes)
{
int iResult = -1;
// Copy control bytes if enought room
if (messageWithControlBytes.Length >= ControlBytesPlaceholder.Length + message.Length)
{
Buffer.BlockCopy(message, 0, messageWithControlBytes, ControlBytesPlaceholder.Length, message.Length);
// Set the control bytes on the message
SetControlBytes(messageWithControlBytes, message.Length, threadId);
iResult = ControlBytesPlaceholder.Length + message.Length;
}

return iResult;
}

public static void SetControlBytes(byte[] buffer, int length, int threadId)
{
// Set little endian message length
Expand Down
64 changes: 39 additions & 25 deletions SimplSockets/SimplSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.Caching;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -108,6 +107,7 @@ public SimplSocketClient(Func<Socket> socketFunc, int messageBufferSize = 65536,
_socketAsyncEventArgsSendPool = new Pool<SocketAsyncEventArgs>(PREDICTED_THREAD_COUNT, () =>
{
var poolItem = new SocketAsyncEventArgs();
poolItem.SetBuffer(new byte[messageBufferSize], 0, messageBufferSize);
poolItem.Completed += OperationCallback;
return poolItem;
});
Expand Down Expand Up @@ -207,12 +207,18 @@ public void Send(byte[] message)

// Get the current thread ID
int threadId = Thread.CurrentThread.ManagedThreadId;

var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, threadId);
int size;

var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop();
socketAsyncEventArgs.SetBuffer(messageWithControlBytes, 0, messageWithControlBytes.Length);

_sendBufferQueue.Enqueue(socketAsyncEventArgs);
if ((size = ProtocolHelper.AppendControlBytesToMessage(message, threadId, socketAsyncEventArgs.Buffer)) > 0)
{
socketAsyncEventArgs.SetBuffer(0, size);
_sendBufferQueue.Enqueue(socketAsyncEventArgs);
}
else
_socketAsyncEventArgsSendPool.Push(socketAsyncEventArgs);

}

/// <summary>
Expand All @@ -230,37 +236,45 @@ public byte[] SendReceive(byte[] message)

// Get the current thread ID
int threadId = Thread.CurrentThread.ManagedThreadId;
int size;
var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop();

// Enroll in the multiplexer
var multiplexerData = EnrollMultiplexer(threadId);
if ((size = ProtocolHelper.AppendControlBytesToMessage(message, threadId, socketAsyncEventArgs.Buffer)) > 0)
{
socketAsyncEventArgs.SetBuffer(0, size);

var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, threadId);
// Enroll in the multiplexer
var multiplexerData = EnrollMultiplexer(threadId);

var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop();
socketAsyncEventArgs.SetBuffer(messageWithControlBytes, 0, messageWithControlBytes.Length);
// Prioritize sends that have receives to the front of the queue
_sendBufferQueue.EnqueueFront(socketAsyncEventArgs);

// Prioritize sends that have receives to the front of the queue
_sendBufferQueue.EnqueueFront(socketAsyncEventArgs);
// Wait for our message to go ahead from the receive callback, or until the timeout is reached
if (!multiplexerData.ManualResetEvent.WaitOne(_communicationTimeout))
{
HandleCommunicationError(_socket, new TimeoutException("The connection timed out before the response message was received"));

// Wait for our message to go ahead from the receive callback, or until the timeout is reached
if (!multiplexerData.ManualResetEvent.WaitOne(_communicationTimeout))
{
HandleCommunicationError(_socket, new TimeoutException("The connection timed out before the response message was received"));
// Unenroll from the multiplexer
UnenrollMultiplexer(threadId);

// No signal
return null;
}

// Now get the command string
var result = multiplexerData.Message;

// Unenroll from the multiplexer
UnenrollMultiplexer(threadId);

// No signal
return result;
}
else
{
_socketAsyncEventArgsSendPool.Push(socketAsyncEventArgs);
return null;
}

// Now get the command string
var result = multiplexerData.Message;

// Unenroll from the multiplexer
UnenrollMultiplexer(threadId);

return result;

}

/// <summary>
Expand Down
40 changes: 25 additions & 15 deletions SimplSockets/SimplSocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public SimplSocketServer(Func<Socket> socketFunc, int messageBufferSize = 65536,
_socketAsyncEventArgsSendPool = new Pool<SocketAsyncEventArgs>(PREDICTED_CONNECTION_COUNT, () =>
{
var poolItem = new SocketAsyncEventArgs();
poolItem.SetBuffer(new byte[messageBufferSize], 0, messageBufferSize);
poolItem.Completed += OperationCallback;
return poolItem;
});
Expand Down Expand Up @@ -178,8 +179,8 @@ public void Broadcast(byte[] message)

// Get the current thread ID
int threadId = Thread.CurrentThread.ManagedThreadId;

var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, threadId);
int size;
//var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, threadId);

List<Socket> bustedClients = null;

Expand All @@ -190,19 +191,24 @@ public void Broadcast(byte[] message)
foreach (var client in _currentlyConnectedClients)
{
var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop();
socketAsyncEventArgs.SetBuffer(messageWithControlBytes, 0, messageWithControlBytes.Length);

// Post send on the listening socket
if (!TryUnsafeSocketOperation(client.Socket, SocketAsyncOperation.Send, socketAsyncEventArgs))
if ((size = ProtocolHelper.AppendControlBytesToMessage(message, threadId, socketAsyncEventArgs.Buffer)) > 0)
{
// Mark for disconnection
if (bustedClients == null)
socketAsyncEventArgs.SetBuffer(0, size);
// Post send on the listening socket
if (!TryUnsafeSocketOperation(client.Socket, SocketAsyncOperation.Send, socketAsyncEventArgs))
{
bustedClients = new List<Socket>();
}
// Mark for disconnection
if (bustedClients == null)
{
bustedClients = new List<Socket>();
}

bustedClients.Add(client.Socket);
bustedClients.Add(client.Socket);
}
}
else
_socketAsyncEventArgsSendPool.Push(socketAsyncEventArgs);
}
}
finally
Expand Down Expand Up @@ -236,13 +242,17 @@ public void Reply(byte[] message, ReceivedMessage receivedMessage)
throw new ArgumentException("contains corrupted data", "receivedMessageState");
}

var messageWithControlBytes = ProtocolHelper.AppendControlBytesToMessage(message, receivedMessage.ThreadId);
int size = 0;

var socketAsyncEventArgs = _socketAsyncEventArgsSendPool.Pop();
socketAsyncEventArgs.SetBuffer(messageWithControlBytes, 0, messageWithControlBytes.Length);

// Do the send to the appropriate client
TryUnsafeSocketOperation(receivedMessage.Socket, SocketAsyncOperation.Send, socketAsyncEventArgs);
if ((size = ProtocolHelper.AppendControlBytesToMessage(message, receivedMessage.ThreadId, socketAsyncEventArgs.Buffer)) > 0)
{
socketAsyncEventArgs.SetBuffer(0, size);
// Do the send to the appropriate client
TryUnsafeSocketOperation(receivedMessage.Socket, SocketAsyncOperation.Send, socketAsyncEventArgs);
}
else
_socketAsyncEventArgsSendPool.Push(socketAsyncEventArgs);
}

/// <summary>
Expand Down