KCP Transport
This commit is contained in:
parent
c892d2745a
commit
99163d7d4e
14 changed files with 2670 additions and 0 deletions
|
|
@ -0,0 +1,25 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
class KCPConfig
|
||||
{
|
||||
public bool NoDelay = true;
|
||||
|
||||
public uint Interval = 10;
|
||||
|
||||
public int FastResend = 2;
|
||||
|
||||
public bool CongestionWindow = false; // KCP 'NoCongestionWindow' is false by default. here we negate it for ease of use.
|
||||
|
||||
public uint SendWindowSize = 4096; //Kcp.WND_SND; 32 by default. Mirror sends a lot, so we need a lot more.
|
||||
|
||||
public uint ReceiveWindowSize = 4096; //Kcp.WND_RCV; 128 by default. Mirror sends a lot, so we need a lot more.
|
||||
|
||||
public int ConnectionTimeout = 10000; // Time in miliseconds it takes for a connection to time out.
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,237 @@
|
|||
//#if MIRROR <- commented out because MIRROR isn't defined on first import yet
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using Mirror;
|
||||
using Newtonsoft.Json;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
public class KcpTransport : Transport
|
||||
{
|
||||
// scheme used by this transport
|
||||
public const string Scheme = "kcp";
|
||||
|
||||
// common
|
||||
public static int ConnectionTimeout = 10000;
|
||||
|
||||
public bool NoDelay = true;
|
||||
|
||||
public uint Interval = 10;
|
||||
|
||||
public int FastResend = 2;
|
||||
|
||||
public bool CongestionWindow = false; // KCP 'NoCongestionWindow' is false by default. here we negate it for ease of use.
|
||||
|
||||
public uint SendWindowSize = 4096; //Kcp.WND_SND; 32 by default. Mirror sends a lot, so we need a lot more.
|
||||
|
||||
public uint ReceiveWindowSize = 4096; //Kcp.WND_RCV; 128 by default. Mirror sends a lot, so we need a lot more.
|
||||
|
||||
// server & client
|
||||
KcpServer server;
|
||||
KcpClient client;
|
||||
|
||||
// debugging
|
||||
public bool debugLog;
|
||||
// show statistics in OnGUI
|
||||
public bool statisticsGUI;
|
||||
// log statistics for headless servers that can't show them in GUI
|
||||
public bool statisticsLog;
|
||||
|
||||
void Awake()
|
||||
{
|
||||
|
||||
KCPConfig conf = new KCPConfig();
|
||||
if (!File.Exists("KCPConfig.json"))
|
||||
{
|
||||
File.WriteAllText("KCPConfig.json", JsonConvert.SerializeObject(conf, Formatting.Indented));
|
||||
}
|
||||
else
|
||||
{
|
||||
conf = JsonConvert.DeserializeObject<KCPConfig>(File.ReadAllText("KCPConfig.json"));
|
||||
}
|
||||
|
||||
NoDelay = conf.NoDelay;
|
||||
Interval = conf.Interval;
|
||||
FastResend = conf.FastResend;
|
||||
CongestionWindow = conf.CongestionWindow;
|
||||
SendWindowSize = conf.SendWindowSize;
|
||||
ReceiveWindowSize = conf.ReceiveWindowSize;
|
||||
ConnectionTimeout = conf.ConnectionTimeout;
|
||||
|
||||
// logging
|
||||
// Log.Info should use Debug.Log if enabled, or nothing otherwise
|
||||
// (don't want to spam the console on headless servers)
|
||||
if (debugLog)
|
||||
Log.Info = Console.WriteLine;
|
||||
else
|
||||
Log.Info = _ => { };
|
||||
Log.Warning = Console.WriteLine;
|
||||
Log.Error = Console.WriteLine;
|
||||
|
||||
// client
|
||||
client = new KcpClient(
|
||||
() => OnClientConnected.Invoke(),
|
||||
(message) => OnClientDataReceived.Invoke(message, 0),
|
||||
() => OnClientDisconnected.Invoke()
|
||||
);
|
||||
|
||||
// server
|
||||
server = new KcpServer(
|
||||
(connectionId) => OnServerConnected.Invoke(connectionId),
|
||||
(connectionId, message) => OnServerDataReceived.Invoke(connectionId, message, 0),
|
||||
(connectionId) => OnServerDisconnected.Invoke(connectionId),
|
||||
NoDelay,
|
||||
Interval,
|
||||
FastResend,
|
||||
CongestionWindow,
|
||||
SendWindowSize,
|
||||
ReceiveWindowSize
|
||||
);
|
||||
|
||||
|
||||
Console.WriteLine("KcpTransport initialized!");
|
||||
}
|
||||
|
||||
// all except WebGL
|
||||
public override bool Available() => true;
|
||||
|
||||
// client
|
||||
public override bool ClientConnected() => client.connected;
|
||||
public override void ClientConnect(string address) { }
|
||||
public override void ClientSend(int channelId, ArraySegment<byte> segment)
|
||||
{
|
||||
// switch to kcp channel.
|
||||
// unreliable or reliable.
|
||||
// default to reliable just to be sure.
|
||||
switch (channelId)
|
||||
{
|
||||
case 1:
|
||||
client.Send(segment, KcpChannel.Unreliable);
|
||||
break;
|
||||
default:
|
||||
client.Send(segment, KcpChannel.Reliable);
|
||||
break;
|
||||
}
|
||||
}
|
||||
public override void ClientDisconnect() => client.Disconnect();
|
||||
|
||||
// scene change message will disable transports.
|
||||
// kcp processes messages in an internal loop which should be
|
||||
// stopped immediately after scene change (= after disabled)
|
||||
// => kcp has tests to guaranteed that calling .Pause() during the
|
||||
// receive loop stops the receive loop immediately, not after.
|
||||
void OnEnable()
|
||||
{
|
||||
// unpause when enabled again
|
||||
client?.Unpause();
|
||||
server?.Unpause();
|
||||
}
|
||||
|
||||
void OnDisable()
|
||||
{
|
||||
// pause immediately when not enabled anymore
|
||||
client?.Pause();
|
||||
server?.Pause();
|
||||
}
|
||||
|
||||
// server
|
||||
public override Uri ServerUri()
|
||||
{
|
||||
UriBuilder builder = new UriBuilder();
|
||||
builder.Scheme = Scheme;
|
||||
builder.Host = Dns.GetHostName();
|
||||
return builder.Uri;
|
||||
}
|
||||
public override bool ServerActive() => server.IsActive();
|
||||
public override void ServerStart(ushort requestedPort) => server.Start(requestedPort);
|
||||
public override void ServerSend(int connectionId, int channelId, ArraySegment<byte> segment)
|
||||
{
|
||||
// switch to kcp channel.
|
||||
// unreliable or reliable.
|
||||
// default to reliable just to be sure.
|
||||
switch (channelId)
|
||||
{
|
||||
case 1:
|
||||
server.Send(connectionId, segment, KcpChannel.Unreliable);
|
||||
break;
|
||||
default:
|
||||
server.Send(connectionId, segment, KcpChannel.Reliable);
|
||||
break;
|
||||
}
|
||||
}
|
||||
public override bool ServerDisconnect(int connectionId)
|
||||
{
|
||||
server.Disconnect(connectionId);
|
||||
return true;
|
||||
}
|
||||
public override string ServerGetClientAddress(int connectionId) => server.GetClientAddress(connectionId);
|
||||
public override void ServerStop() => server.Stop();
|
||||
|
||||
public void Update()
|
||||
{
|
||||
server.TickIncoming();
|
||||
server.TickOutgoing();
|
||||
}
|
||||
|
||||
// common
|
||||
public override void Shutdown() {}
|
||||
|
||||
// max message size
|
||||
public override int GetMaxPacketSize(int channelId = 0)
|
||||
{
|
||||
// switch to kcp channel.
|
||||
// unreliable or reliable.
|
||||
// default to reliable just to be sure.
|
||||
switch (channelId)
|
||||
{
|
||||
case 1:
|
||||
return KcpConnection.UnreliableMaxMessageSize;
|
||||
default:
|
||||
return KcpConnection.ReliableMaxMessageSize;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// server statistics
|
||||
public int GetAverageMaxSendRate() =>
|
||||
server.connections.Count > 0
|
||||
? server.connections.Values.Sum(conn => (int)conn.MaxSendRate) / server.connections.Count
|
||||
: 0;
|
||||
public int GetAverageMaxReceiveRate() =>
|
||||
server.connections.Count > 0
|
||||
? server.connections.Values.Sum(conn => (int)conn.MaxReceiveRate) / server.connections.Count
|
||||
: 0;
|
||||
int GetTotalSendQueue() =>
|
||||
server.connections.Values.Sum(conn => conn.SendQueueCount);
|
||||
int GetTotalReceiveQueue() =>
|
||||
server.connections.Values.Sum(conn => conn.ReceiveQueueCount);
|
||||
int GetTotalSendBuffer() =>
|
||||
server.connections.Values.Sum(conn => conn.SendBufferCount);
|
||||
int GetTotalReceiveBuffer() =>
|
||||
server.connections.Values.Sum(conn => conn.ReceiveBufferCount);
|
||||
|
||||
// PrettyBytes function from DOTSNET
|
||||
// pretty prints bytes as KB/MB/GB/etc.
|
||||
// long to support > 2GB
|
||||
// divides by floats to return "2.5MB" etc.
|
||||
public static string PrettyBytes(long bytes)
|
||||
{
|
||||
// bytes
|
||||
if (bytes < 1024)
|
||||
return $"{bytes} B";
|
||||
// kilobytes
|
||||
else if (bytes < 1024L * 1024L)
|
||||
return $"{(bytes / 1024f):F2} KB";
|
||||
// megabytes
|
||||
else if (bytes < 1024 * 1024L * 1024L)
|
||||
return $"{(bytes / (1024f * 1024f)):F2} MB";
|
||||
// gigabytes
|
||||
return $"{(bytes / (1024f * 1024f * 1024f)):F2} GB";
|
||||
}
|
||||
|
||||
public override string ToString() => "KCP";
|
||||
}
|
||||
}
|
||||
//#endif MIRROR <- commented out because MIRROR isn't defined on first import yet
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
namespace kcp2k
|
||||
{
|
||||
// channel type and header for raw messages
|
||||
public enum KcpChannel : byte
|
||||
{
|
||||
// don't react on 0x00. might help to filter out random noise.
|
||||
Reliable = 0x01,
|
||||
Unreliable = 0x02
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,114 @@
|
|||
// kcp client logic abstracted into a class.
|
||||
// for use in Mirror, DOTSNET, testing, etc.
|
||||
using System;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
public class KcpClient
|
||||
{
|
||||
// events
|
||||
public Action OnConnected;
|
||||
public Action<ArraySegment<byte>> OnData;
|
||||
public Action OnDisconnected;
|
||||
|
||||
// state
|
||||
public KcpClientConnection connection;
|
||||
public bool connected;
|
||||
|
||||
public KcpClient(Action OnConnected, Action<ArraySegment<byte>> OnData, Action OnDisconnected)
|
||||
{
|
||||
this.OnConnected = OnConnected;
|
||||
this.OnData = OnData;
|
||||
this.OnDisconnected = OnDisconnected;
|
||||
}
|
||||
|
||||
public void Connect(string address, ushort port, bool noDelay, uint interval, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV)
|
||||
{
|
||||
if (connected)
|
||||
{
|
||||
Log.Warning("KCP: client already connected!");
|
||||
return;
|
||||
}
|
||||
|
||||
connection = new KcpClientConnection();
|
||||
|
||||
// setup events
|
||||
connection.OnAuthenticated = () =>
|
||||
{
|
||||
Log.Info($"KCP: OnClientConnected");
|
||||
connected = true;
|
||||
OnConnected.Invoke();
|
||||
};
|
||||
connection.OnData = (message) =>
|
||||
{
|
||||
//Log.Debug($"KCP: OnClientData({BitConverter.ToString(message.Array, message.Offset, message.Count)})");
|
||||
OnData.Invoke(message);
|
||||
};
|
||||
connection.OnDisconnected = () =>
|
||||
{
|
||||
Log.Info($"KCP: OnClientDisconnected");
|
||||
connected = false;
|
||||
connection = null;
|
||||
OnDisconnected.Invoke();
|
||||
};
|
||||
|
||||
// connect
|
||||
connection.Connect(address, port, noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize);
|
||||
}
|
||||
|
||||
public void Send(ArraySegment<byte> segment, KcpChannel channel)
|
||||
{
|
||||
if (connected)
|
||||
{
|
||||
connection.SendData(segment, channel);
|
||||
}
|
||||
else Log.Warning("KCP: can't send because client not connected!");
|
||||
}
|
||||
|
||||
public void Disconnect()
|
||||
{
|
||||
// only if connected
|
||||
// otherwise we end up in a deadlock because of an open Mirror bug:
|
||||
// https://github.com/vis2k/Mirror/issues/2353
|
||||
if (connected)
|
||||
{
|
||||
// call Disconnect and let the connection handle it.
|
||||
// DO NOT set it to null yet. it needs to be updated a few more
|
||||
// times first. let the connection handle it!
|
||||
connection?.Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
// process incoming messages. should be called before updating the world.
|
||||
public void TickIncoming()
|
||||
{
|
||||
// recv on socket first, then process incoming
|
||||
// (even if we didn't receive anything. need to tick ping etc.)
|
||||
// (connection is null if not active)
|
||||
connection?.RawReceive();
|
||||
connection?.TickIncoming();
|
||||
}
|
||||
|
||||
// process outgoing messages. should be called after updating the world.
|
||||
public void TickOutgoing()
|
||||
{
|
||||
// process outgoing
|
||||
// (connection is null if not active)
|
||||
connection?.TickOutgoing();
|
||||
}
|
||||
|
||||
// process incoming and outgoing for convenience
|
||||
// => ideally call ProcessIncoming() before updating the world and
|
||||
// ProcessOutgoing() after updating the world for minimum latency
|
||||
public void Tick()
|
||||
{
|
||||
TickIncoming();
|
||||
TickOutgoing();
|
||||
}
|
||||
|
||||
// pause/unpause to safely support mirror scene handling and to
|
||||
// immediately pause the receive while loop if needed.
|
||||
public void Pause() => connection?.Pause();
|
||||
public void Unpause() => connection?.Unpause();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,75 @@
|
|||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
public class KcpClientConnection : KcpConnection
|
||||
{
|
||||
// IMPORTANT: raw receive buffer always needs to be of 'MTU' size, even
|
||||
// if MaxMessageSize is larger. kcp always sends in MTU
|
||||
// segments and having a buffer smaller than MTU would
|
||||
// silently drop excess data.
|
||||
// => we need the MTU to fit channel + message!
|
||||
readonly byte[] rawReceiveBuffer = new byte[Kcp.MTU_DEF];
|
||||
|
||||
public void Connect(string host, ushort port, bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV)
|
||||
{
|
||||
Log.Info($"KcpClient: connect to {host}:{port}");
|
||||
IPAddress[] ipAddress = Dns.GetHostAddresses(host);
|
||||
if (ipAddress.Length < 1)
|
||||
throw new SocketException((int)SocketError.HostNotFound);
|
||||
|
||||
remoteEndpoint = new IPEndPoint(ipAddress[0], port);
|
||||
socket = new Socket(remoteEndpoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
|
||||
socket.Connect(remoteEndpoint);
|
||||
SetupKcp(noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize);
|
||||
|
||||
// client should send handshake to server as very first message
|
||||
SendHandshake();
|
||||
|
||||
RawReceive();
|
||||
}
|
||||
|
||||
// call from transport update
|
||||
public void RawReceive()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (socket != null)
|
||||
{
|
||||
while (socket.Poll(0, SelectMode.SelectRead))
|
||||
{
|
||||
int msgLength = socket.ReceiveFrom(rawReceiveBuffer, ref remoteEndpoint);
|
||||
// IMPORTANT: detect if buffer was too small for the
|
||||
// received msgLength. otherwise the excess
|
||||
// data would be silently lost.
|
||||
// (see ReceiveFrom documentation)
|
||||
if (msgLength <= rawReceiveBuffer.Length)
|
||||
{
|
||||
//Log.Debug($"KCP: client raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
|
||||
RawInput(rawReceiveBuffer, msgLength);
|
||||
}
|
||||
else
|
||||
{
|
||||
Log.Error($"KCP ClientConnection: message of size {msgLength} does not fit into buffer of size {rawReceiveBuffer.Length}. The excess was silently dropped. Disconnecting.");
|
||||
Disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// this is fine, the socket might have been closed in the other end
|
||||
catch (SocketException) {}
|
||||
}
|
||||
|
||||
protected override void Dispose()
|
||||
{
|
||||
socket.Close();
|
||||
socket = null;
|
||||
}
|
||||
|
||||
protected override void RawSend(byte[] data, int length)
|
||||
{
|
||||
socket.Send(data, length, SocketFlags.None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,667 @@
|
|||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
enum KcpState { Connected, Authenticated, Disconnected }
|
||||
|
||||
public abstract class KcpConnection
|
||||
{
|
||||
protected Socket socket;
|
||||
protected EndPoint remoteEndpoint;
|
||||
internal Kcp kcp;
|
||||
|
||||
// kcp can have several different states, let's use a state machine
|
||||
KcpState state = KcpState.Disconnected;
|
||||
|
||||
public Action OnAuthenticated;
|
||||
public Action<ArraySegment<byte>> OnData;
|
||||
public Action OnDisconnected;
|
||||
|
||||
// Mirror needs a way to stop the kcp message processing while loop
|
||||
// immediately after a scene change message. Mirror can't process any
|
||||
// other messages during a scene change.
|
||||
// (could be useful for others too)
|
||||
bool paused;
|
||||
|
||||
uint lastReceiveTime;
|
||||
|
||||
// internal time.
|
||||
// StopWatch offers ElapsedMilliSeconds and should be more precise than
|
||||
// Unity's time.deltaTime over long periods.
|
||||
readonly Stopwatch refTime = new Stopwatch();
|
||||
|
||||
// we need to subtract the channel byte from every MaxMessageSize
|
||||
// calculation.
|
||||
// we also need to tell kcp to use MTU-1 to leave space for the byte.
|
||||
const int CHANNEL_HEADER_SIZE = 1;
|
||||
|
||||
// reliable channel (= kcp) MaxMessageSize so the outside knows largest
|
||||
// allowed message to send the calculation in Send() is not obvious at
|
||||
// all, so let's provide the helper here.
|
||||
//
|
||||
// kcp does fragmentation, so max message is way larger than MTU.
|
||||
//
|
||||
// -> runtime MTU changes are disabled: mss is always MTU_DEF-OVERHEAD
|
||||
// -> Send() checks if fragment count < WND_RCV, so we use WND_RCV - 1.
|
||||
// note that Send() checks WND_RCV instead of wnd_rcv which may or
|
||||
// may not be a bug in original kcp. but since it uses the define, we
|
||||
// can use that here too.
|
||||
// -> we add 1 byte KcpHeader enum to each message, so -1
|
||||
//
|
||||
// IMPORTANT: max message is MTU * WND_RCV, in other words it completely
|
||||
// fills the receive window! due to head of line blocking,
|
||||
// all other messages have to wait while a maxed size message
|
||||
// is being delivered.
|
||||
// => in other words, DO NOT use max size all the time like
|
||||
// for batching.
|
||||
// => sending UNRELIABLE max message size most of the time is
|
||||
// best for performance (use that one for batching!)
|
||||
public const int ReliableMaxMessageSize = (Kcp.MTU_DEF - Kcp.OVERHEAD - CHANNEL_HEADER_SIZE) * (Kcp.WND_RCV - 1) - 1;
|
||||
|
||||
// unreliable max message size is simply MTU - channel header size
|
||||
public const int UnreliableMaxMessageSize = Kcp.MTU_DEF - CHANNEL_HEADER_SIZE;
|
||||
|
||||
// buffer to receive kcp's processed messages (avoids allocations).
|
||||
// IMPORTANT: this is for KCP messages. so it needs to be of size:
|
||||
// 1 byte header + MaxMessageSize content
|
||||
byte[] kcpMessageBuffer = new byte[1 + ReliableMaxMessageSize];
|
||||
|
||||
// send buffer for handing user messages to kcp for processing.
|
||||
// (avoids allocations).
|
||||
// IMPORTANT: needs to be of size:
|
||||
// 1 byte header + MaxMessageSize content
|
||||
byte[] kcpSendBuffer = new byte[1 + ReliableMaxMessageSize];
|
||||
|
||||
// raw send buffer is exactly MTU.
|
||||
byte[] rawSendBuffer = new byte[Kcp.MTU_DEF];
|
||||
|
||||
// send a ping occasionally so we don't time out on the other end.
|
||||
// for example, creating a character in an MMO could easily take a
|
||||
// minute of no data being sent. which doesn't mean we want to time out.
|
||||
// same goes for slow paced card games etc.
|
||||
public const int PING_INTERVAL = 1000;
|
||||
uint lastPingTime;
|
||||
|
||||
// if we send more than kcp can handle, we will get ever growing
|
||||
// send/recv buffers and queues and minutes of latency.
|
||||
// => if a connection can't keep up, it should be disconnected instead
|
||||
// to protect the server under heavy load, and because there is no
|
||||
// point in growing to gigabytes of memory or minutes of latency!
|
||||
// => 2k isn't enough. we reach 2k when spawning 4k monsters at once
|
||||
// easily, but it does recover over time.
|
||||
// => 10k seems safe.
|
||||
//
|
||||
// note: we have a ChokeConnectionAutoDisconnects test for this too!
|
||||
internal const int QueueDisconnectThreshold = 10000;
|
||||
|
||||
// getters for queue and buffer counts, used for debug info
|
||||
public int SendQueueCount => kcp.snd_queue.Count;
|
||||
public int ReceiveQueueCount => kcp.rcv_queue.Count;
|
||||
public int SendBufferCount => kcp.snd_buf.Count;
|
||||
public int ReceiveBufferCount => kcp.rcv_buf.Count;
|
||||
|
||||
// maximum send rate per second can be calculated from kcp parameters
|
||||
// source: https://translate.google.com/translate?sl=auto&tl=en&u=https://wetest.qq.com/lab/view/391.html
|
||||
//
|
||||
// KCP can send/receive a maximum of WND*MTU per interval.
|
||||
// multiple by 1000ms / interval to get the per-second rate.
|
||||
//
|
||||
// example:
|
||||
// WND(32) * MTU(1400) = 43.75KB
|
||||
// => 43.75KB * 1000 / INTERVAL(10) = 4375KB/s
|
||||
//
|
||||
// returns bytes/second!
|
||||
public uint MaxSendRate =>
|
||||
kcp.snd_wnd * kcp.mtu * 1000 / kcp.interval;
|
||||
|
||||
public uint MaxReceiveRate =>
|
||||
kcp.rcv_wnd * kcp.mtu * 1000 / kcp.interval;
|
||||
|
||||
// NoDelay, interval, window size are the most important configurations.
|
||||
// let's force require the parameters so we don't forget it anywhere.
|
||||
protected void SetupKcp(bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV)
|
||||
{
|
||||
// set up kcp over reliable channel (that's what kcp is for)
|
||||
kcp = new Kcp(0, RawSendReliable);
|
||||
// set nodelay.
|
||||
// note that kcp uses 'nocwnd' internally so we negate the parameter
|
||||
kcp.SetNoDelay(noDelay ? 1u : 0u, interval, fastResend, !congestionWindow);
|
||||
kcp.SetWindowSize(sendWindowSize, receiveWindowSize);
|
||||
|
||||
// IMPORTANT: high level needs to add 1 channel byte to each raw
|
||||
// message. so while Kcp.MTU_DEF is perfect, we actually need to
|
||||
// tell kcp to use MTU-1 so we can still put the header into the
|
||||
// message afterwards.
|
||||
kcp.SetMtu(Kcp.MTU_DEF - CHANNEL_HEADER_SIZE);
|
||||
|
||||
state = KcpState.Connected;
|
||||
|
||||
refTime.Start();
|
||||
}
|
||||
|
||||
void HandleTimeout(uint time)
|
||||
{
|
||||
// note: we are also sending a ping regularly, so timeout should
|
||||
// only ever happen if the connection is truly gone.
|
||||
if (time >= lastReceiveTime + KcpTransport.ConnectionTimeout)
|
||||
{
|
||||
Log.Warning($"KCP: Connection timed out after not receiving any message for {KcpTransport.ConnectionTimeout}ms. Disconnecting.");
|
||||
Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
void HandleDeadLink()
|
||||
{
|
||||
// kcp has 'dead_link' detection. might as well use it.
|
||||
if (kcp.state == -1)
|
||||
{
|
||||
Log.Warning("KCP Connection dead_link detected. Disconnecting.");
|
||||
Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
// send a ping occasionally in order to not time out on the other end.
|
||||
void HandlePing(uint time)
|
||||
{
|
||||
// enough time elapsed since last ping?
|
||||
if (time >= lastPingTime + PING_INTERVAL)
|
||||
{
|
||||
// ping again and reset time
|
||||
//Log.Debug("KCP: sending ping...");
|
||||
SendPing();
|
||||
lastPingTime = time;
|
||||
}
|
||||
}
|
||||
|
||||
void HandleChoked()
|
||||
{
|
||||
// disconnect connections that can't process the load.
|
||||
// see QueueSizeDisconnect comments.
|
||||
// => include all of kcp's buffers and the unreliable queue!
|
||||
int total = kcp.rcv_queue.Count + kcp.snd_queue.Count +
|
||||
kcp.rcv_buf.Count + kcp.snd_buf.Count;
|
||||
if (total >= QueueDisconnectThreshold)
|
||||
{
|
||||
Log.Warning($"KCP: disconnecting connection because it can't process data fast enough.\n" +
|
||||
$"Queue total {total}>{QueueDisconnectThreshold}. rcv_queue={kcp.rcv_queue.Count} snd_queue={kcp.snd_queue.Count} rcv_buf={kcp.rcv_buf.Count} snd_buf={kcp.snd_buf.Count}\n" +
|
||||
$"* Try to Enable NoDelay, decrease INTERVAL, disable Congestion Window (= enable NOCWND!), increase SEND/RECV WINDOW or compress data.\n" +
|
||||
$"* Or perhaps the network is simply too slow on our end, or on the other end.\n");
|
||||
|
||||
// let's clear all pending sends before disconnting with 'Bye'.
|
||||
// otherwise a single Flush in Disconnect() won't be enough to
|
||||
// flush thousands of messages to finally deliver 'Bye'.
|
||||
// this is just faster and more robust.
|
||||
kcp.snd_queue.Clear();
|
||||
|
||||
Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
// reads the next reliable message type & content from kcp.
|
||||
// -> to avoid buffering, unreliable messages call OnData directly.
|
||||
bool ReceiveNextReliable(out KcpHeader header, out ArraySegment<byte> message)
|
||||
{
|
||||
int msgSize = kcp.PeekSize();
|
||||
message = new ArraySegment<byte>();
|
||||
if (msgSize > 0)
|
||||
{
|
||||
// only allow receiving up to buffer sized messages.
|
||||
// otherwise we would get BlockCopy ArgumentException anyway.
|
||||
if (msgSize <= kcpMessageBuffer.Length)
|
||||
{
|
||||
// receive from kcp
|
||||
int received = kcp.Receive(kcpMessageBuffer, msgSize);
|
||||
if (received >= 0)
|
||||
{
|
||||
// extract header & content without header
|
||||
header = (KcpHeader)kcpMessageBuffer[0];
|
||||
message = new ArraySegment<byte>(kcpMessageBuffer, 1, msgSize - 1);
|
||||
lastReceiveTime = (uint)refTime.ElapsedMilliseconds;
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// if receive failed, close everything
|
||||
Log.Warning($"Receive failed with error={received}. closing connection.");
|
||||
Disconnect();
|
||||
}
|
||||
}
|
||||
// we don't allow sending messages > Max, so this must be an
|
||||
// attacker. let's disconnect to avoid allocation attacks etc.
|
||||
else
|
||||
{
|
||||
Log.Warning($"KCP: possible allocation attack for msgSize {msgSize} > buffer {kcpMessageBuffer.Length}. Disconnecting the connection.");
|
||||
Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
header = KcpHeader.Disconnect;
|
||||
return false;
|
||||
}
|
||||
|
||||
void TickIncoming_Connected(uint time)
|
||||
{
|
||||
// detect common events & ping
|
||||
HandleTimeout(time);
|
||||
HandleDeadLink();
|
||||
HandlePing(time);
|
||||
HandleChoked();
|
||||
|
||||
// any reliable kcp message received?
|
||||
if (ReceiveNextReliable(out KcpHeader header, out ArraySegment<byte> message))
|
||||
{
|
||||
// message type FSM. no default so we never miss a case.
|
||||
switch (header)
|
||||
{
|
||||
case KcpHeader.Handshake:
|
||||
{
|
||||
// we were waiting for a handshake.
|
||||
// it proves that the other end speaks our protocol.
|
||||
Log.Info("KCP: received handshake");
|
||||
state = KcpState.Authenticated;
|
||||
OnAuthenticated?.Invoke();
|
||||
break;
|
||||
}
|
||||
case KcpHeader.Ping:
|
||||
{
|
||||
// ping keeps kcp from timing out. do nothing.
|
||||
break;
|
||||
}
|
||||
case KcpHeader.Data:
|
||||
case KcpHeader.Disconnect:
|
||||
{
|
||||
// everything else is not allowed during handshake!
|
||||
Log.Warning($"KCP: received invalid header {header} while Connected. Disconnecting the connection.");
|
||||
Disconnect();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void TickIncoming_Authenticated(uint time)
|
||||
{
|
||||
// detect common events & ping
|
||||
HandleTimeout(time);
|
||||
HandleDeadLink();
|
||||
HandlePing(time);
|
||||
HandleChoked();
|
||||
|
||||
// process all received messages
|
||||
//
|
||||
// Mirror scene changing requires transports to immediately stop
|
||||
// processing any more messages after a scene message was
|
||||
// received. and since we are in a while loop here, we need this
|
||||
// extra check.
|
||||
//
|
||||
// note while that this is mainly for Mirror, but might be
|
||||
// useful in other applications too.
|
||||
//
|
||||
// note that we check it BEFORE ever calling ReceiveNext. otherwise
|
||||
// we would silently eat the received message and never process it.
|
||||
while (!paused &&
|
||||
ReceiveNextReliable(out KcpHeader header, out ArraySegment<byte> message))
|
||||
{
|
||||
// message type FSM. no default so we never miss a case.
|
||||
switch (header)
|
||||
{
|
||||
case KcpHeader.Handshake:
|
||||
{
|
||||
// should never receive another handshake after auth
|
||||
Log.Warning($"KCP: received invalid header {header} while Authenticated. Disconnecting the connection.");
|
||||
Disconnect();
|
||||
break;
|
||||
}
|
||||
case KcpHeader.Data:
|
||||
{
|
||||
// call OnData IF the message contained actual data
|
||||
if (message.Count > 0)
|
||||
{
|
||||
//Log.Warning($"Kcp recv msg: {BitConverter.ToString(message.Array, message.Offset, message.Count)}");
|
||||
OnData?.Invoke(message);
|
||||
}
|
||||
// empty data = attacker, or something went wrong
|
||||
else
|
||||
{
|
||||
Log.Warning("KCP: received empty Data message while Authenticated. Disconnecting the connection.");
|
||||
Disconnect();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case KcpHeader.Ping:
|
||||
{
|
||||
// ping keeps kcp from timing out. do nothing.
|
||||
break;
|
||||
}
|
||||
case KcpHeader.Disconnect:
|
||||
{
|
||||
// disconnect might happen
|
||||
Log.Info("KCP: received disconnect message");
|
||||
Disconnect();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void TickIncoming()
|
||||
{
|
||||
uint time = (uint)refTime.ElapsedMilliseconds;
|
||||
|
||||
try
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case KcpState.Connected:
|
||||
{
|
||||
TickIncoming_Connected(time);
|
||||
break;
|
||||
}
|
||||
case KcpState.Authenticated:
|
||||
{
|
||||
TickIncoming_Authenticated(time);
|
||||
break;
|
||||
}
|
||||
case KcpState.Disconnected:
|
||||
{
|
||||
// do nothing while disconnected
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (SocketException exception)
|
||||
{
|
||||
// this is ok, the connection was closed
|
||||
Log.Info($"KCP Connection: Disconnecting because {exception}. This is fine.");
|
||||
Disconnect();
|
||||
}
|
||||
catch (ObjectDisposedException exception)
|
||||
{
|
||||
// fine, socket was closed
|
||||
Log.Info($"KCP Connection: Disconnecting because {exception}. This is fine.");
|
||||
Disconnect();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// unexpected
|
||||
Log.Error(ex.ToString());
|
||||
Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
public void TickOutgoing()
|
||||
{
|
||||
uint time = (uint)refTime.ElapsedMilliseconds;
|
||||
|
||||
try
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case KcpState.Connected:
|
||||
case KcpState.Authenticated:
|
||||
{
|
||||
// update flushes out messages
|
||||
kcp.Update(time);
|
||||
break;
|
||||
}
|
||||
case KcpState.Disconnected:
|
||||
{
|
||||
// do nothing while disconnected
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (SocketException exception)
|
||||
{
|
||||
// this is ok, the connection was closed
|
||||
Log.Info($"KCP Connection: Disconnecting because {exception}. This is fine.");
|
||||
Disconnect();
|
||||
}
|
||||
catch (ObjectDisposedException exception)
|
||||
{
|
||||
// fine, socket was closed
|
||||
Log.Info($"KCP Connection: Disconnecting because {exception}. This is fine.");
|
||||
Disconnect();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// unexpected
|
||||
Log.Error(ex.ToString());
|
||||
Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
public void RawInput(byte[] buffer, int msgLength)
|
||||
{
|
||||
// parse channel
|
||||
if (msgLength > 0)
|
||||
{
|
||||
byte channel = buffer[0];
|
||||
switch (channel)
|
||||
{
|
||||
case (byte)KcpChannel.Reliable:
|
||||
{
|
||||
// input into kcp, but skip channel byte
|
||||
int input = kcp.Input(buffer, 1, msgLength - 1);
|
||||
if (input != 0)
|
||||
{
|
||||
Log.Warning($"Input failed with error={input} for buffer with length={msgLength - 1}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
case (byte)KcpChannel.Unreliable:
|
||||
{
|
||||
// ideally we would queue all unreliable messages and
|
||||
// then process them in ReceiveNext() together with the
|
||||
// reliable messages, but:
|
||||
// -> queues/allocations/pools are slow and complex.
|
||||
// -> DOTSNET 10k is actually slower if we use pooled
|
||||
// unreliable messages for transform messages.
|
||||
//
|
||||
// DOTSNET 10k benchmark:
|
||||
// reliable-only: 170 FPS
|
||||
// unreliable queued: 130-150 FPS
|
||||
// unreliable direct: 183 FPS(!)
|
||||
//
|
||||
// DOTSNET 50k benchmark:
|
||||
// reliable-only: FAILS (queues keep growing)
|
||||
// unreliable direct: 18-22 FPS(!)
|
||||
//
|
||||
// -> all unreliable messages are DATA messages anyway.
|
||||
// -> let's skip the magic and call OnData directly if
|
||||
// the current state allows it.
|
||||
if (state == KcpState.Authenticated)
|
||||
{
|
||||
// only process messages while not paused for Mirror
|
||||
// scene switching etc.
|
||||
// -> if an unreliable message comes in while
|
||||
// paused, simply drop it. it's unreliable!
|
||||
if (!paused)
|
||||
{
|
||||
ArraySegment<byte> message = new ArraySegment<byte>(buffer, 1, msgLength - 1);
|
||||
OnData?.Invoke(message);
|
||||
}
|
||||
|
||||
// set last receive time to avoid timeout.
|
||||
// -> we do this in ANY case even if not enabled.
|
||||
// a message is a message.
|
||||
// -> we set last receive time for both reliable and
|
||||
// unreliable messages. both count.
|
||||
// otherwise a connection might time out even
|
||||
// though unreliable were received, but no
|
||||
// reliable was received.
|
||||
lastReceiveTime = (uint)refTime.ElapsedMilliseconds;
|
||||
}
|
||||
else
|
||||
{
|
||||
// should never
|
||||
Log.Warning($"KCP: received unreliable message in state {state}. Disconnecting the connection.");
|
||||
Disconnect();
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
// not a valid channel. random data or attacks.
|
||||
Log.Info($"Disconnecting connection because of invalid channel header: {channel}");
|
||||
Disconnect();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// raw send puts the data into the socket
|
||||
protected abstract void RawSend(byte[] data, int length);
|
||||
|
||||
// raw send called by kcp
|
||||
void RawSendReliable(byte[] data, int length)
|
||||
{
|
||||
// copy channel header, data into raw send buffer, then send
|
||||
rawSendBuffer[0] = (byte)KcpChannel.Reliable;
|
||||
Buffer.BlockCopy(data, 0, rawSendBuffer, 1, length);
|
||||
RawSend(rawSendBuffer, length + 1);
|
||||
}
|
||||
|
||||
void SendReliable(KcpHeader header, ArraySegment<byte> content)
|
||||
{
|
||||
// 1 byte header + content needs to fit into send buffer
|
||||
if (1 + content.Count <= kcpSendBuffer.Length) // TODO
|
||||
{
|
||||
// copy header, content (if any) into send buffer
|
||||
kcpSendBuffer[0] = (byte)header;
|
||||
if (content.Count > 0)
|
||||
Buffer.BlockCopy(content.Array, content.Offset, kcpSendBuffer, 1, content.Count);
|
||||
|
||||
// send to kcp for processing
|
||||
int sent = kcp.Send(kcpSendBuffer, 0, 1 + content.Count);
|
||||
if (sent < 0)
|
||||
{
|
||||
Log.Warning($"Send failed with error={sent} for content with length={content.Count}");
|
||||
}
|
||||
}
|
||||
// otherwise content is larger than MaxMessageSize. let user know!
|
||||
else Log.Error($"Failed to send reliable message of size {content.Count} because it's larger than ReliableMaxMessageSize={ReliableMaxMessageSize}");
|
||||
}
|
||||
|
||||
void SendUnreliable(ArraySegment<byte> message)
|
||||
{
|
||||
// message size needs to be <= unreliable max size
|
||||
if (message.Count <= UnreliableMaxMessageSize)
|
||||
{
|
||||
// copy channel header, data into raw send buffer, then send
|
||||
rawSendBuffer[0] = (byte)KcpChannel.Unreliable;
|
||||
Buffer.BlockCopy(message.Array, 0, rawSendBuffer, 1, message.Count);
|
||||
RawSend(rawSendBuffer, message.Count + 1);
|
||||
}
|
||||
// otherwise content is larger than MaxMessageSize. let user know!
|
||||
else Log.Error($"Failed to send unreliable message of size {message.Count} because it's larger than UnreliableMaxMessageSize={UnreliableMaxMessageSize}");
|
||||
}
|
||||
|
||||
// server & client need to send handshake at different times, so we need
|
||||
// to expose the function.
|
||||
// * client should send it immediately.
|
||||
// * server should send it as reply to client's handshake, not before
|
||||
// (server should not reply to random internet messages with handshake)
|
||||
// => handshake info needs to be delivered, so it goes over reliable.
|
||||
public void SendHandshake()
|
||||
{
|
||||
Log.Info("KcpConnection: sending Handshake to other end!");
|
||||
SendReliable(KcpHeader.Handshake, default);
|
||||
}
|
||||
|
||||
public void SendData(ArraySegment<byte> data, KcpChannel channel)
|
||||
{
|
||||
// sending empty segments is not allowed.
|
||||
// nobody should ever try to send empty data.
|
||||
// it means that something went wrong, e.g. in Mirror/DOTSNET.
|
||||
// let's make it obvious so it's easy to debug.
|
||||
if (data.Count == 0)
|
||||
{
|
||||
Log.Warning("KcpConnection: tried sending empty message. This should never happen. Disconnecting.");
|
||||
Disconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
switch (channel)
|
||||
{
|
||||
case KcpChannel.Reliable:
|
||||
SendReliable(KcpHeader.Data, data);
|
||||
break;
|
||||
case KcpChannel.Unreliable:
|
||||
SendUnreliable(data);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// ping goes through kcp to keep it from timing out, so it goes over the
|
||||
// reliable channel.
|
||||
void SendPing() => SendReliable(KcpHeader.Ping, default);
|
||||
|
||||
// disconnect info needs to be delivered, so it goes over reliable
|
||||
void SendDisconnect() => SendReliable(KcpHeader.Disconnect, default);
|
||||
|
||||
protected virtual void Dispose() {}
|
||||
|
||||
// disconnect this connection
|
||||
public void Disconnect()
|
||||
{
|
||||
// only if not disconnected yet
|
||||
if (state == KcpState.Disconnected)
|
||||
return;
|
||||
|
||||
// send a disconnect message
|
||||
if (socket.Connected)
|
||||
{
|
||||
try
|
||||
{
|
||||
SendDisconnect();
|
||||
kcp.Flush();
|
||||
}
|
||||
catch (SocketException)
|
||||
{
|
||||
// this is ok, the connection was already closed
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
// this is normal when we stop the server
|
||||
// the socket is stopped so we can't send anything anymore
|
||||
// to the clients
|
||||
|
||||
// the clients will eventually timeout and realize they
|
||||
// were disconnected
|
||||
}
|
||||
}
|
||||
|
||||
// set as Disconnected, call event
|
||||
Log.Info("KCP Connection: Disconnected.");
|
||||
state = KcpState.Disconnected;
|
||||
OnDisconnected?.Invoke();
|
||||
}
|
||||
|
||||
// get remote endpoint
|
||||
public EndPoint GetRemoteEndPoint() => remoteEndpoint;
|
||||
|
||||
// pause/unpause to safely support mirror scene handling and to
|
||||
// immediately pause the receive while loop if needed.
|
||||
public void Pause() => paused = true;
|
||||
public void Unpause()
|
||||
{
|
||||
// unpause
|
||||
paused = false;
|
||||
|
||||
// reset the timeout.
|
||||
// we have likely been paused for > timeout seconds, but that
|
||||
// doesn't mean we should disconnect. for example, Mirror pauses
|
||||
// kcp during scene changes which could easily take > 10s timeout:
|
||||
// see also: https://github.com/vis2k/kcp2k/issues/8
|
||||
// => Unpause completely resets the timeout instead of restoring the
|
||||
// time difference when we started pausing. it's more simple and
|
||||
// it's a good idea to start counting from 0 after we unpaused!
|
||||
lastReceiveTime = (uint)refTime.ElapsedMilliseconds;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
namespace kcp2k
|
||||
{
|
||||
// header for messages processed by kcp.
|
||||
// this is NOT for the raw receive messages(!) because handshake/disconnect
|
||||
// need to be sent reliably. it's not enough to have those in rawreceive
|
||||
// because those messages might get lost without being resent!
|
||||
public enum KcpHeader : byte
|
||||
{
|
||||
// don't react on 0x00. might help to filter out random noise.
|
||||
Handshake = 0x01,
|
||||
// ping goes over reliable & KcpHeader for now. could go over reliable
|
||||
// too. there is no real difference except that this is easier because
|
||||
// we already have a KcpHeader for reliable messages.
|
||||
// ping is only used to keep it alive, so latency doesn't matter.
|
||||
Ping = 0x02,
|
||||
Data = 0x03,
|
||||
Disconnect = 0x04
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,297 @@
|
|||
// kcp server logic abstracted into a class.
|
||||
// for use in Mirror, DOTSNET, testing, etc.
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
public class KcpServer
|
||||
{
|
||||
// events
|
||||
public Action<int> OnConnected;
|
||||
public Action<int, ArraySegment<byte>> OnData;
|
||||
public Action<int> OnDisconnected;
|
||||
|
||||
// configuration
|
||||
// NoDelay is recommended to reduce latency. This also scales better
|
||||
// without buffers getting full.
|
||||
public bool NoDelay;
|
||||
// KCP internal update interval. 100ms is KCP default, but a lower
|
||||
// interval is recommended to minimize latency and to scale to more
|
||||
// networked entities.
|
||||
public uint Interval;
|
||||
// KCP fastresend parameter. Faster resend for the cost of higher
|
||||
// bandwidth.
|
||||
public int FastResend;
|
||||
// KCP 'NoCongestionWindow' is false by default. here we negate it for
|
||||
// ease of use. This can be disabled for high scale games if connections
|
||||
// choke regularly.
|
||||
public bool CongestionWindow;
|
||||
// KCP window size can be modified to support higher loads.
|
||||
// for example, Mirror Benchmark requires:
|
||||
// 128, 128 for 4k monsters
|
||||
// 512, 512 for 10k monsters
|
||||
// 8192, 8192 for 20k monsters
|
||||
public uint SendWindowSize;
|
||||
public uint ReceiveWindowSize;
|
||||
|
||||
// state
|
||||
Socket socket;
|
||||
#if UNITY_SWITCH
|
||||
// switch does not support ipv6
|
||||
EndPoint newClientEP = new IPEndPoint(IPAddress.Any, 0);
|
||||
#else
|
||||
EndPoint newClientEP = new IPEndPoint(IPAddress.IPv6Any, 0);
|
||||
#endif
|
||||
// IMPORTANT: raw receive buffer always needs to be of 'MTU' size, even
|
||||
// if MaxMessageSize is larger. kcp always sends in MTU
|
||||
// segments and having a buffer smaller than MTU would
|
||||
// silently drop excess data.
|
||||
// => we need the mtu to fit channel + message!
|
||||
readonly byte[] rawReceiveBuffer = new byte[Kcp.MTU_DEF];
|
||||
|
||||
// connections <connectionId, connection> where connectionId is EndPoint.GetHashCode
|
||||
public Dictionary<int, KcpServerConnection> connections = new Dictionary<int, KcpServerConnection>();
|
||||
|
||||
public KcpServer(Action<int> OnConnected,
|
||||
Action<int, ArraySegment<byte>> OnData,
|
||||
Action<int> OnDisconnected,
|
||||
bool NoDelay,
|
||||
uint Interval,
|
||||
int FastResend = 0,
|
||||
bool CongestionWindow = true,
|
||||
uint SendWindowSize = Kcp.WND_SND,
|
||||
uint ReceiveWindowSize = Kcp.WND_RCV)
|
||||
{
|
||||
this.OnConnected = OnConnected;
|
||||
this.OnData = OnData;
|
||||
this.OnDisconnected = OnDisconnected;
|
||||
this.NoDelay = NoDelay;
|
||||
this.Interval = Interval;
|
||||
this.FastResend = FastResend;
|
||||
this.CongestionWindow = CongestionWindow;
|
||||
this.SendWindowSize = SendWindowSize;
|
||||
this.ReceiveWindowSize = ReceiveWindowSize;
|
||||
}
|
||||
|
||||
public bool IsActive() => socket != null;
|
||||
|
||||
public void Start(ushort port)
|
||||
{
|
||||
// only start once
|
||||
if (socket != null)
|
||||
{
|
||||
Log.Warning("KCP: server already started!");
|
||||
}
|
||||
|
||||
// listen
|
||||
#if UNITY_SWITCH
|
||||
// Switch does not support ipv6
|
||||
socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
|
||||
socket.Bind(new IPEndPoint(IPAddress.Any, port));
|
||||
#else
|
||||
socket = new Socket(AddressFamily.InterNetworkV6, SocketType.Dgram, ProtocolType.Udp);
|
||||
socket.DualMode = true;
|
||||
socket.Bind(new IPEndPoint(IPAddress.IPv6Any, port));
|
||||
#endif
|
||||
}
|
||||
|
||||
public void Send(int connectionId, ArraySegment<byte> segment, KcpChannel channel)
|
||||
{
|
||||
if (connections.TryGetValue(connectionId, out KcpServerConnection connection))
|
||||
{
|
||||
connection.SendData(segment, channel);
|
||||
}
|
||||
}
|
||||
|
||||
public void Disconnect(int connectionId)
|
||||
{
|
||||
if (connections.TryGetValue(connectionId, out KcpServerConnection connection))
|
||||
{
|
||||
connection.Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
public string GetClientAddress(int connectionId)
|
||||
{
|
||||
if (connections.TryGetValue(connectionId, out KcpServerConnection connection))
|
||||
{
|
||||
return (connection.GetRemoteEndPoint() as IPEndPoint).Address.ToString();
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
// process incoming messages. should be called before updating the world.
|
||||
HashSet<int> connectionsToRemove = new HashSet<int>();
|
||||
public void TickIncoming()
|
||||
{
|
||||
while (socket != null && socket.Poll(0, SelectMode.SelectRead))
|
||||
{
|
||||
try
|
||||
{
|
||||
int msgLength = socket.ReceiveFrom(rawReceiveBuffer, 0, rawReceiveBuffer.Length, SocketFlags.None, ref newClientEP);
|
||||
//Log.Info($"KCP: server raw recv {msgLength} bytes = {BitConverter.ToString(buffer, 0, msgLength)}");
|
||||
|
||||
// calculate connectionId from endpoint
|
||||
int connectionId = newClientEP.GetHashCode();
|
||||
|
||||
// IMPORTANT: detect if buffer was too small for the received
|
||||
// msgLength. otherwise the excess data would be
|
||||
// silently lost.
|
||||
// (see ReceiveFrom documentation)
|
||||
if (msgLength <= rawReceiveBuffer.Length)
|
||||
{
|
||||
// is this a new connection?
|
||||
if (!connections.TryGetValue(connectionId, out KcpServerConnection connection))
|
||||
{
|
||||
// create a new KcpConnection
|
||||
connection = new KcpServerConnection(socket, newClientEP, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize);
|
||||
|
||||
// DO NOT add to connections yet. only if the first message
|
||||
// is actually the kcp handshake. otherwise it's either:
|
||||
// * random data from the internet
|
||||
// * or from a client connection that we just disconnected
|
||||
// but that hasn't realized it yet, still sending data
|
||||
// from last session that we should absolutely ignore.
|
||||
//
|
||||
//
|
||||
// TODO this allocates a new KcpConnection for each new
|
||||
// internet connection. not ideal, but C# UDP Receive
|
||||
// already allocated anyway.
|
||||
//
|
||||
// expecting a MAGIC byte[] would work, but sending the raw
|
||||
// UDP message without kcp's reliability will have low
|
||||
// probability of being received.
|
||||
//
|
||||
// for now, this is fine.
|
||||
|
||||
// setup authenticated event that also adds to connections
|
||||
connection.OnAuthenticated = () =>
|
||||
{
|
||||
// only send handshake to client AFTER we received his
|
||||
// handshake in OnAuthenticated.
|
||||
// we don't want to reply to random internet messages
|
||||
// with handshakes each time.
|
||||
connection.SendHandshake();
|
||||
|
||||
// add to connections dict after being authenticated.
|
||||
connections.Add(connectionId, connection);
|
||||
Log.Info($"KCP: server added connection({connectionId}): {newClientEP}");
|
||||
|
||||
// setup Data + Disconnected events only AFTER the
|
||||
// handshake. we don't want to fire OnServerDisconnected
|
||||
// every time we receive invalid random data from the
|
||||
// internet.
|
||||
|
||||
// setup data event
|
||||
connection.OnData = (message) =>
|
||||
{
|
||||
// call mirror event
|
||||
//Log.Info($"KCP: OnServerDataReceived({connectionId}, {BitConverter.ToString(message.Array, message.Offset, message.Count)})");
|
||||
OnData.Invoke(connectionId, message);
|
||||
};
|
||||
|
||||
// setup disconnected event
|
||||
connection.OnDisconnected = () =>
|
||||
{
|
||||
// flag for removal
|
||||
// (can't remove directly because connection is updated
|
||||
// and event is called while iterating all connections)
|
||||
connectionsToRemove.Add(connectionId);
|
||||
|
||||
// call mirror event
|
||||
Log.Info($"KCP: OnServerDisconnected({connectionId})");
|
||||
OnDisconnected.Invoke(connectionId);
|
||||
};
|
||||
|
||||
// finally, call mirror OnConnected event
|
||||
Log.Info($"KCP: OnServerConnected({connectionId})");
|
||||
OnConnected.Invoke(connectionId);
|
||||
};
|
||||
|
||||
// now input the message & process received ones
|
||||
// connected event was set up.
|
||||
// tick will process the first message and adds the
|
||||
// connection if it was the handshake.
|
||||
connection.RawInput(rawReceiveBuffer, msgLength);
|
||||
connection.TickIncoming();
|
||||
|
||||
// again, do not add to connections.
|
||||
// if the first message wasn't the kcp handshake then
|
||||
// connection will simply be garbage collected.
|
||||
}
|
||||
// existing connection: simply input the message into kcp
|
||||
else
|
||||
{
|
||||
connection.RawInput(rawReceiveBuffer, msgLength);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Log.Error($"KCP Server: message of size {msgLength} does not fit into buffer of size {rawReceiveBuffer.Length}. The excess was silently dropped. Disconnecting connectionId={connectionId}.");
|
||||
Disconnect(connectionId);
|
||||
}
|
||||
}
|
||||
// this is fine, the socket might have been closed in the other end
|
||||
catch (SocketException) {}
|
||||
}
|
||||
|
||||
// process inputs for all server connections
|
||||
// (even if we didn't receive anything. need to tick ping etc.)
|
||||
foreach (KcpServerConnection connection in connections.Values)
|
||||
{
|
||||
connection.TickIncoming();
|
||||
}
|
||||
|
||||
// remove disconnected connections
|
||||
// (can't do it in connection.OnDisconnected because Tick is called
|
||||
// while iterating connections)
|
||||
foreach (int connectionId in connectionsToRemove)
|
||||
{
|
||||
connections.Remove(connectionId);
|
||||
}
|
||||
connectionsToRemove.Clear();
|
||||
}
|
||||
|
||||
// process outgoing messages. should be called after updating the world.
|
||||
public void TickOutgoing()
|
||||
{
|
||||
// flush all server connections
|
||||
foreach (KcpServerConnection connection in connections.Values)
|
||||
{
|
||||
connection.TickOutgoing();
|
||||
}
|
||||
}
|
||||
|
||||
// process incoming and outgoing for convenience.
|
||||
// => ideally call ProcessIncoming() before updating the world and
|
||||
// ProcessOutgoing() after updating the world for minimum latency
|
||||
public void Tick()
|
||||
{
|
||||
TickIncoming();
|
||||
TickOutgoing();
|
||||
}
|
||||
|
||||
public void Stop()
|
||||
{
|
||||
socket?.Close();
|
||||
socket = null;
|
||||
}
|
||||
|
||||
// pause/unpause to safely support mirror scene handling and to
|
||||
// immediately pause the receive while loop if needed.
|
||||
public void Pause()
|
||||
{
|
||||
foreach (KcpServerConnection connection in connections.Values)
|
||||
connection.Pause();
|
||||
}
|
||||
|
||||
public void Unpause()
|
||||
{
|
||||
foreach (KcpServerConnection connection in connections.Values)
|
||||
connection.Unpause();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
public class KcpServerConnection : KcpConnection
|
||||
{
|
||||
public KcpServerConnection(Socket socket, EndPoint remoteEndpoint, bool noDelay, uint interval = Kcp.INTERVAL, int fastResend = 0, bool congestionWindow = true, uint sendWindowSize = Kcp.WND_SND, uint receiveWindowSize = Kcp.WND_RCV)
|
||||
{
|
||||
this.socket = socket;
|
||||
this.remoteEndpoint = remoteEndpoint;
|
||||
SetupKcp(noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize);
|
||||
}
|
||||
|
||||
protected override void RawSend(byte[] data, int length)
|
||||
{
|
||||
socket.SendTo(data, 0, length, SocketFlags.None, remoteEndpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
// A simple logger class that uses Console.WriteLine by default.
|
||||
// Can also do Logger.LogMethod = Debug.Log for Unity etc.
|
||||
// (this way we don't have to depend on UnityEngine)
|
||||
using System;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
public static class Log
|
||||
{
|
||||
public static Action<string> Info = Console.WriteLine;
|
||||
public static Action<string> Warning = Console.WriteLine;
|
||||
public static Action<string> Error = Console.Error.WriteLine;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
using System.Runtime.CompilerServices;
|
||||
|
||||
[assembly: InternalsVisibleTo("kcp2k.Tests")]
|
||||
1032
ServerProject-DONT-IMPORT-INTO-UNITY/MultiCompiled/KCP/kcp/Kcp.cs
Normal file
1032
ServerProject-DONT-IMPORT-INTO-UNITY/MultiCompiled/KCP/kcp/Kcp.cs
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -0,0 +1,81 @@
|
|||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
// KCP Segment Definition
|
||||
internal class Segment
|
||||
{
|
||||
internal uint conv; // conversation
|
||||
internal uint cmd; // command, e.g. Kcp.CMD_ACK etc.
|
||||
internal uint frg; // fragment
|
||||
internal uint wnd; // window size that the receive can currently receive
|
||||
internal uint ts; // timestamp
|
||||
internal uint sn; // serial number
|
||||
internal uint una;
|
||||
internal uint resendts; // resend timestamp
|
||||
internal int rto;
|
||||
internal uint fastack;
|
||||
internal uint xmit;
|
||||
// we need a auto scaling byte[] with a WriteBytes function.
|
||||
// MemoryStream does that perfectly, no need to reinvent the wheel.
|
||||
// note: no need to pool it, because Segment is already pooled.
|
||||
internal MemoryStream data = new MemoryStream();
|
||||
|
||||
// pool ////////////////////////////////////////////////////////////////
|
||||
internal static readonly Stack<Segment> Pool = new Stack<Segment>(32);
|
||||
|
||||
public static Segment Take()
|
||||
{
|
||||
if (Pool.Count > 0)
|
||||
{
|
||||
Segment seg = Pool.Pop();
|
||||
return seg;
|
||||
}
|
||||
return new Segment();
|
||||
}
|
||||
|
||||
public static void Return(Segment seg)
|
||||
{
|
||||
seg.Reset();
|
||||
Pool.Push(seg);
|
||||
}
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// ikcp_encode_seg
|
||||
// encode a segment into buffer
|
||||
internal int Encode(byte[] ptr, int offset)
|
||||
{
|
||||
int offset_ = offset;
|
||||
offset += Utils.Encode32U(ptr, offset, conv);
|
||||
offset += Utils.Encode8u(ptr, offset, (byte)cmd);
|
||||
offset += Utils.Encode8u(ptr, offset, (byte)frg);
|
||||
offset += Utils.Encode16U(ptr, offset, (ushort)wnd);
|
||||
offset += Utils.Encode32U(ptr, offset, ts);
|
||||
offset += Utils.Encode32U(ptr, offset, sn);
|
||||
offset += Utils.Encode32U(ptr, offset, una);
|
||||
offset += Utils.Encode32U(ptr, offset, (uint)data.Position);
|
||||
|
||||
return offset - offset_;
|
||||
}
|
||||
|
||||
// reset to return a fresh segment to the pool
|
||||
internal void Reset()
|
||||
{
|
||||
conv = 0;
|
||||
cmd = 0;
|
||||
frg = 0;
|
||||
wnd = 0;
|
||||
ts = 0;
|
||||
sn = 0;
|
||||
una = 0;
|
||||
rto = 0;
|
||||
xmit = 0;
|
||||
resendts = 0;
|
||||
fastack = 0;
|
||||
|
||||
// keep buffer for next pool usage, but reset length (= bytes written)
|
||||
data.SetLength(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
using System.Runtime.CompilerServices;
|
||||
|
||||
namespace kcp2k
|
||||
{
|
||||
public static partial class Utils
|
||||
{
|
||||
// Clamp so we don't have to depend on UnityEngine
|
||||
public static int Clamp(int value, int min, int max)
|
||||
{
|
||||
if (value < min) return min;
|
||||
if (value > max) return max;
|
||||
return value;
|
||||
}
|
||||
|
||||
// encode 8 bits unsigned int
|
||||
public static int Encode8u(byte[] p, int offset, byte c)
|
||||
{
|
||||
p[0 + offset] = c;
|
||||
return 1;
|
||||
}
|
||||
|
||||
// decode 8 bits unsigned int
|
||||
public static int Decode8u(byte[] p, int offset, ref byte c)
|
||||
{
|
||||
c = p[0 + offset];
|
||||
return 1;
|
||||
}
|
||||
|
||||
// encode 16 bits unsigned int (lsb)
|
||||
public static int Encode16U(byte[] p, int offset, ushort w)
|
||||
{
|
||||
p[0 + offset] = (byte)(w >> 0);
|
||||
p[1 + offset] = (byte)(w >> 8);
|
||||
return 2;
|
||||
}
|
||||
|
||||
// decode 16 bits unsigned int (lsb)
|
||||
public static int Decode16U(byte[] p, int offset, ref ushort c)
|
||||
{
|
||||
ushort result = 0;
|
||||
result |= p[0 + offset];
|
||||
result |= (ushort)(p[1 + offset] << 8);
|
||||
c = result;
|
||||
return 2;
|
||||
}
|
||||
|
||||
// encode 32 bits unsigned int (lsb)
|
||||
public static int Encode32U(byte[] p, int offset, uint l)
|
||||
{
|
||||
p[0 + offset] = (byte)(l >> 0);
|
||||
p[1 + offset] = (byte)(l >> 8);
|
||||
p[2 + offset] = (byte)(l >> 16);
|
||||
p[3 + offset] = (byte)(l >> 24);
|
||||
return 4;
|
||||
}
|
||||
|
||||
// decode 32 bits unsigned int (lsb)
|
||||
public static int Decode32U(byte[] p, int offset, ref uint c)
|
||||
{
|
||||
uint result = 0;
|
||||
result |= p[0 + offset];
|
||||
result |= (uint)(p[1 + offset] << 8);
|
||||
result |= (uint)(p[2 + offset] << 16);
|
||||
result |= (uint)(p[3 + offset] << 24);
|
||||
c = result;
|
||||
return 4;
|
||||
}
|
||||
|
||||
// timediff was a macro in original Kcp. let's inline it if possible.
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int TimeDiff(uint later, uint earlier)
|
||||
{
|
||||
return (int)(later - earlier);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue