当前位置: 首页 > news >正文

大连网站制作流程网页基础知识

大连网站制作流程,网页基础知识,网站上咱们做鱼饵,遂昌赶街网站一、基本环境 python版本#xff1a;3.8.5 APScheduler3.10.4 Django3.2.7 djangorestframework3.15.1 SQLAlchemy2.0.29 PyMySQL1.1.0二、django基本设置 2.1、新增一个app 该app用来写apscheduler相关的代码 python manage.py startapp gs_scheduler 2.2、修改配置文件s…一、基本环境 python版本3.8.5 APScheduler3.10.4 Django3.2.7 djangorestframework3.15.1 SQLAlchemy2.0.29 PyMySQL1.1.0二、django基本设置 2.1、新增一个app 该app用来写apscheduler相关的代码 python manage.py startapp gs_scheduler 2.2、修改配置文件settings.py #使用pymysql做客户端 import pymysql pymysql.install_as_MySQLdb()INSTALLED_APPS [rest_framework, #注册restful 应用gs_scheduler, #注册新增的app ]#配置mysql MYSQL_HOST 127.0.0.1 MYSQL_PORT 3306 MYSQL_USER root MYSQL_PASSWORD admin-root MYSQL_NAME study_websocketDATABASES {default: {ENGINE: django.db.backends.mysql,HOST: MYSQL_HOST,PORT: MYSQL_PORT,USER: MYSQL_USER,PASSWORD: MYSQL_PASSWORD,NAME: MYSQL_NAME,} } 2.3、gs_scheduler创建urls.py 1、gs_scheduler/urls.py from django.urls import path from . import views urlpatterns [] 2、根路由urls.py from django.contrib import admin from django.urls import path,includeurlpatterns [path(admin/, admin.site.urls),path(api/scheduler/,include(gs_scheduler.urls)), ] 三、配置gs_scheduler应用 3.1、配置api接口 这些接口用来展示定时任务的运行情况 1、urls.py from django.urls import path from . import views urlpatterns [path(run_next/, views.JobNextRunTimeAPIView.as_view()),#定时任务下次运行path(run_history/,views.JobRunTimeHistory.as_view()),#定时任务运行历史path(run_error/,views.JobRunErrorHistory.as_view()), #定时任务最近运行错误 ]2、models.py from django.db import models# Create your models here. import sqlite3 from gs_scheduler.management.commands.config import MYSQL_HOST,MYSQL_NAME,MYSQL_PORT,MYSQL_PASSWORD,MYSQL_USER,MYSQL_CHARSET from datetime import datetime,timedelta from django.conf import settingsimport pymysql from concurrent.futures import ThreadPoolExecutor#时间戳转时间字符串 def timestamp_to_time_str(timestamp):# 使用 datetime 模块将时间戳转换为 datetime 对象dt datetime.fromtimestamp(timestamp)# 将 datetime 对象格式化为时间字符串time_str dt.strftime(%Y-%m-%d %H:%M:%S)return time_strclass MysqlDB:DATETIME_FORMAT %Y-%m-%d %H:%M:%S# 创建数据库连接池def __init__(self):self.conn pymysql.connect(hostMYSQL_HOST,portMYSQL_PORT,userMYSQL_USER,passwordMYSQL_PASSWORD,dbMYSQL_NAME,charsetMYSQL_CHARSET,cursorclasspymysql.cursors.DictCursor)# 执行数据库增删改查def _execute_sql(self,query):sql,args querytry:with self.conn.cursor() as cursor:# 执行sql语句if args:if isinstance(args,(tuple,list)):# args (value1,value2)cursor.execute(sql,args)else:# args value1cursor.execute(sql,(args,))else:cursor.execute(sql)# 不同类型sql设置不同返回值if sql.strip().lower()[:6] select:#查询语句rows cursor.fetchall() or []return rowselif sql.strip().lower()[:4] show:#查看表是否存在result cursor.fetchall()return resultelse:#增删改语句self.conn.commit()return Trueexcept Exception as e:print(sql执行失败,e)pass# 线程池执行sql语句def start_sql_with_pool(self, sql:str,argsNone):with ThreadPoolExecutor() as executor:result executor.submit(self._execute_sql, (sql,args)).result()return result#创建记录定时任务历史表(调度器使用)def create_apscheduler_history(self):#创建表语句表名不能是占位符sql CREATE TABLE apscheduler_history (id INTEGER AUTO_INCREMENT PRIMARY KEY,job_id VARCHAR(128),run_time DATETIME,is_error TINYINT,error_msg VARCHAR(256) NULL)#判断表存在不不存在再创建results self.start_sql_with_pool(SHOW TABLES)is_exist Falsefor dic in results:if dic[Tables_in_{}.format(MYSQL_NAME,)] apscheduler_history:is_exist True#表不存在才创建if not is_exist:# print(表不存在执行创建表)create self.start_sql_with_pool(sql)# 创建索引self.start_sql_with_pool(db.conn.cursor().execute(CREATE INDEX run_time_index ON apscheduler_history (run_time)))return True# 将任务运行的结果记录到数据库中调度器使用def insert_into_apscheduler_history(self, data_list: list):if len(data_list) 4: # 异常执行的任务sql INSERT INTO apscheduler_history (job_id,run_time,is_error,error_msg) VALUES (%s,%s,%s,%s)else: # 正常执行的任务sql INSERT INTO apscheduler_history (job_id,run_time,is_error) VALUES (%s,%s,%s)#使用线程池执行插入语句self.start_sql_with_pool(sql,data_list)# 删除8小时前的历史记录调度器使用def delete_8hour_before_history(self):before_8hour (datetime.now() - timedelta(hours8)).strftime(%Y-%m-%d %H:%M:%S)sql DELETE FROM apscheduler_history WHERE run_time %sself.start_sql_with_pool(sql,(before_8hour,))# 查询每个任务的下次运行时间api展示def fetch_all_next_run_time(self):sql SELECT id,next_run_time FROM apscheduler_jobs# 获取查询结果rows self.start_sql_with_pool(sql)for row in rows:row[next_run_time] timestamp_to_time_str(row[next_run_time])return rows# 查询所有任务最近10次运行记录api展示def fetch_all_run_history(self)::param query:SELECT id,next_run_time,job_state FROM apscheduler_jobs:return:#获取所有定时任务iddata_list self.fetch_all_next_run_time()id_list (dic.get(id) for dic in data_list)# 创建一个游标对象sql SELECT id,job_id,run_time FROM apscheduler_history WHERE is_error0 AND job_id %s ORDER BY run_time DESC LIMIT 10ret_list []for id in id_list:# 执行查询rows self.start_sql_with_pool(sql,(id,)) or []dic {id: id, last_run_time: [], msg: 最近10次运行时间}for row in rows:run_time row.get(run_time)run_time run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timedic[last_run_time].append(run_time)ret_list.append(dic)return ret_list# 获取任务最近运行失败情况(api展示)def fetch_all_error_history(self):data_list self.fetch_all_next_run_time()id_list (dic.get(id) for dic in data_list)sql SELECT run_time,error_msg FROM apscheduler_history WHERE is_error1 AND job_id %s ORDER BY id DESC LIMIT 5ret_list []for id in id_list:# 获取查询结果rows self.start_sql_with_pool(sql, (id,)) or []dic {id: id, last_run_time: [], msg: 最近5次执行失败}for row in rows:run_time row.get(run_time)run_time run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timeerror row.get(error_msg)dic[last_run_time].append({run_time: run_time, error: error})ret_list.append(dic)return ret_list# 关闭连接def close(self):self.conn.close()if __name__ __main__:db MysqlDB()db.create_apscheduler_history()# for i in range(1,10):# db.insert_into_apscheduler_history([send_to_big_data,2024-05-01 13:{}:12.format(str(i).zfill(2)),1,执行失败了])# db.delete_8hour_before_history()# ret db.fetch_all_run_history()# print(ret,history)# ret db.fetch_all_next_run_time()# print(ret,next)# ret db.fetch_all_error_history()# print(ret,error)db.close() 3、views.py from django.shortcuts import render# Create your views here. from rest_framework.views import APIView from rest_framework.response import Response from .models import MysqlDB#任务下次运行时间 class JobNextRunTimeAPIView(APIView):authentication_classes []def get(self,request):db MysqlDB()data db.fetch_all_next_run_time()db.close()ret {code:200,status:success,data:data,}return Response(ret)#任务最近运行历史 class JobRunTimeHistory(APIView):authentication_classes []def get(self,request):db MysqlDB()data db.fetch_all_run_history()db.close()ret {code:200,status:success,data:data}return Response(ret)#任务最近运行错误 class JobRunErrorHistory(APIView):authentication_classes []def get(self,request):db MysqlDB()data db.fetch_all_error_history()db.close()ret {code: 200,status: success,data: data}return Response(ret) 3.2、在gs_scheduler创建 1、config.py 代码 该文件存放的是启动APScheduler调度器的一些配置数据 import os from apscheduler.jobstores.memory import MemoryJobStore #内存做后端存储 #from apscheduler.jobstores.redis import RedisJobStore #redis做后端存储 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore #mysql等做后端存储 # from django.conf import settings from study_apscheduler import settings #mysql://root:ldc-root127.0.0.1:3306/jobs?charsetutf8 MYSQL_CONFIG settings.DATABASES.get(default) MYSQL_USER MYSQL_CONFIG.get(USER) MYSQL_PASSWORD MYSQL_CONFIG.get(PASSWORD) MYSQL_HOST MYSQL_CONFIG.get(HOST) MYSQL_PORT MYSQL_CONFIG.get(PORT) MYSQL_NAME MYSQL_CONFIG.get(NAME) MYSQL_CHARSET utf8mb4 URL mysql://{}:{}{}:{}/{}?charset{}.format(MYSQL_USER,MYSQL_PASSWORD,MYSQL_HOST,MYSQL_PORT,MYSQL_NAME,MYSQL_CHARSET) #时区 TIME_ZONE Asia/Shanghai #job的默认配置 JOB_DEFAULTS {coalesce: True, #系统挂掉任务积攒多次为执行True是合并成一次执行False是执行所有的次数。 持久化存储才有效max_instances: 3 # 同一个任务同一时间最多只能有3个实例在运行。} #job的存储后端 JOB_STORE {default: SQLAlchemyJobStore(urlURL) }#监听事件对应的情况 LISTENER{1:调度程序启动,2:调度程序关闭,4:调度程序中任务处理暂停,64:将任务存储添加到调度程序中,8192:任务在执行期间引发异常,4096:任务执行成功, }2、task.py 所有的定时任务都存放在这里 import os from datetime import datetime, timedelta, date from gs_scheduler.models import MysqlDB from utils.log_util import info_log from utils.send_monitor_data import SendData#推送到数据仓的告警信息5分钟执行一次 send_to_big_data SendData().send#清除定时任务历史运行记录 def delete_apscheduler_history():db MysqlDB()db.delete_8hour_before_history()if __name__ __main__:print(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) 3、crontab.py 实例化调度器 # 导入所需的调度器类和触发器类 from apscheduler.schedulers.background import BackgroundScheduler #后台运行 from apscheduler.schedulers.blocking import BlockingScheduler #主进程运行需要单独运行 from apscheduler.triggers.interval import IntervalTrigger #时间间隔 from apscheduler.triggers.cron import CronTrigger #复杂的定时任务 from apscheduler.triggers.date import DateTrigger #一次性定时任务 from django.core.management.base import BaseCommand from apscheduler import events from pytz import timezone from threading import RLock from datetime import datetime, timedelta from gs_scheduler.models import MysqlDB #定时任务 from .task import delete_apscheduler_history from .task import send_to_big_data #日志 from utils.log_util import info_log from .config import LISTENER from .config import TIME_ZONE,JOB_DEFAULTS,JOB_STORE#脚本运行python manage.py crontab class Command(BaseCommand):TIME_FORMAT %Y-%m-%d %H:%M:%S#初始化调度器def _scheduler_obj(self):scheduler BlockingScheduler()scheduler.configure(timezone TIME_ZONE, #时区job_defaultsJOB_DEFAULTS, #job的默认配置jobstoresJOB_STORE, #job的存储后端)return scheduler#添加任务def _add_job(self,scheduler:BlockingScheduler):#每5分钟执行一次推送告警到大数据仓scheduler.add_job(send_to_big_data,triggerIntervalTrigger(minutes1),idsend_to_big_data,replace_existingTrue,coalesceTrue,)#每隔8个小时清除历史的记录scheduler.add_job(delete_apscheduler_history,triggerIntervalTrigger(hours8),iddelete_apscheduler_history,replace_existingTrue,coalesceTrue,)#添加监听器def _listener(self,event:events):code event.coderun_time datetime.now().strftime(self.TIME_FORMAT)msg LISTENER.get(code)db MysqlDB()if msg:if code 4096:#成功运行job_id event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,0])elif code 8192:#运行异常了job_id event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,1,msg])else:info_log(msg)db.close()def start(self):scheduler self._scheduler_obj()# 创建记录运行记录db MysqlDB()db.create_apscheduler_history()db.close()# 设置监听器scheduler.add_listener(self._listener)# 设置定时任务self._add_job(scheduler)try:# print({},定时器启动成功,等待定时任务执行....format(datetime.now().strftime(self.TIME_FORMAT)))scheduler.start()except KeyboardInterrupt:scheduler.shutdown()# python manage.py crontab运行 就是调用该方法def handle(self, *args, **options):self.start()#伴随django在后台运行的。 在wsgi.py 文件中调用BackRunScheduler().start() class BackRunScheduler(Command):# 初始化调度器def _scheduler_obj(self):scheduler BackgroundScheduler()scheduler.configure(timezoneTIME_ZONE, # 时区job_defaultsJOB_DEFAULTS, # job的默认配置jobstoresJOB_STORE, # job的存储后端)return scheduler 3.3、启动方式 方式一 python manage.py  crontab    生产环境推荐使用此方式单独运行 方式二在settings.py中调用BackRunScheduler().start()  , 后台运行 四、测试 4.1、启动 启动django项目python manage.py runserver 8080 启动定时器python manage.py crontab 4.2、等待一段时间 1、查询任务下次执行时间 请求http://127.0.0.1:8080/api/scheduler/run_next/ 2、查询任务最近运行情况 请求http://127.0.0.1:8080/api/scheduler/run_next/ 3、查询任务最近异常情况 请求http://127.0.0.1:8080/api/scheduler/run_error/ 五、源代码下载 码云地址 django应用定时器: django下使用定时器的方法https://gitee.com/liuhaizhang/django-application-timer/ 目前有两套代码一个以mysql存储任务一个用sqlite存储任务
http://www.zqtcl.cn/news/159269/

相关文章:

  • 东莞企业如何建网站网站正在建设中...为什么护卫神
  • 引流用的电影网站怎么做wordpress浏览速度
  • 微信小程序怎拼做搬家网站东莞建网站公司
  • 网站推广昔年下拉博客推广链接制作软件
  • php 小企业网站 cmswordpress导航分类
  • 婚恋网站女孩子都是做美容免费空间最大的网盘
  • 建立网站要钱吗找人做网站需求怎么写
  • 网站建设精品课程电商运营主要负责什么
  • 中职网站建设与维护考试题wordpress商店会员管理
  • 物流网站开发策划做提升自己的网站
  • 网站开发交接做网站首页尺寸大小
  • 临沂建网站公司一个工厂做网站有用吗
  • 网站建设代码编译的问题及解决方案天元建设集团有限公司第六分公司
  • 做亚马逊网站费用深圳好蜘蛛网站建设公司
  • 做网站需要办什么手续html简单网页代码实例
  • 中文网页设计模板免费下载超级优化小说
  • 做网站的流程前端做什么网站建设与管理专业学什么
  • 用wordpress做购物网站西安建设工程网站
  • 响应式网站免费模板下载电商怎么做如何从零开始视频
  • 江西网站开发学校联系我们网站制作
  • 做网站首页图片素材营销网站制作要素
  • 云阳网站建设百度对 wordpress 排名
  • 做电商网站需要多少时间网站建设答辩ppt
  • 营销型网站的案例江苏seo网站排名优化
  • 企业网站 备案 网站名称凡科做视频网站
  • 湘潭建设公司网站杭州网站优化
  • 工信部备案网站网站空间服务商
  • 深圳市企业网站seo营销工具桂林百姓网
  • 网站建设所需材料wordpress nginx配置文件
  • 给企业做网站运营广州制作网站公司