MVC核心,Web套接字和线程
我正在研究一个解决方案,该解决方案使用Web套接字协议在服务器(MVC Core Web应用程序)发生某些事件时通知客户端(Web浏览器)。 我使用Microsoft.AspNetCore.WebSockets nuget。
这是我的客户端代码:
$(function () {
var socket = new WebSocket("ws://localhost:61019/data/openSocket");
socket.onopen = function () {
$(".socket-status").css("color", "green");
}
socket.onmessage = function (message) {
$("body").append(document.createTextNode(message.data));
}
socket.onclose = function () {
$(".socket-status").css("color", "red");
}
});
加载此视图时,套接字请求会立即发送到MVC Core应用程序。 这是控制器操作:
[Route("data")]
public class DataController : Controller
{
[Route("openSocket")]
[HttpGet]
public ActionResult OpenSocket()
{
if (HttpContext.WebSockets.IsWebSocketRequest)
{
WebSocket socket = HttpContext.WebSockets.AcceptWebSocketAsync().Result;
if (socket != null && socket.State == WebSocketState.Open)
{
while (!HttpContext.RequestAborted.IsCancellationRequested)
{
var response = string.Format("Hello! Time {0}", System.DateTime.Now.ToString());
var bytes = System.Text.Encoding.UTF8.GetBytes(response);
Task.Run(() => socket.SendAsync(new System.ArraySegment<byte>(bytes),
WebSocketMessageType.Text, true, CancellationToken.None));
Thread.Sleep(3000);
}
}
}
return new StatusCodeResult(101);
}
}
这段代码工作得很好。 这里的WebSocket专门用于发送,并没有收到任何东西。 然而,问题是while循环一直持有DataController线程,直到检测到取消请求。
这里的Web套接字绑定到HttpContext对象。 一旦Web请求的HttpContext被销毁,套接字连接立即关闭。
问题1:有什么方法可以将套接字保存在控制器线程之外? 我尝试将它放入一个单身,它位于主应用程序线程上运行的MVC Core Startup类中。 有什么办法保持套接字打开或者在主应用程序线程中重新建立连接,而不是用一个while循环持续保持控制器线程? 即使认为可以阻塞控制器线程以使套接字连接保持打开状态,我也不会想到在OpenSocket的while循环中放置任何好的代码。 你认为在控制器中有一个手动重置事件,并等待它在OpenSocket动作中的while循环内部设置?
问题2:如果无法在MVC中分离HttpContext和WebSocket对象,可以使用其他替代技术或开发模式来实现套接字连接重用? 如果有人认为SignalR或类似的库有一些代码允许独立于HttpContext的套接字,请分享一些示例代码。 如果有人认为在这种特殊情况下MVC有更好的选择,请提供一个例子,如果MVC没有处理独立套接字通信的能力,我不介意转换到纯ASP.NET或Web API。
问题3:需要保持套接字连接处于活动状态或能够重新连接,直到用户明确的超时或取消请求。 这个想法是一些独立的事件发生在服务器上,触发已建立的套接字发送数据。 如果您认为网络套接字以外的其他技术对于此场景(如HTML / 2或流媒体)更有用,请您介绍一下您将使用的模式和框架?
PS可能的解决方案是每秒发送一次AJAX请求,询问服务器上是否有新的数据。 这是最后的手段。
经过漫长的研究,我最终选择了定制的中间件解决方案。 这是我的中间件类:
public class SocketMiddleware
{
private static ConcurrentDictionary<string, SocketMiddleware> _activeConnections = new ConcurrentDictionary<string, SocketMiddleware>();
private string _packet;
private ManualResetEvent _send = new ManualResetEvent(false);
private ManualResetEvent _exit = new ManualResetEvent(false);
private readonly RequestDelegate _next;
public SocketMiddleware(RequestDelegate next)
{
_next = next;
}
public void Send(string data)
{
_packet = data;
_send.Set();
}
public async Task Invoke(HttpContext context)
{
if (context.WebSockets.IsWebSocketRequest)
{
string connectionName = context.Request.Query["connectionName"]);
if (!_activeConnections.Any(ac => ac.Key == connectionName))
{
WebSocket socket = await context.WebSockets.AcceptWebSocketAsync();
if (socket == null || socket.State != WebSocketState.Open)
{
await _next.Invoke(context);
return;
}
Thread sender = new Thread(() => StartSending(socket));
sender.Start();
if (!_activeConnections.TryAdd(connectionName, this))
{
_exit.Set();
await _next.Invoke(context);
return;
}
while (true)
{
WebSocketReceiveResult result = socket.ReceiveAsync(new ArraySegment<byte>(new byte[1]), CancellationToken.None).Result;
if (result.CloseStatus.HasValue)
{
_exit.Set();
break;
}
}
SocketHandler dummy;
_activeConnections.TryRemove(key, out dummy);
}
}
await _next.Invoke(context);
string data = context.Items["Data"] as string;
if (!string.IsNullOrEmpty(data))
{
string name = context.Items["ConnectionName"] as string;
SocketMiddleware connection = _activeConnections.Where(ac => ac.Key == name)?.Single().Value;
if (connection != null)
{
connection.Send(data);
}
}
}
private void StartSending(WebSocket socket)
{
WaitHandle[] events = new WaitHandle[] { _send, _exit };
while (true)
{
if (WaitHandle.WaitAny(events) == 1)
{
break;
}
if (!string.IsNullOrEmpty(_packet))
{
SendPacket(socket, _packet);
}
_send.Reset();
}
}
private void SendPacket(WebSocket socket, string packet)
{
byte[] buffer = Encoding.UTF8.GetBytes(packet);
ArraySegment<byte> segment = new ArraySegment<byte>(buffer);
Task.Run(() => socket.SendAsync(segment, WebSocketMessageType.Text, true, CancellationToken.None));
}
}
这个中间件将在每个请求上运行。 当调用Invoke时,它会检查它是否是Web套接字请求。 如果是,中间件会检查这种连接是否已经打开,如果不是,则握手被接受,中间件将其添加到连接字典中。 字典必须是静态的,以便在应用程序生命周期中仅创建一次。
现在,如果我们在这里停止并向上移动管道,HttpContext最终将被销毁,并且由于套接字没有正确封装,它也将被关闭。 所以我们必须保持中间件线程的运行。 这是通过询问socket来接收一些数据完成的。
你可能会问,为什么我们需要收到任何东西,如果要求只是发送? 答案是它是可靠地检测客户端断开连接的唯一方法。 HttpContext.RequestAborted.IsCancellationRequested只有在while循环中不断发送时才有效。 如果您需要等待WaitHandle上的某个服务器事件,则取消标志永远不会为true。 我试图等待HttpContext.RequestAborted.WaitHandle作为我的退出事件,但它从未设置。 所以我们要求socket接收一些东西,如果某些东西将CloseStatus.HasValue设置为true,我们知道客户端已断开连接。 如果我们收到其他内容(客户端代码不安全),我们将忽略它并重新开始接收。
发送是在一个单独的线程中完成的。 原因是一样的,如果我们在主要的中间件线程上等待,就不可能检测到断开连接。 为了通知发送者线程客户端断开连接,我们使用_exit同步变量。 请记住,由于SocketMiddleware实例保存在一个静态容器中,因此可以让私人成员在这里。
现在,我们如何实际发送这些设置? 假设服务器上发生事件,并且有些数据可用。 为了简单起见,我们假设这个数据在正常http请求内到达某些控制器动作。 SocketMiddleware将针对每个请求运行,但由于它不是Web套接字请求,因此_next.Invoke(context)被调用,并且请求到达控制器动作,其形式可能如下所示:
[Route("ProvideData")]
[HttpGet]
public ActionResult ProvideData(string data, string connectionName)
{
if (!string.IsNullOrEmpty(data) && !string.IsNullOrEmpty(connectionName))
{
HttpContext.Items.Add("ConnectionName", connectionName);
HttpContext.Items.Add("Data", data);
}
return Ok();
}
控制器填充用于在组件之间共享数据的Items集合。 然后,管道再次返回到SocketMiddleware,在那里我们检查context.Items中是否有任何有趣的内容。 如果存在,我们从字典中选择相应的连接,并调用其设置数据字符串并设置_send事件的Send()方法,并允许在发送者线程内单次运行while循环。
瞧,我们有一个在服务器端发送事件的套接字连接。 这个例子很原始,只是为了说明这个概念。 当然,要使用这个中间件,在添加MVC之前,需要在Startup类中添加以下几行:
app.UseWebSockets();
app.UseMiddleware<SocketMiddleware>();
代码非常奇怪,希望当SignalNet for dotnetcore终于出来时,我们可以写出更好的东西。 希望这个例子对某人有用。 欢迎提出意见和建议。
链接地址: http://www.djcxy.com/p/44723.html