个人网站开发与设计摘要,wordpress推特主题,wordpress+分页静态,做公司网站需要准备什么科目【.NET Core】| 总结/Edison Zhou大家好#xff0c;我是Edison。最近有一个ASP.NET Core使用认证机制访问Kafka的需求#xff0c;加之我们又使用了CAP这个开源项目使用的Kafka#xff0c;于是网上寻找了一番发现对应资料太少#xff0c;于是调查了一番#xff0c;做了如下…【.NET Core】| 总结/Edison Zhou大家好我是Edison。最近有一个ASP.NET Core使用认证机制访问Kafka的需求加之我们又使用了CAP这个开源项目使用的Kafka于是网上寻找了一番发现对应资料太少于是调查了一番做了如下的笔记希望对你有用。背景在实际场景中开发环境的Kafka服务器一般没有要求通过认证即可发布和读取消息并且还可以随意创建Topic和Consumer Group。但是在生产环境中则一般有较强的安全需求无法随意创建Topic和Consumer还做了一些认证和权限约束。而在ASP.NET Core的解决方案中我们经常使用到CAP这个开源项目作为事件总线在CAP.Kafka项目中只提供了最基础的Servers配置文档示例中也只给出了这种只适合开发环境的配置示例而对于对安全要求较高的生产环境则需要我们研究一下Kafka官方的配置文档在CAP.Kafka的MainConfig对象中进行主动配置Key/Value。本文会首先介绍一下Kafka的认证机制然后会给出基于CAP项目通过认证方式访问Kafka的示例。Kafka认证机制自 0.9.0.0 版本开始Kafka 正式引入了认证机制用于实现基础的安全用户认证这是将 Kafka 上云或进行多租户管理的必要步骤。目前Kafka的版本已支持基于 SSL 和 基于 SASL 的安全认证机制。基于 SSL 的认证主要是指 Broker 和客户端的双路认证2-way authentication。通常来说SSL 加密Encryption已经启用了单向认证即客户端认证 Broker 的证书Certificate。如果要做 SSL 认证那么我们要启用双路认证也就是说 Broker 也要认证客户端的证书。NoteKafka 的源码中依然是使用 SSL 而不是 TLS 来表示这类东西的。不过今天出现的所有 SSL 字眼我们都可以认为它们是和 TLS 等价的。Kafka 还支持通过 SASL 做客户端认证。SASL 是提供认证和数据安全服务的框架。Kafka 支持的 SASL 机制有 5 种GSSAPI也就是 Kerberos 使用的安全接口是在 0.9 版本中被引入的。PLAIN是使用简单的用户名 / 密码认证的机制在 0.10 版本中被引入。SCRAM主要用于解决 PLAIN 机制安全问题的新机制是在 0.10.2 版本中被引入的。OAUTHBEARER是基于 OAuth 2 认证框架的新机制在 2.0 版本中被引进。Delegation Token补充现有 SASL 机制的轻量级认证机制是在 1.1.0 版本被引入的。在实际应用中一般建议 使用 SSL 来做通信加密使用 SASL 来做 Kafka 的认证实现。对于小型公司来说SASL/PLAIN 的配置和运维成本相对较小比较适合Kafka集群配置。下图将这些认证机制进行了汇总源自极客时间胡夕《Kafka核心技术与实战》。通过认证机制使用Kafka这里假设我们已经搭建好了一个Kafka集群并且配置了SASL/PLAIN方式并且创建了一个账号“kafka_user”密码为kakfa_user_password2022abcdlk!约束客户端只能通过SSL方式带上CA证书加密访问。假设我们已经有了一个ASP.NET Core应用并且之前已经在开发环境通过CAP项目使用了Kafka那么对于生产环境或安全要求较高的测试环境我们应该如何修改呢通过查看CAP的文档在CAP.Kafka中其实只提供了几个最基础的配置项而其他的配置项我们只能通过CAP.Kafka提供的MainConfig这个Dictionary类进行手动添加如下所示services.AddCap(capOptions
{capOptions.UseKafka(kafkaOption{// kafka options.// kafkaOptions.MainConfig.Add(, );});
});那么我们应该添加哪些配置呢它们的key和可选的value又是哪些呢CAP给出了一个参考链接https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md它是librdkafka项目的配置参数的文档。通过研究配置项文档我们大概需要以下一些参数将其添加到MainConfig字典中这些参数不仅适配Producer也适配Consumer。namespace Microsoft.Extensions.DependencyInjection
{public static class ApplicationServiceCollectionExtensions{public static IServiceCollection AddEventBus(this IServiceCollection services, IConfigurationSection configuration){services.AddCap(option {option.UseInMemoryStorage();option.UseKafka(kfkOption {kfkOption.Servers configuration[KafkaBootstrapServers];if (Convert.ToBoolean(configuration[EnableAuthorization])){kfkOption.MainConfig.Add(security.protocol, sasl_ssl);kfkOption.MainConfig.Add(sasl.mechanism, PLAIN);kfkOption.MainConfig.Add(sasl.username, configuration[SaslUserName]); kfkOption.MainConfig.Add(sasl.password, configuration[SaslPassword]); kfkOption.MainConfig.Add(ssl.ca.location, configuration[SslCertificatePath]); kfkOption.MainConfig.Add(enable.ssl.certificate.verification, configuration[EnableSslCertificateVerification]); }});option.SucceedMessageExpiredAfter 3600 * 24 * Convert.ToInt32(configuration[SuccessMsgExpireDays]);});return services;}}
}在Program.cs中调用AddEventBus方法builder.Services.AddEventBus(builder.Configuration.GetSection(EventBusConfigs));在appsettings中的配置如下{EventBusConfigs: {KafkaBootstrapServers: prd.kafka01.com:9093, prd.kafka02.com:9093, prd.kafka03.com:9093,SuccessMsgExpireDays: 7,EnableAuthorization: true,SaslUserName: kafka_user,SaslPassword: kakfa_user_password2022abcdlk!,SslCertificatePath: resources/certificates/intranet_server_ca.cer,EnableSslCertificateVerification: true}
}既然是通过证书访问那么我们得告诉ASP.NET Core这个证书放在什么位置本文示例是放在这个ASP.NET Core应用目录下的在实际中建议由运维管理员统一放在一个中心服务器位置挂载到容器内部可以访问从而保证证书的安全。如果是通过K8s部署那么将其添加为一个Secret存放是更好的方式。CAP中的异构系统集成顺带说一下在CAP这个项目中如果你的项目都是基于它来做事件总线那么CAP可以正常的Publish和Subscribe消息但是如果在你使用它之前已经有了许多的Topic Messages它需要和一些第三方系统进行消息传输这就会涉及到异构系统的集成。如果我们不做一些配置CAP是无法正常Subscribe和Consume消息的。因此在CAP中我们需要主动对Message做一些改造添加传递一些额外信息以便于CAP能够在收到消息时提取到关键特征从而正常运作。否则你会在启动时收到这样一个错误The given key cap-msg-id is not existed........。我们只需要在注册CAP组件时添加自定义Headers确保cap-msg-id和cap-msg-name两个Header值能够被解析到namespace Microsoft.Extensions.DependencyInjection
{public static class ApplicationServiceCollectionExtensions{public static IServiceCollection AddEventBus(this IServiceCollection services, IConfigurationSection configuration){services.AddCap(option {option.UseInMemoryStorage();option.UseKafka(kfkOption {kfkOption.Servers configuration[KafkaBootstrapServers];if (Convert.ToBoolean(configuration[EnableAuthorization])){kfkOption.MainConfig.Add(security.protocol, sasl_ssl);kfkOption.MainConfig.Add(sasl.mechanism, PLAIN);kfkOption.MainConfig.Add(sasl.username, configuration[SaslUserName]); kfkOption.MainConfig.Add(sasl.password, configuration[SaslPassword]); kfkOption.MainConfig.Add(ssl.ca.location, configuration[SslCertificatePath]); kfkOption.MainConfig.Add(enable.ssl.certificate.verification, configuration[EnableSslCertificateVerification]); }// 以下为新增自定义Headers配置option.CustomHeaders e new ListKeyValuePairstring, string{new KeyValuePairstring, string(DotNetCore.CAP.Messages.Headers.MessageId, SnowflakeId.Default().NextId().ToString()),new KeyValuePairstring, string(DotNetCore.CAP.Messages.Headers.MessageName, e.Topic)};});option.SucceedMessageExpiredAfter 3600 * 24 * Convert.ToInt32(configuration[SuccessMsgExpireDays]);});return services;}}
}小结本文介绍了在ASP.NET Core中使用CAP项目通过认证机制安全地使用kafka消息中间件希望能够对你有所帮助参考资料CAP文档Kafka部分https://cap.dotnetcore.xyz/user-guide/en/transport/kafkalibrdkafka配置项文档https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.mdCAP文档Messaging部分https://cap.dotnetcore.xyz/user-guide/en/cap/messaging胡夕《Kafka核心技术与实战》之Kafka认证机制用哪家https://time.geekbang.org/column/article/118347年终总结Edison的2021年终总结数字化转型我在传统企业做数字化转型C#刷题C#刷剑指Offer算法题系列文章目录.NET面试.NET开发面试知识体系.NET大会2020年中国.NET开发者大会PDF资料