电子商务网站开发的视频,网站网页不对称,六年级做的网站的软件,网站开发公司人员配备第 11 章 开发实时应用和服务在本章#xff0c;我们将讨论“实时”的准确含义#xff0c;以及在大部分消费者看来应该属于这一范畴的应用类型接着#xff0c;我们将探讨 WebSocket#xff0c;并分析为什么传统的 WebSocket 与云环境完全不相适应#xff0c;最后我们将构建… 第 11 章 开发实时应用和服务在本章我们将讨论“实时”的准确含义以及在大部分消费者看来应该属于这一范畴的应用类型接着我们将探讨 WebSocket并分析为什么传统的 WebSocket 与云环境完全不相适应最后我们将构建一个实时应用的示例用于展示向一个事件溯源系统添加实时消息的强大功能实时应用的定义我认为实时系统的定义可以稍微宽泛一点只要是事件的接收与处理过程之间只有少许延迟或者完全没有延迟都可以认为是实时系统下面是真正的实时系统中区分出非实时系统的几个特点应用收集输入数据后在生成输出前有明显的等待应用只按照固定间隔或者基于某种按计划或随机触发的外部信号生成输出实时系统有一个真正常见的迹象和特征即当相关方关注的事件发生时它们会收到推送通知而不是由相关方以挂起等待或者间隔查询的方式来检查新状态云环境中的 WebSocketWebSocket 协议WebSocket 协议始于 2008 年它定义了浏览器和服务器之间建立持久的双向 Socket 连接的标准这让服务器向运行于浏览器中的 Web 应用发送数据称为可能期间不需要由 Web 应用执行“轮询”在底层实现中浏览器向服务器请求连接进行升级握手完成后浏览器和服务器将切换为单独的二进制 TCP 连接以实现双向通信部署模式假如所有服务器都运行在亚马逊云的弹性计算服务环境中当虚拟机被托管在云基础设施中时它们就可能随时被搬移、销毁并重建这原本是一件好事旨在让应用近乎不受限制地伸缩不过这也意味着这种“实时” WebSocket 连接可能被切断或者严重延迟并在不知不觉中失去响应此处的解决方案通常是将对 WebSocket 的使用独立出去--把管理 WebSocket 连接和数据传输工作转移到应用的代码之外的位置简单地说相比于在自己的应用中管理 WebSocket我们应该选用一种基于云的消息服务让更专业的人来完成这项工作使用云消息服务我们的应用需要拥有实时通信的能力我们希望微服务能够向客户端推送数据但客户端无法建立到微服务的持续 TCP 连接我们还希望能够使用相同类似的消息机制向后端服务发送消息为让微服务遵循云原生特性、保留可伸缩的能力并在云环境中自由地搬移我们需要挑选一种消息服务把一定的实时通信能力提取到进程之外下面列举一些厂商他们提供的云消息服务有的是独立产品有的则是大型服务套件中的一部分Apigee API 网关与实时消息通信PubNub 实时消息通信与活跃度监控Pusher实时消息通信活跃度监控Kaazing实时消息通信MasheryAPI 网关与实时消息通信Google Google 云消息通信ASP.NET SinglR (Azure 托管的实时消息通信服务)Amazon 简单通知服务无论选择哪种机制我们都应该投入一定的时间让代码与具体的消息服务相隔离从而在更换服务商时不至于产生太大的影响开发位置接近监控服务现在我们要做的就是开发一个每当后端系统检测到接近事件时就能够实时更新的监视器我们可以生成一张地图在上面绘出两个团队成员的位置当系统检测到他们相互接近时就让他们的头像跳动或者生成一个动画这些团队成员的移动设备可能还会在同一时刻收到通知创建接近监控服务我们的示例监控服务将包含一系列不同的组件首先我们需要消费由第 6 章编写的服务生成并放入队列的 ProximityDetectedEvent 事件此后我们要提取事件中的原始信息调用团队服务以获取可供用户读取识别的信息获取这些补充信息后最后要在实时消息系统上发出一条消息GitHub链接https://github.com/microservices-aspnetcore/es-proximitymonitor以下是我们接近监控服务背后的上层协调逻辑using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StatlerWaldorfCorp.ProximityMonitor.Queues;
using StatlerWaldorfCorp.ProximityMonitor.Realtime;
using StatlerWaldorfCorp.ProximityMonitor.TeamService;namespace StatlerWaldorfCorp.ProximityMonitor.Events
{public class ProximityDetectedEventProcessor : IEventProcessor{private ILogger logger;private IRealtimePublisher publisher;private IEventSubscriber subscriber;private PubnubOptions pubnubOptions;public ProximityDetectedEventProcessor(ILoggerProximityDetectedEventProcessor logger,IRealtimePublisher publisher,IEventSubscriber subscriber,ITeamServiceClient teamClient,IOptionsPubnubOptions pubnubOptions){this.logger logger;this.pubnubOptions pubnubOptions.Value;this.publisher publisher;this.subscriber subscriber;logger.LogInformation(Created Proximity Event Processor.);subscriber.ProximityDetectedEventReceived (pde) {Team t teamClient.GetTeam(pde.TeamID);Member sourceMember teamClient.GetMember(pde.TeamID, pde.SourceMemberID);Member targetMember teamClient.GetMember(pde.TeamID, pde.TargetMemberID);ProximityDetectedRealtimeEvent outEvent new ProximityDetectedRealtimeEvent{TargetMemberID pde.TargetMemberID,SourceMemberID pde.SourceMemberID,DetectionTime pde.DetectionTime,SourceMemberLocation pde.SourceMemberLocation,TargetMemberLocation pde.TargetMemberLocation,MemberDistance pde.MemberDistance,TeamID pde.TeamID,TeamName t.Name,SourceMemberName ${sourceMember.FirstName} {sourceMember.LastName},TargetMemberName ${targetMember.FirstName} {targetMember.LastName}};publisher.Publish(this.pubnubOptions.ProximityEventChannel, outEvent.toJson());};}public void Start(){subscriber.Subscribe();}public void Stop(){subscriber.Unsubscribe();}}
}在这个代码清单中首先要注意的是从 DI 向构造函数注入的一连串依赖日志记录工具实时事件发布器事件订阅器团队服务客户端PubNub 选项创建实时事件发布器类实现类using Microsoft.Extensions.Logging;
using PubnubApi;namespace StatlerWaldorfCorp.ProximityMonitor.Realtime
{public class PubnubRealtimePublisher : IRealtimePublisher{private ILogger logger;private Pubnub pubnubClient;public PubnubRealtimePublisher(ILoggerPubnubRealtimePublisher logger,Pubnub pubnubClient){logger.LogInformation(Realtime Publisher (Pubnub) Created.);this.logger logger;this.pubnubClient pubnubClient;}public void Validate(){pubnubClient.Time().Async(new PNTimeResultExt((result, status) {if (status.Error) {logger.LogError($Unable to connect to Pubnub {status.ErrorData.Information});throw status.ErrorData.Throwable;} else {logger.LogInformation(Pubnub connection established.);}}));}public void Publish(string channelName, string message){pubnubClient.Publish().Channel(channelName).Message(message).Async(new PNPublishResultExt((result, status) {if (status.Error) {logger.LogError($Failed to publish on channel {channelName}: {status.ErrorData.Information});} else {logger.LogInformation($Published message on channel {channelName}, {status.AffectedChannels.Count} affected channels, code: {status.StatusCode});}}));}}
}注入实时通信类在 Startup 类中配置 DI 来提供 PubNub 客户端和其他相关类using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StatlerWaldorfCorp.ProximityMonitor.Queues;
using StatlerWaldorfCorp.ProximityMonitor.Realtime;
using RabbitMQ.Client.Events;
using StatlerWaldorfCorp.ProximityMonitor.Events;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using StatlerWaldorfCorp.ProximityMonitor.TeamService;namespace StatlerWaldorfCorp.ProximityMonitor
{public class Startup{public Startup(IHostingEnvironment env, ILoggerFactory loggerFactory){loggerFactory.AddConsole();loggerFactory.AddDebug();var builder new ConfigurationBuilder().SetBasePath(env.ContentRootPath).AddJsonFile(appsettings.json, optional: false, reloadOnChange: false).AddEnvironmentVariables();Configuration builder.Build();}public IConfigurationRoot Configuration { get; }public void ConfigureServices(IServiceCollection services){services.AddMvc();services.AddOptions();services.ConfigureQueueOptions(Configuration.GetSection(QueueOptions));services.ConfigurePubnubOptions(Configuration.GetSection(PubnubOptions));services.ConfigureTeamServiceOptions(Configuration.GetSection(teamservice));services.ConfigureAMQPOptions(Configuration.GetSection(amqp));services.AddTransient(typeof(IConnectionFactory), typeof(AMQPConnectionFactory));services.AddTransient(typeof(EventingBasicConsumer), typeof(RabbitMQEventingConsumer));services.AddSingleton(typeof(IEventSubscriber), typeof(RabbitMQEventSubscriber));services.AddSingleton(typeof(IEventProcessor), typeof(ProximityDetectedEventProcessor));services.AddTransient(typeof(ITeamServiceClient),typeof(HttpTeamServiceClient));services.AddRealtimeService();services.AddSingleton(typeof(IRealtimePublisher), typeof(PubnubRealtimePublisher));}// Singletons are lazy instantiation.. so if we dont ask for an instance during startup,// theyll never get used.public void Configure(IApplicationBuilder app,IHostingEnvironment env,ILoggerFactory loggerFactory,IEventProcessor eventProcessor,IOptionsPubnubOptions pubnubOptions,IRealtimePublisher realtimePublisher){realtimePublisher.Validate();realtimePublisher.Publish(pubnubOptions.Value.StartupChannel, {hello: world});eventProcessor.Start();app.UseMvc();}}
}我们尝试为类提供预先创建好的 PubNub API 实例为整洁地实现这一功能并继续以注入方式获取配置信息包括 API 密钥我们需要向 DI 中注册一个工厂工厂类的职责是向外提供装配完成的 PubNub 实例using System;
using Microsoft.Extensions.Options;
using PubnubApi;
using System.Linq;
using Microsoft.Extensions.Logging;namespace StatlerWaldorfCorp.ProximityMonitor.Realtime
{public class PubnubFactory{private PNConfiguration pnConfiguration;private ILogger logger;public PubnubFactory(IOptionsPubnubOptions pubnubOptions,ILoggerPubnubFactory logger){this.logger logger;pnConfiguration new PNConfiguration();pnConfiguration.PublishKey pubnubOptions.Value.PublishKey;pnConfiguration.SubscribeKey pubnubOptions.Value.SubscribeKey;pnConfiguration.Secure false;logger.LogInformation($Pubnub Factory using publish key {pnConfiguration.PublishKey});}public Pubnub CreateInstance(){return new Pubnub(pnConfiguration);}}
}将工厂注册到 DI 时使用的扩展方法机制using System;
using Microsoft.Extensions.DependencyInjection;
using PubnubApi;namespace StatlerWaldorfCorp.ProximityMonitor.Realtime
{public static class RealtimeServiceCollectionExtensions{public static IServiceCollection AddRealtimeService(this IServiceCollection services){services.AddTransientPubnubFactory();return AddInternal(services, p p.GetRequiredServicePubnubFactory(), ServiceLifetime.Singleton);}private static IServiceCollection AddInternal(this IServiceCollection collection,FuncIServiceProvider, PubnubFactory factoryProvider,ServiceLifetime lifetime){FuncIServiceProvider, object factoryFunc provider {var factory factoryProvider(provider);return factory.CreateInstance();};var descriptor new ServiceDescriptor(typeof(Pubnub), factoryFunc, lifetime);collection.Add(descriptor);return collection;}}
}上面代码的关键功能是创建了一个 lambda 函数接收 IServiceProvider 作为输入并返回一个对象作为输出它正是我们注册工厂时向服务描述对象中传入的工厂方法汇总所有设计要立即查看效果从而确保一切工作正常我们可模拟由第 6 章的服务输出的信息只需要手动向 proximitydetected 队列中放入表示 ProximityDetectedEvent 对象的 JSON 字符串在这个过程中如果我们的监控服务处于运行之中、订阅了队列而且团队服务处于运行之中、拥有正确的数据那么接近监控服务将取出事件、补充必要的数据并通过 PubNub 发送一个实时事件利用 PubNub 调试控制台我们可以立即看到这一处理过程生成的输出为实时接近监控服务创建界面为简化工作同时掩盖我缺乏艺术细胞的真相我将用一个不包含图形元素的简单 HTML 页面它不需要托管在专门的 Web 服务器上它实时地监听接近事件并将携带的信息动态添加到新的 div 元素中realtimetest.htmlhtmlheadtitleRT page sample/titlescript srchttps://cdn.pubnub.com/sdk/javascript/pubnub.4.4.0.js/scriptscriptvar pubnub new PubNub({subscribeKey: yoursubkey,publishKey: yourprivatekey,ssl: true});pubnub.addListener({message: function(m) {// handle messagevar channelName m.channel; // The channel for which the message belongsvar channelGroup m.subscription; // The channel group or wildcard subscription match (if exists)var pubTT m.timetoken; // Publish timetokenvar msg JSON.parse(m.message); // The Payloadconsole.log(New Message!!, msg);var newDiv document.createElement(div)var newStr ** ( msg.TeamName ) msg.SourceMemberName moved within msg.MemberDistance km of msg.TargetMemberName;newDiv.innerHTML newStrvar oldDiv document.getElementById(chatLog)oldDiv.appendChild(newDiv)},presence: function(p) {// handle presencevar action p.action; // Can be join, leave, state-change or timeoutvar channelName p.channel; // The channel for which the message belongsvar occupancy p.occupancy; // No. of users connected with the channelvar state p.state; // User Statevar channelGroup p.subscription; // The channel group or wildcard subscription match (if exists)var publishTime p.timestamp; // Publish timetokenvar timetoken p.timetoken; // Current timetokenvar uuid p.uuid; // UUIDs of users who are connected with the channel},status: function(s) {// handle status}});console.log(Subscribing..);pubnub.subscribe({channels: [proximityevents]});/script/headbodyh1Proximity Monitor/h1pProximity Events listed below./pdiv idchatLog/div/body
/html值得指出的是这个文件并不需要托管在服务器上在任何浏览器中打开其中的 JavaScript 都可以运行