Cowboy.WebSockets 是一个托管在 GitHub 上的基于 .NET/C# 实现的开源 WebSocket 网络库,其完整的实现了 RFC 6455 (The WebSocket Protocol) 协议标准,并部分实现了 RFC 7692 (Compression Extensions for WebSocket) 协议标准。
WebSocket 可理解为建立在 TCP 连接通道上的更进一步的握手,并确定了消息封装格式。
通过定义控制帧 (Control Frame) 和数据帧 (Data Frame) 来控制通道内的通信和数据传输,下图用使用 ABNF 格式描述了帧头部的格式。
Cowboy.WebSockets 中对于 WebSocket 的 Client/Server 分别做了实现,分别对应代码中的:
Cowboy.WebSockets 的内部实现是基于 Cowboy.Sockets 中的 TAP 模式的 AsyncTcpSocketServer 和 AsyncTcpSocketClient 。关于 Cowboy.Sockets 可以参考文章《C#高性能TCP服务的多种实现方式》。
可通过 NuGet 查找 Cowboy 来获取 nuget 包。
WebSocket 服务端应用实现 AsyncWebSocketServerModule 抽象类,其中 ModulePath 对应着 "ws://host:port/path" 中的 path 部分。可以实现多个 Module,将多个 Module 注入到 AsyncWebSocketServerModuleCatalog 中,或者采用反射机制等自动发现 Module。
public class TestWebSocketModule : AsyncWebSocketServerModule { public TestWebSocketModule() : ) { } Task OnSessionStarted(AsyncWebSocketSession session) { Console.WriteLine(, session.RemoteEndPoint)); await Task.CompletedTask; } Task OnSessionTextReceived(AsyncWebSocketSession session, string text) { Console.Write(, session.RemoteEndPoint)); Console.WriteLine(, text)); await session.SendTextAsync(text); } Task OnSessionBinaryReceived(AsyncWebSocketSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(, session.RemoteEndPoint)); Console.WriteLine(, text)); await session.SendBinaryAsync(Encoding.UTF8.GetBytes(text)); } Task OnSessionClosed(AsyncWebSocketSession session) { Console.WriteLine(, session.RemoteEndPoint)); await Task.CompletedTask; } }
实例化 AsyncWebSocketServer,并将 AsyncWebSocketServerModuleCatalog 实例注入,即可启动 WebSocket 的服务端监听。
class Program { static AsyncWebSocketServer _server; static void Main(string[] args) { NLogLogger.Use(); try { var catalog = new AsyncWebSocketServerModuleCatalog(); catalog.RegisterModule(new TestWebSocketModule()); var config = new AsyncWebSocketServerConfiguration(); //config.SslEnabled = true; //config.SslServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.pfx", "Cowboy"); //config.SslPolicyErrorsBypassed = true; _server = new AsyncWebSocketServer(22222, catalog, config); _server.Listen(); Console.WriteLine(, _server.ListenedEndPoint); Console.WriteLine(); while (true) { try { string text = Console.ReadLine(); ) break; Task.Run(async () => { //await _server.BroadcastText(text); _server.BroadcastBinaryAsync(Encoding.UTF8.GetBytes(text)); Console.WriteLine(, _server.ListenedEndPoint, text); }); } catch (Exception ex) { Console.WriteLine(ex.Message); } } _server.Shutdown(); Console.WriteLine(, _server.ListenedEndPoint); } catch (Exception ex) { Logger.Get<Program>().Error(ex.Message, ex); } Console.ReadKey(); } }
WebSocket 客户端应用客户端侧在实例化 AsyncWebSocketClient 时有两种方式:
public interface IAsyncWebSocketClientMessageDispatcher { Task OnServerConnected(AsyncWebSocketClient client); Task OnServerTextReceived(AsyncWebSocketClient client, string text); Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count); Task OnServerDisconnected(AsyncWebSocketClient client); Task OnServerFragmentationStreamOpened(AsyncWebSocketClient client, byte[] data, int offset, int count); Task OnServerFragmentationStreamContinued(AsyncWebSocketClient client, byte[] data, int offset, int count); Task OnServerFragmentationStreamClosed(AsyncWebSocketClient client, byte[] data, int offset, int count); }
下面的 DEMO 采用了方式二。