建设部精神文明建设网站,wordpress连接到微博,淘宝客建站教程,湖南互联网公司主要业务流程
初始请求请求过滤器请求队列响应下载器数据解析器数据清洗器存储器
设计图 master slave#xff1a;master控制队列#xff0c;过滤#xff0c;传递任务#xff1b;slave负责执行 缺点#xff1a;master和slave端交互数据频繁#xff0c;slave的数据进出…主要业务流程
初始请求请求过滤器请求队列响应下载器数据解析器数据清洗器存储器
设计图 master slavemaster控制队列过滤传递任务slave负责执行 缺点master和slave端交互数据频繁slave的数据进出都给master去调度对master端相当于成倍数据并发比较大 升级策略2分离响应异步下载与异步处理避免一方阻塞另一方 如果解析和清洗也会处理很长时间并发量就会下降也可以在中间加入队列解耦任务 如果没有耗时操作也没必要新加一个队列来做具体在哪些环节之间加入队列取决你分析业务需求在哪些环节会出现耗时操作 对于一个任务来说是共享一个进程的这个队列可以直接用Queue内存队列共享一个进程中数据 升级策略3日志监控捕获错误并实时通报。ELK 先对日志进行埋点针对Error错误日志进行报告 还有一种master 只负责过滤重复请求slave自己负责维护自己的队列只需要 slave 执行任务前询问 master是否有重复值即可 减轻了master的负担但是slave自己维护自己队列彼此独立
系统架构组件 队列组件 队列类型 FIFO 内存队列 - 一般实现单机版的队列 Python内置队列Asyncio中的队列 持久化队列:分布式断点续爬 Redis队列消息队列KafkaRabbitmq 过滤器组件 指纹过滤器redis等: 千万级数据去重simhash过滤器相似文本去重布隆过滤器redis亿级数据去重存在极小概率误判占的空间比较小性能高 下载器组件 urllib/requestsaiohttptomada.httpclient 异步组件 asynciocelery eventlet/geventselenium chrome-headless Pool多个浏览器实例appium android-app Pool (多台设备) 数据解析提取组件 语法规则 正则Xpath 解析提取工具 relxmllxml bs4lxml pyquery 数据清洗组件 自定义清洗规则 数据存储组件 存储介质 file:csv/jsonDB:mysql/mongondb 存储工具 csv、jsonsqlalchemy/mongoengine 程序监控组件 ELK elasticsearch:日志数据存储logstash: 日志收集工具kibana: 日志可视化 可视化控制组件 web界面GUI界面
异步改造并发代码
同步请求
下载器中开始使用的是 requests 同步发请求没有异步
下载器同步请求
import requests
from spiderSystem.response import Responseclass RequestsDownloader(object):根据request发起请求构建response对象def fetch(self, request):if request.method.upper() GET:resp requests.get(request.with_query_url, headersrequest.headers)elif request.method.upper() POST:resp requests.post(request.with_query_url, headersrequest.headers, bodyrequest.body)else:raise Exception(only support GET or POST Method)return Response(request, status_coderesp.status_code, urlresp.url, headersresp.headers, bodyresp.content)
请求的 Slave 客户端
from .request_manager import RequestManager
from .request_manager.utils.redis_tools import get_redis_queue_cls
from .downloader import RequestsDownloaderfrom .request import RequestFIFO_QUEUE get_redis_queue_cls(fifo)class Slave(object):def __init__(self, spiders, project_name, request_manager_config):self.filter_queue FIFO_QUEUE(filter_queue, hostlocalhost)self.request_manager RequestManager(**request_manager_config) self.downloader RequestsDownloader() # 用 requests 同步请求的下载器self.spiders spidersself.project_name project_namedef handle_request(self):# 1. 获取一个请求request self.request_manager.get_request(self.project_name)# 2. 发起请求response self.downloader.fetch(request) # 每次都同步去请求 # 3. 获取爬虫对象spider self.spiders[request.name]()# 4. 处理 responsefor result in spider.parse(response):if result is None:raise Exception(不允许返回None)elif isinstance(result, Request):self.filter_queue.put(result)else:# 意味着是一个数据new_result spider.data_clean(result)spider.data_save(new_result)def run(self):while True:self.handle_request()
异步请求改造
通过 tornado 的异步请求
下载器(异步)
from tornado.httpclient import HTTPClient, HTTPRequest, AsyncHTTPClientfrom spiderSystem.response import Response# tornado 也有同步请求方式 可以忽略
class TornadoDownloader(object):def __init__(self):self.httpclient HTTPClient()def fetch(self, request):print(tornado 同步客户端发的请求)tornado_request HTTPRequest(request.with_query_url, methodrequest.method.upper(), headersrequest.headers)tornado_response self.httpclient.fetch(tornado_request)return Response(requestrequest, status_codetornado_response.code, urltornado_response.effective_url,bodytornado_response.buffer.read())同步的请求不能复用需要用完后关闭def __del__(self):self.httpclient.close()# tornado 也有异步请求方式
class AsyncTornadoDownloader(object):def __init__(self):self.async_http_client AsyncHTTPClient()async def fetch(self, request): # 开启协程print(tornado 异步客户端发的请求)tornado_request HTTPRequest(request.with_query_url, methodrequest.method.upper(), headersrequest.headers)tornado_response await self.async_http_client.fetch(tornado_request) # 等待return Response(requestrequest, status_codetornado_response.code, urltornado_response.effective_url,headersrequest.headers,bodytornado_response.buffer.read())
Slave 调用方
import asyncio
import tornado.ioloopfrom .request_manager import RequestManager
from .request_manager.utils.redis_tools import get_redis_queue_cls
from .downloader import RequestsDownloader, TornadoDownloader, AsyncTornadoDownloaderfrom .request import RequestFIFO_QUEUE get_redis_queue_cls(fifo)class Slave(object):def __init__(self, spiders, project_name, request_manager_config):self.filter_queue FIFO_QUEUE(filter_queue, hostlocalhost)self.request_manager RequestManager(**request_manager_config) self.downloader AsyncTornadoDownloader() # 异步下载器self.spiders spidersself.project_name project_nameasync def handle_request(self):# request self.request_manager.get_request(self.project_name) 阻塞改异步io_loop tornado.ioloop.IOLoop.current()# 1. 获取一个请求future io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name) # 不支持协程的函数可以自己获取事件循环去定义执行让其支持协程request await future# 2. 发起请求response await self.downloader.fetch(request)# 3. 获取爬虫对象spider self.spiders[request.name]()# 4. 处理 responsefor result in spider.parse(response):if result is None:raise Exception(不允许返回None)elif isinstance(result, Request):# self.filter_queue.put(result) 可能阻塞改异步await io_loop.run_in_executor(None, self.filter_queue.put,result) else:# 意味着是一个数据new_result spider.data_clean(result)spider.data_save(new_result)async def run(self):while True:# 不能写成 await self.handle_request()否则也是相当于同步请求了await asyncio.wait([self.handle_request(),self.handle_request(),])
启动方式
if __name__ __main__:spiders {BaiduSpider.name: BaiduSpider}# 同步请求用 requests 发请求# Slave(spiders, project_namePROJECT_NAME, request_manager_configREQUEST_MANAGER_CONFIG).run()# 要用异步方式去请求slave Slave(spiders, project_namePROJECT_NAME, request_manager_configREQUEST_MANAGER_CONFIG)io_loop tornado.ioloop.IOLoop.current()io_loop.run_sync(slave.run) tornado库 io_loop.run_sync 用于将阻塞函数转换为同步函数并在 IOLoop 上执行它会阻塞当前协程。 io_loop.run_in_executor 用于在指定的线程池中异步执行耗时的、阻塞的操作不会阻塞当前协程并允许 IOLoop 继续处理其他事件。 asyncio库 实现类似于 run_sync 的效果您可以使用 loop.run_until_complete 方法来运行一个协程并等待其完成。这个方法会阻塞当前线程直到协程执行完毕 实现类似于 run_in_executor 的效果您可以使用 loop.run_in_executor 方法将耗时的、阻塞的操作转移到一个线程池中执行以避免阻塞事件循环。 async 异步协程改造重点!!! 下载器中用到的所有异步的地方必须是协程 async 定义await 后面跟着的一定是支持协程的方法要不是一个 协程对象future 或者 task 对象比如 self.async_http_client.fetch 如果不支持协程会报错连带着的所有调用 async 的方法也必须是协程函数对于不支持协程的函数可以自己获取事件循环去定义执行让其支持协程如果一个函数是一个协程函数后如果这个协程函数中有任意可以阻塞的或耗时操作都应该改成异步的 await ,不然可能会阻塞整个线程 # self.request_manager.get_request 本身不支持异步或者改造成异步嵌套要改的太深可以用 io_loop.run_in_executor 来替代io_loop tornado.ioloop.IOLoop.current()# 1. 获取一个请求future io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name) request await future 在最开始调用的地方比如 run 启动的方式必须是 用 asyncio.wait 或用其他方式启动asyncio.gather 或 asyncio.as_completed # 开启2个协程去执行。asyncio.wait 能让其变成一个异步关系
async def run(self):while True:# 不能写成 await self.handle_request()否则也是相当于同步请求了await asyncio.wait([self.handle_request(),self.handle_request(),])
Master 进程用多线程改造
master 的启动方法这两个可以用两个线程去做不然以前的写法是同步的执行方式 def run(self):# self.run_start_requests()# self.run_filter_queue()# 两个线程去做threading.Thread(targetself.run_start_requests).start()threading.Thread(targetself.run_filter_queue).start()
自己封装的SpiderSystem模块安装成内置环境中
在模块目录添加 setup.py 脚本
├── setup.py
├── spiderSystem
├── README.md
执行 pip3 setup.py install 即可查看包信息 pip3 show spiderSystem
from setuptools import setup, find_packagessetup(namespiderSystem,version0.1,descriptionspiderSystem module,authorraoju,urlurl,licenselicense,packagesfind_packages(exclude[]), # 当前所有模块都安装install_requires[tornado 5.1,pycurl,])
.markdown-body pre,.markdown-body precode.hljs{color:#333;background:#f8f8f8}.hljs-comment,.hljs-quote{color:#998;font-style:italic}.hljs-keyword,.hljs-selector-tag,.hljs-subst{color:#333;font-weight:700}.hljs-literal,.hljs-number,.hljs-tag .hljs-attr,.hljs-template-variable,.hljs-variable{color:teal}.hljs-doctag,.hljs-string{color:#d14}.hljs-section,.hljs-selector-id,.hljs-title{color:#900;font-weight:700}.hljs-subst{font-weight:400}.hljs-class .hljs-title,.hljs-type{color:#458;font-weight:700}.hljs-attribute,.hljs-name,.hljs-tag{color:navy;font-weight:400}.hljs-link,.hljs-regexp{color:#009926}.hljs-bullet,.hljs-symbol{color:#990073}.hljs-built_in,.hljs-builtin-name{color:#0086b3}.hljs-meta{color:#999;font-weight:700}.hljs-deletion{background:#fdd}.hljs-addition{background:#dfd}.hljs-emphasis{font-style:italic}.hljs-strong{font-weight:700}
主要业务流程
初始请求请求过滤器请求队列响应下载器数据解析器数据清洗器存储器
设计图 master slavemaster控制队列过滤传递任务slave负责执行 缺点master和slave端交互数据频繁slave的数据进出都给master去调度对master端相当于成倍数据并发比较大 升级策略2分离响应异步下载与异步处理避免一方阻塞另一方 如果解析和清洗也会处理很长时间并发量就会下降也可以在中间加入队列解耦任务 如果没有耗时操作也没必要新加一个队列来做具体在哪些环节之间加入队列取决你分析业务需求在哪些环节会出现耗时操作 对于一个任务来说是共享一个进程的这个队列可以直接用Queue内存队列共享一个进程中数据 升级策略3日志监控捕获错误并实时通报。ELK 先对日志进行埋点针对Error错误日志进行报告 还有一种master 只负责过滤重复请求slave自己负责维护自己的队列只需要 slave 执行任务前询问 master是否有重复值即可 减轻了master的负担但是slave自己维护自己队列彼此独立
系统架构组件 队列组件 队列类型 FIFO 内存队列 - 一般实现单机版的队列 Python内置队列Asyncio中的队列 持久化队列:分布式断点续爬 Redis队列消息队列KafkaRabbitmq 过滤器组件 指纹过滤器redis等: 千万级数据去重simhash过滤器相似文本去重布隆过滤器redis亿级数据去重存在极小概率误判占的空间比较小性能高 下载器组件 urllib/requestsaiohttptomada.httpclient 异步组件 asynciocelery eventlet/geventselenium chrome-headless Pool多个浏览器实例appium android-app Pool (多台设备) 数据解析提取组件 语法规则 正则Xpath 解析提取工具 relxmllxml bs4lxml pyquery 数据清洗组件 自定义清洗规则 数据存储组件 存储介质 file:csv/jsonDB:mysql/mongondb 存储工具 csv、jsonsqlalchemy/mongoengine 程序监控组件 ELK elasticsearch:日志数据存储logstash: 日志收集工具kibana: 日志可视化 可视化控制组件 web界面GUI界面
异步改造并发代码
同步请求
下载器中开始使用的是 requests 同步发请求没有异步
下载器同步请求
import requests
from spiderSystem.response import Responseclass RequestsDownloader(object):根据request发起请求构建response对象def fetch(self, request):if request.method.upper() GET:resp requests.get(request.with_query_url, headersrequest.headers)elif request.method.upper() POST:resp requests.post(request.with_query_url, headersrequest.headers, bodyrequest.body)else:raise Exception(only support GET or POST Method)return Response(request, status_coderesp.status_code, urlresp.url, headersresp.headers, bodyresp.content)
请求的 Slave 客户端
from .request_manager import RequestManager
from .request_manager.utils.redis_tools import get_redis_queue_cls
from .downloader import RequestsDownloaderfrom .request import RequestFIFO_QUEUE get_redis_queue_cls(fifo)class Slave(object):def __init__(self, spiders, project_name, request_manager_config):self.filter_queue FIFO_QUEUE(filter_queue, hostlocalhost)self.request_manager RequestManager(**request_manager_config) self.downloader RequestsDownloader() # 用 requests 同步请求的下载器self.spiders spidersself.project_name project_namedef handle_request(self):# 1. 获取一个请求request self.request_manager.get_request(self.project_name)# 2. 发起请求response self.downloader.fetch(request) # 每次都同步去请求 # 3. 获取爬虫对象spider self.spiders[request.name]()# 4. 处理 responsefor result in spider.parse(response):if result is None:raise Exception(不允许返回None)elif isinstance(result, Request):self.filter_queue.put(result)else:# 意味着是一个数据new_result spider.data_clean(result)spider.data_save(new_result)def run(self):while True:self.handle_request()
异步请求改造
通过 tornado 的异步请求
下载器(异步)
from tornado.httpclient import HTTPClient, HTTPRequest, AsyncHTTPClientfrom spiderSystem.response import Response# tornado 也有同步请求方式 可以忽略
class TornadoDownloader(object):def __init__(self):self.httpclient HTTPClient()def fetch(self, request):print(tornado 同步客户端发的请求)tornado_request HTTPRequest(request.with_query_url, methodrequest.method.upper(), headersrequest.headers)tornado_response self.httpclient.fetch(tornado_request)return Response(requestrequest, status_codetornado_response.code, urltornado_response.effective_url,bodytornado_response.buffer.read())同步的请求不能复用需要用完后关闭def __del__(self):self.httpclient.close()# tornado 也有异步请求方式
class AsyncTornadoDownloader(object):def __init__(self):self.async_http_client AsyncHTTPClient()async def fetch(self, request): # 开启协程print(tornado 异步客户端发的请求)tornado_request HTTPRequest(request.with_query_url, methodrequest.method.upper(), headersrequest.headers)tornado_response await self.async_http_client.fetch(tornado_request) # 等待return Response(requestrequest, status_codetornado_response.code, urltornado_response.effective_url,headersrequest.headers,bodytornado_response.buffer.read())
Slave 调用方
import asyncio
import tornado.ioloopfrom .request_manager import RequestManager
from .request_manager.utils.redis_tools import get_redis_queue_cls
from .downloader import RequestsDownloader, TornadoDownloader, AsyncTornadoDownloaderfrom .request import RequestFIFO_QUEUE get_redis_queue_cls(fifo)class Slave(object):def __init__(self, spiders, project_name, request_manager_config):self.filter_queue FIFO_QUEUE(filter_queue, hostlocalhost)self.request_manager RequestManager(**request_manager_config) self.downloader AsyncTornadoDownloader() # 异步下载器self.spiders spidersself.project_name project_nameasync def handle_request(self):# request self.request_manager.get_request(self.project_name) 阻塞改异步io_loop tornado.ioloop.IOLoop.current()# 1. 获取一个请求future io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name) # 不支持协程的函数可以自己获取事件循环去定义执行让其支持协程request await future# 2. 发起请求response await self.downloader.fetch(request)# 3. 获取爬虫对象spider self.spiders[request.name]()# 4. 处理 responsefor result in spider.parse(response):if result is None:raise Exception(不允许返回None)elif isinstance(result, Request):# self.filter_queue.put(result) 可能阻塞改异步await io_loop.run_in_executor(None, self.filter_queue.put,result) else:# 意味着是一个数据new_result spider.data_clean(result)spider.data_save(new_result)async def run(self):while True:# 不能写成 await self.handle_request()否则也是相当于同步请求了await asyncio.wait([self.handle_request(),self.handle_request(),])
启动方式
if __name__ __main__:spiders {BaiduSpider.name: BaiduSpider}# 同步请求用 requests 发请求# Slave(spiders, project_namePROJECT_NAME, request_manager_configREQUEST_MANAGER_CONFIG).run()# 要用异步方式去请求slave Slave(spiders, project_namePROJECT_NAME, request_manager_configREQUEST_MANAGER_CONFIG)io_loop tornado.ioloop.IOLoop.current()io_loop.run_sync(slave.run) tornado库 io_loop.run_sync 用于将阻塞函数转换为同步函数并在 IOLoop 上执行它会阻塞当前协程。 io_loop.run_in_executor 用于在指定的线程池中异步执行耗时的、阻塞的操作不会阻塞当前协程并允许 IOLoop 继续处理其他事件。 asyncio库 实现类似于 run_sync 的效果您可以使用 loop.run_until_complete 方法来运行一个协程并等待其完成。这个方法会阻塞当前线程直到协程执行完毕 实现类似于 run_in_executor 的效果您可以使用 loop.run_in_executor 方法将耗时的、阻塞的操作转移到一个线程池中执行以避免阻塞事件循环。 async 异步协程改造重点!!! 下载器中用到的所有异步的地方必须是协程 async 定义await 后面跟着的一定是支持协程的方法要不是一个 协程对象future 或者 task 对象比如 self.async_http_client.fetch 如果不支持协程会报错连带着的所有调用 async 的方法也必须是协程函数对于不支持协程的函数可以自己获取事件循环去定义执行让其支持协程如果一个函数是一个协程函数后如果这个协程函数中有任意可以阻塞的或耗时操作都应该改成异步的 await ,不然可能会阻塞整个线程 # self.request_manager.get_request 本身不支持异步或者改造成异步嵌套要改的太深可以用 io_loop.run_in_executor 来替代io_loop tornado.ioloop.IOLoop.current()# 1. 获取一个请求future io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name) request await future 在最开始调用的地方比如 run 启动的方式必须是 用 asyncio.wait 或用其他方式启动asyncio.gather 或 asyncio.as_completed # 开启2个协程去执行。asyncio.wait 能让其变成一个异步关系
async def run(self):while True:# 不能写成 await self.handle_request()否则也是相当于同步请求了await asyncio.wait([self.handle_request(),self.handle_request(),])
Master 进程用多线程改造
master 的启动方法这两个可以用两个线程去做不然以前的写法是同步的执行方式 def run(self):# self.run_start_requests()# self.run_filter_queue()# 两个线程去做threading.Thread(targetself.run_start_requests).start()threading.Thread(targetself.run_filter_queue).start()
自己封装的SpiderSystem模块安装成内置环境中
在模块目录添加 setup.py 脚本
├── setup.py
├── spiderSystem
├── README.md
执行 pip3 setup.py install 即可查看包信息 pip3 show spiderSystem
from setuptools import setup, find_packagessetup(namespiderSystem,version0.1,descriptionspiderSystem module,authorraoju,urlurl,licenselicense,packagesfind_packages(exclude[]), # 当前所有模块都安装install_requires[tornado 5.1,pycurl,])
如果你对Python感兴趣想要学习python这里给大家分享一份Python全套学习资料都是我自己学习时整理的希望可以帮到你一起加油
有需要的小伙伴可以点击下方链接免费领取或者V扫描下方二维码免费领取 Python全套学习资料 1️⃣零基础入门
① 学习路线
对于从来没有接触过Python的同学我们帮你准备了详细的学习成长路线图。可以说是最科学最系统的学习路线你可以按照上面的知识点去找对应的学习资源保证自己学得较为全面。
② 路线对应学习视频
还有很多适合0基础入门的学习视频有了这些视频轻轻松松上手Python~
③练习题
每节视频课后都有对应的练习题哦可以检验学习成果哈哈
2️⃣国内外Python书籍、文档
① 文档和书籍资料 3️⃣Python工具包项目源码合集
①Python工具包
学习Python常用的开发软件都在这里了每个都有详细的安装教程保证你可以安装成功哦
②Python实战案例
光学理论是没用的要学会跟着一起敲代码动手实操才能将自己的所学运用到实际当中去这时候可以搞点实战案例来学习。100实战案例源码等你来拿
③Python小游戏源码
如果觉得上面的实战案例有点枯燥可以试试自己用Python编写小游戏让你的学习过程中增添一点趣味
4️⃣Python面试题
我们学会了Python之后有了技能就可以出去找工作啦下面这些面试题是都来自阿里、腾讯、字节等一线互联网大厂并且有阿里大佬给出了权威的解答刷完这一套面试资料相信大家都能找到满意的工作。
上述所有资料 ⚡️ 朋友们如果有需要的可以扫描下方二维码免费领取