电子商务网站建设是什么,东莞市土木建筑学会,wordpress购物插件下载,369网站建设中心背景信息
目标
本文主要介绍如何让阿里云日志服务与您的SIEM方案(如Splunk)对接, 以便确保阿里云上的所有法规、审计、与其他相关日志能够导入到您的安全运维中心#xff08;SOC#xff09;中。
名词解释
LOG#xff08;SLS#xff09; - 阿里云日志服务#xff0c;简…背景信息
目标
本文主要介绍如何让阿里云日志服务与您的SIEM方案(如Splunk)对接, 以便确保阿里云上的所有法规、审计、与其他相关日志能够导入到您的安全运维中心SOC中。
名词解释
LOGSLS - 阿里云日志服务简写SLS表示Simple Log Service。SIEM - 安全信息与事件管理系统Security Information and Event Management,如Splunk, QRadar等。Splunk HEC - Splunk的Http事件接收器Splunk Http Event Collector, 一个 HTTP(s)接口用于接收日志。
审计相关日志
安全运维团队一般对阿里云相关的审计日志感兴趣如下列出所有存在于所有目前在日志服务中可用的相关日志但不限于
Regions化 - 时刻更新请以最新的产品文档为准。
阿里云日志服务
阿里云的日志服务log service是针对日志类数据的一站式服务无需开发就能快捷完成海量日志数据的采集、消费、投递以及查询分析等功能提升运维、运营效率。日志服务主要包括 实时采集与消费、数据投递、查询与实时分析 等功能适用于从实时监控到数据仓库的各种开发、运维、运营与安全场景 目前以上各个阿里云产品已经与日志服务打通提供近实时的日志自动采集存储、并提供基于日志服务的查询分析、报表报警、下游计算对接与投递的能力。 集成方案建议
概念
项目Project 项目Project是日志服务中的资源管理单元用于资源隔离和控制。您可以通过项目来管理某一个应用的所有日志及相关的日志源。它管理着用户的所有日志库Logstore采集日志的机器配置等信息同时它也是用户访问日志服务资源的入口。
日志库Logstore 日志库Logstore是日志服务中日志数据的收集、存储和查询单元。每个日志库隶属于一个项目且每个项目可以创建多个日志库。
分区Shard 每个日志库分若干个分区Shard每个分区由MD5左闭右开区间组成每个区间范围不会相互覆盖并且所有的区间的范围是MD5整个取值范围。
服务入口Endpoint 日志服务入口是访问一个项目Project及其内部日志数据的 URL。它和 Project 所在的阿里云区域Region及 Project 名称相关。
访问秘钥AccessKey 阿里云访问秘钥是阿里云为用户使用 API非控制台来访问其云资源设计的“安全口令”。您可以用它来签名 API 请求内容以通过服务端的安全验证。
假设
这里假设您的SIEM如Splunk位于组织内部环境on-premise中而不是云端。为了安全考虑没有任何端口开放让外界环境来访问此SIEM。
概览
推荐使用SLS消费组构建程序来从SLS进行实时消费然后通过Splunk APIHEC来发送日志给Splunk。 使用消费组编程
协同消费库Consumer Library是对日志服务中日志进行消费的高级模式提供了消费组ConsumerGroup的概念对消费端进行抽象和管理和直接使用SDK进行数据读取的区别在于用户无需关心日志服务的实现细节只需要专注于业务逻辑另外消费者之间的负载均衡、failover等用户也都无需关心。
Spark Streaming、Storm 以及Flink Connector都以Consumer Library作为基础实现。
基本概念
消费组Consumer Group - 一个消费组由多个消费者构成同一个消费组下面的消费者共同消费一个logstore中的数据消费者之间不会重复消费数据。消费者Consumer - 消费组的构成单元实际承担消费任务同一个消费组下面的消费者名称必须不同。
在日志服务中一个logstore下面会有多个shard协同消费库的功能就是将shard分配给一个消费组下面的消费者分配方式遵循以下原则
每个shard只会分配到一个消费者。一个消费者可以同时拥有多个shard。 新的消费者加入一个消费组这个消费组下面的shard从属关系会调整以达到消费负载均衡的目的但是上面的分配原则不会变分配过程对用户透明。
协同消费库的另一个功能是保存checkpoint方便程序故障恢复时能接着从断点继续消费从而保证数据不会被重复消费。
部署建议
硬件建议
硬件参数 需要一台机器运行程序安装一个Linux如Ubuntu x64推荐硬件参数如下
2.0 GHZ X 8核16GB 内存推荐32GB1 Gbps网卡至少2GB可用磁盘空间建议10GB以上
网络参数 从组织内的环境到阿里云的带宽应该大于数据在阿里云端产生的速度否则日志无法实时消费。假设数据产生一般速度均匀峰值在2倍左右每天100TB原始日志。5倍压缩的场景下推荐带宽应该在4MB/s32Mbps左右。
使用(Python)
这里我们描述用Python使用消费组进行编程。对于Java语言用法可以参考这篇文章.
注意本篇文章的代码可能会更新最新版本在这里可以找到Github样例.
安装
环境
强烈推荐PyPy3来运行本程序而不是使用标准CPython解释器。日志服务的Python SDK可以如下安装
pypy3 -m pip install aliyun-log-python-sdk -U
更多SLS Python SDK的使用手册可以参考这里
程序配置
如下展示如何配置程序
配置程序日志文件以便后续测试或者诊断可能的问题。基本的日志服务连接与消费组的配置选项。消费组的一些高级选项性能调参不推荐修改。SIEMSplunk的相关参数与选项。
请仔细阅读代码中相关注释并根据需要调整选项
#encoding: utf8
import os
import logging
from logging.handlers import RotatingFileHandlerroot logging.getLogger()
handler RotatingFileHandler({0}_{1}.log.format(os.path.basename(__file__), current_process().pid), maxBytes100*1024*1024, backupCount5)
handler.setFormatter(logging.Formatter(fmt[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s, datefmt%Y-%m-%d %H:%M:%S))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())logger logging.getLogger(__name__)def get_option():########################### 基本选项########################### 从环境变量中加载SLS参数与选项endpoint os.environ.get(SLS_ENDPOINT, )accessKeyId os.environ.get(SLS_AK_ID, )accessKey os.environ.get(SLS_AK_KEY, )project os.environ.get(SLS_PROJECT, )logstore os.environ.get(SLS_LOGSTORE, )consumer_group os.environ.get(SLS_CG, )# 消费的起点。这个参数在第一次跑程序的时候有效后续再次运行将从上一次消费的保存点继续。# 可以使”begin“”end“或者特定的ISO时间格式。cursor_start_time 2018-12-26 0:0:0########################### 一些高级选项########################### 一般不要修改消费者名尤其是需要并发跑时consumer_name {0}-{1}.format(consumer_group, current_process().pid)# 心跳时长当服务器在2倍时间内没有收到特定Shard的心跳报告时服务器会认为对应消费者离线并重新调配任务。# 所以当网络不是特别好的时候不要调整的特别小。heartbeat_interval 20# 消费数据的最大间隔如果数据生成的速度很快并不需要调整这个参数。data_fetch_interval 1# 构建一个消费组和消费者option LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,cursor_positionCursorPosition.SPECIAL_TIMER_CURSOR,cursor_start_timecursor_start_time,heartbeat_intervalheartbeat_interval,data_fetch_intervaldata_fetch_interval)# Splunk选项settings {host: 10.1.2.3,port: 80,token: a023nsdu123123123,https: False, # 可选, booltimeout: 120, # 可选, intssl_verify: True, # 可选, boolsourcetype: , # 可选, sourcetypeindex: , # 可选, indexsource: , # 可选, source}return option, settings
数据消费与转发
如下代码展示如何从SLS拿到数据后转发给Splunk。
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import time
import json
import socket
import requestsclass SyncData(ConsumerProcessorBase):这个消费者从SLS消费数据并发送给Splunkdef __init__(self, splunk_setting):初始化并验证Splunk连通性super(SyncData, self).__init__() assert splunk_setting, ValueError(You need to configure settings of remote target)assert isinstance(splunk_setting, dict), ValueError(The settings should be dict to include necessary address and confidentials.)self.option splunk_settingself.timeout self.option.get(timeout, 120)# 测试Splunk连通性s socket.socket()s.settimeout(self.timeout)s.connect((self.option[host], self.option[port]))self.r requests.session()self.r.max_redirects 1self.r.verify self.option.get(ssl_verify, True)self.r.headers[Authorization] Splunk {}.format(self.option[token])self.url {0}://{1}:{2}/services/collector/event.format(http if not self.option.get(https) else https, self.option[host], self.option[port])self.default_fields {}if self.option.get(sourcetype):self.default_fields[sourcetype] self.option.get(sourcetype)if self.option.get(source):self.default_fields[source] self.option.get(source)if self.option.get(index):self.default_fields[index] self.option.get(index)def process(self, log_groups, check_point_tracker):logs PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_strTrue, decode_bytesTrue)logger.info(Get data from shard {0}, log count: {1}.format(self.shard_id, len(logs)))for log in logs:# 发送数据到Splunk# 如下代码只是一个样例注意所有字符串都是unicode# Python2: {u__time__: u12312312, u__topic__: utopic, ufield1: uvalue1, ufield2: uvalue2}# Python3: {__time__: 12312312, __topic__: topic, field1: value1, field2: value2}event {}event.update(self.default_fields)if log.get(u__topic__) audit_log:# suppose we only care about audit logevent[time] log[u__time__]event[fields] {}del log[__time__]event[fields].update(log)data json.dumps(event, sort_keysTrue)try:req self.r.post(self.url, datadata, timeoutself.timeout)req.raise_for_status()except Exception as err:logger.debug(Failed to connect to remote Splunk server ({0}). Exception: {1}, self.url, err)# TODO: 根据需要添加一些重试或者报告的逻辑logger.info(Complete send data to remote)self.save_checkpoint(check_point_tracker)主逻辑
如下代码展示主程序控制逻辑
def main():option, settings get_monitor_option()logger.info(*** start to consume data...)worker ConsumerWorker(SyncData, option, args(settings,) )worker.start(joinTrue)if __name__ __main__:main()
启动
假设程序命名为sync_data.py可以如下启动
export SLS_ENDPOINTEndpoint of your region
export SLS_AK_IDYOUR AK ID
export SLS_AK_KEYYOUR AK KEY
export SLS_PROJECTSLS Project Name
export SLS_LOGSTORESLS Logstore Name
export SLS_CG消费组名可以简单命名为syc_datapypy3 sync_data.py
限制与约束 每一个日志库logstore最多可以配置10个消费组如果遇到错误ConsumerGroupQuotaExceed则表示遇到限制建议在控制台端删除一些不用的消费组。
监测
在控制台查看消费组状态通过云监控查看消费组延迟并配置报警
性能考虑
启动多个消费者
基于消费组的程序可以直接启动多次以便达到并发作用
nohup pypy3 sync_data.py
nohup pypy3 sync_data.py
nohup pypy3 sync_data.py
...
注意: 所有消费者使用了同一个消费组的名字和不同的消费者名字因为消费者名以进程ID为后缀。 因为一个分区Shard只能被一个消费者消费假设一个日志库有10个分区那么最多有10个消费者同时消费。
Https
如果服务入口endpoint配置为https://前缀如https://cn-beijing.log.aliyuncs.com程序与SLS的连接将自动使用HTTPS加密。
服务器证书*.aliyuncs.com是GlobalSign签发默认大多数Linux/Windows的机器会自动信任此证书。如果某些特殊情况机器不信任此证书可以参考这里下载并安装此证书。
性能吞吐
基于测试在没有带宽限制、接收端速率限制如Splunk端的情况下以推进硬件用pypy3运行上述样例单个消费者占用大约10%的单核CPU下可以消费达到5 MB/s原始日志的速率。因此理论上可以达到50 MB/s原始日志每个CPU核也就是每个CPU核每天可以消费4TB原始日志。
注意: 这个数据依赖带宽、硬件参数和SIEM接收端如Splunk是否能够较快接收数据。
高可用性
消费组会将检测点check-point保存在服务器端当一个消费者停止另外一个消费者将自动接管并从断点继续消费。
可以在不同机器上启动消费者这样当一台机器停止或者损坏的清下其他机器上的消费者可以自动接管并从断点进行消费。
理论上为了备用也可以启动大于shard数量的消费者。
原文链接 本文为云栖社区原创内容未经允许不得转载。