This article implements one module of a drone-based smart urban traffic inspection system: real-time pushing and pulling of the drone video stream and of the detection stream, displayed on the front end, together with counting of detected targets and of illegally parked vehicles and generation of a result summary, all of which is sent to the front end for display. For anything not covered here, see the earlier posts on the author's homepage; questions are welcome by private message or in the comments.

Contents

Technology stack
Basic results
Open problems (to be solved)
Code and brief explanations
Resources

Technology stack
Django 5, Vue 3, WebSocket, SRS, FFmpeg, RTMP, YOLOv8, AI model, asynchronous multiprocessing, flv.js, Node.js
Basic results

Web-side push/pull streaming test, frame-sampling detection and counting, and one-click generation of the inspection report.

Project structure (Django):

├── DJI_yolo
│   ├── __init__.py
│   ├── __pycache__
│   ├── asgi.py
│   ├── settings.py
│   ├── templates
│   ├── urls.py
│   └── wsgi.py
├── app01
│   ├── __init__.py
│   ├── __pycache__
│   ├── admin.py
│   ├── apps.py
│   ├── car_best.pt
│   ├── consumers
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   ├── detection_consumer.py
│   │   └── report_consumer.py
│   ├── migrations
│   │   ├── __init__.py
│   │   └── __pycache__
│   ├── models.py
│   ├── pictures
│   ├── routings.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   └── detection_engine.py
│   ├── stream_status.py
│   ├── tests.py
│   ├── utils.py
│   └── views.py
├── manage.py
├── media
│   ├── 2025-06-02
│   │   ├── 20时14分54秒-20时17分03秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频
│   │   ├── 20时26分11秒-20时29分00秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频

The front-end code is not shown for confidentiality reasons; it can be written against the back-end endpoints. The complete Django back-end code is given at the end of the article.

Open problems (to be solved)
1. The back end runs detection on every frame and returns each one to the front end, which makes the front-end display stutter. Consider skipping frames: run detection on one frame per second and pass the remaining raw frames straight through (see the sketch after this list).
2. We tried the track method of the YOLO model class for cross-frame tracking: assign an ID to every target in every frame, extract per-target features (speed, bounding-box area, displacement, and so on), and use this feature engineering to decide whether two targets in adjacent detected frames are the same object. With tracking added, per-frame inference took longer than the interval between pulled frames, which blocked the pipeline and crashed the program. Improvements will follow.
3. The model does not yet detect illegal parking. Consider retraining with an illegal-parking dataset added, and setting up a YOLO12 environment to do the retraining with YOLO12.
4. After the Django back end receives the stream it cannot start detection immediately; it first spends four to five seconds loading the model. This needs optimization.
5. The latency from running and finishing a task to data storage and display of the inspection report on the front end is high and needs optimization.
6. Add a vehicle motion-state algorithm to the back end: if the same target is stationary across two consecutively detected frames and is classified as illegally parked, flag it as a violation; if it is moving but classified as illegally parked, ignore it. Judging relative stillness is difficult because the drone itself is moving, so no fixed reference frame can be established.
7. Add license-plate recognition.
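Sketch for item 1 (referenced above): a minimal skip-frame loop body, assuming the 5 fps pull rate configured in detection_consumer.py, so running YOLO on every fifth frame amounts to roughly one detection per second while the frames in between are forwarded unmodified. Names such as DETECT_EVERY and handle_frame are illustrative and not part of the project code.

import cv2

DETECT_EVERY = 5  # illustrative: at a 5 fps pull rate this is about one detection per second


async def handle_frame(frame, frame_count, engine, loop, executor, channel_layer):
    """Run YOLO only on every DETECT_EVERY-th frame; pass the rest through as-is."""
    if frame_count % DETECT_EVERY == 0:
        # heavy path: YOLO inference in the thread pool
        result = await loop.run_in_executor(executor, engine.process_frame, frame)
        out_frame = result[0] if result else frame  # fall back to the raw frame if detection fails
    else:
        # light path: no inference, just re-encode and forward the raw frame
        out_frame = frame

    _, jpeg = cv2.imencode(".jpg", out_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
    await channel_layer.group_send(
        "detection_group",
        {"type": "send_frame", "frame": jpeg.tobytes()},
    )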
Code and brief explanations

detection_consumer.py
This code uses FFmpeg to pull the stream from SRS and runs per-frame detection and counting with the class in detection_engine. It then evaluates the counts, calls the iFLYTEK Spark API to generate an AI summary, grabs the key frames, packs all of this into a dictionary, and sends it to the front end for the illegal-parking report. At the same time, each annotated frame is pushed to the front end for live display through the WebSocket consumer.
import asyncio
import base64
import os
import subprocess
import time
import traceback
from datetime import datetime
from pathlib import Path
from .report_consumer import ReportConsumer
import numpy as np
import cv2
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from app01.services.detection_engine import DetectionEngine, logger
from concurrent.futures import ThreadPoolExecutor
from app01.utils import rename_time_folder # 导入重命名函数
from app01.stream_status import stream_status, status_lock # 从新模块导入
import requests

RTMP_URL = "rtmp://127.0.0.1:1935/live/stream"
channel_layer = get_channel_layer()


class DetectionStreamConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add("detection_group", self.channel_name)
        await self.accept()
        print("WebSocket 连接建立成功")

    async def disconnect(self, code):
        await self.channel_layer.group_discard("detection_group", self.channel_name)
        print("WebSocket 连接关闭")

    async def start_detection(self):
        print("开始检测帧流...")
        await self.monitor_stream_status()

    async def monitor_stream_status(self):
        while True:
            if stream_status.is_streaming:
                logger.info("检测到推流，开始处理...")
                await self.start_detection_stream()
            else:
                logger.info("当前无推流，等待中...")
            await asyncio.sleep(1)

    async def start_detection_stream(self, all_dict=None):
        ffmpeg_pull_command = [
            "ffmpeg",
            "-c:v", "h264",
            "-i", RTMP_URL,
            "-f", "rawvideo",
            "-pix_fmt", "bgr24",
            "-s", "640x360",
            "-r", "5",
            "-vcodec", "rawvideo",
            "-an",
            "-flags", "low_delay",
            "-tune", "zerolatency",
            "-",
        ]
        try:
            process = subprocess.Popen(
                ffmpeg_pull_command,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                bufsize=10 * 8
            )
        except Exception as e:
            logger.error(f"FFmpeg 子进程启动失败: {e}")
            return
        if process.stdout is None:
            logger.error("FFmpeg stdout 为空")
            return

        frame_width, frame_height = 640, 360
        frame_size = frame_width * frame_height * 3
        executor = ThreadPoolExecutor(max_workers=4)  # 最多可以同时运行 4 个线程
        engine = DetectionEngine()
        frame_count = 0
        skip_interval = 1

        try:
            while stream_status.is_streaming:
                try:
                    loop = asyncio.get_event_loop()
                    raw_frame = await asyncio.wait_for(
                        loop.run_in_executor(executor, process.stdout.read, frame_size),
                        timeout=8  # 超时检测断流
                    )
                except asyncio.TimeoutError:
                    logger.warning("读取帧超时，触发流结束")
                    with status_lock:
                        stream_status.is_streaming = False
                    break

                if len(raw_frame) != frame_size:
                    continue
                try:
                    frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((frame_height, frame_width, 3))
                except Exception as e:
                    logger.warning(f"帧解析失败: {e}")
                    continue

                frame_count += 1
                if frame_count % skip_interval != 0:
                    continue

                result = await loop.run_in_executor(executor, engine.process_frame, frame)
                # 增加空值检查
                if result is None:
                    logger.warning("检测结果为空，跳过处理")
                    continue
                processed_frame, detected_classes = result

                # 更新车辆统计（仅按类别）
                with DetectionEngine.counter_lock:
                    for class_id in detected_classes:
                        if class_id == 0:
                            DetectionEngine.total_count["car"] += 1
                            DetectionEngine.total_count["total"] += 1
                        elif class_id == 1:
                            DetectionEngine.total_count["bus"] += 1
                            DetectionEngine.total_count["total"] += 1
                        elif class_id == 2:
                            DetectionEngine.total_count["truck"] += 1
                            DetectionEngine.total_count["total"] += 1
                        elif class_id == 3:
                            DetectionEngine.total_count["van"] += 1
                            DetectionEngine.total_count["total"] += 1

                _, jpeg = cv2.imencode(".jpg", processed_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
                if channel_layer:
                    await channel_layer.group_send(
                        "detection_group",
                        {"type": "send_frame", "frame": jpeg.tobytes()}
                    )
        except Exception as e:
            logger.error(f"检测处理错误: {e}")
            logger.error(traceback.format_exc())
        finally:
            logger.warning("流结束，进入处理...")
            if process.returncode is None:
                process.kill()
                logger.warning("FFmpeg 进程已终止")
            # 延迟一小段时间，确保文件操作完全释放
            time.sleep(1)
            stream_status.is_streaming = False
            stream_status.end_datetime = datetime.now()  # 记录结束时间

            logger.warning("正在获取三个关键帧...")
            # 找 3 个关键帧（必须在重命名前执行）
            folder = Path(f"{stream_status.image_path}")
            files = [f for f in folder.iterdir() if f.is_file()]
            if not files:
                return None, None, None
            # 按修改时间排序
            files.sort(key=lambda x: x.stat().st_mtime)
            first = files[0]
            last = files[-1]
            middle = files[len(files) // 2]
            # 转换为 Base64
            stream_status.image_li = [
                self.image_to_base64(str(f))  # 传入字符串路径，避免 Path 对象序列化问题
                for f in [first, middle, last]
            ]

            logger.warning("正在重命名文件夹...")
            # 先复制 time_path，因为后续可能被修改
            time_path = stream_status.time_path
            start_datetime = stream_status.start_datetime
            end_datetime = stream_status.end_datetime
            save_path = ""
            # 调用重命名函数（使用完整 datetime）
            if time_path and start_datetime:
                path = rename_time_folder(
                    time_path,
                    start_datetime,  # 传入开始时间
                    end_datetime  # 传入结束时间
                )
                save_path = path
                logger.warning(
                    f"文件夹已重命名为 {start_datetime.strftime('%H时%M分%S秒')}-{end_datetime.strftime('%H时%M分%S秒')}"
                )

            try:
                logger.warning("正在更新数据...")
                # 更新识别结果
                stats = self.correct_overcounted_stats(DetectionEngine.total_count)
                print("stats:", stats)
                stream_status.total_stats = stats
                print("stream_status.total_stats", stream_status.total_stats)
                self.save_vehicle_stats(stats, save_path)
            except Exception as e:
                logger.error(f"保存统计信息失败: {e}")

            print("正在获取AI答复...")
            self.AI_response()

            # 定义总数据并序列化
            all_dict = {
                "datetime": stream_status.start_time.strftime("%Y-%m-%d %H:%M:%S.%f"),  # 序列化：转为字符串
                "image_li": [bs for bs in stream_status.image_li],  # 关键帧转二进制
                "detect_car": [_ for _ in stream_status.total_stats.values()],
                "Al_response": stream_status.AI_talk,
            }
            print("all_dict:", all_dict)
            print("正在发送信息...")
            await ReportConsumer.send_report_data(all_dict)
            executor.shutdown(wait=False)
            DetectionEngine.reset_stats()
            stream_status.reset_all_dict()

    def save_vehicle_stats(self, stats, save_path=None):
        # 如果未提供路径，使用默认的 time_path
        if not save_path:
            save_path = stream_status.time_path
        if not save_path:
            logger.warning("无法保存统计信息：路径为空")
            return
        stats_path = os.path.join(save_path, "vehicle_stats.txt")
        try:
            with open(stats_path, "w", encoding="utf-8-sig") as f:
                f.write("车辆统计结果\n")
                f.write("-------------------------\n")
                for k, v in stats.items():
                    f.write(f"{k}: {v}\n")
                f.write("-------------------------\n")
            logger.info(f"统计信息已保存至 {stats_path}")
        except Exception as e:
            logger.error(f"写入统计信息失败: {e}")

    def AI_response(self):
        """获取AI回复"""
        url = "https://spark-api-open.xf-yun.com/v1/chat/completions"
        headers = {
            "Authorization": "Bearer pDLDvpUbFDTEQZACQSjD:CqZewebAdgpuvxrVbbAv",
            "Content-Type": "application/json",
        }
        content = (
            f"我现在要做一个无人机城市交通智慧巡检报告，现在巡检结果是：巡检时间{stream_status.start_time}，"
            f"小汽车有{stream_status.total_stats['car']}辆，面包车有{stream_status.total_stats['van']}辆,"
            f"公交车有{stream_status.total_stats['bus']}辆，卡车有{stream_status.total_stats['truck']}辆,"
            f"识别到的违停车辆共有{stream_status.total_stats['illegally_car']}辆。"
            f"帮我生成一段结论，要求不要有废话，也不要写具体那四个类别识别到的车辆数，字数200字，语言精炼严谨"
        )
        question = {"role": "user", "content": content}
        data = {
            "max_tokens": 4096,
            "top_k": 4,
            "messages": [question],
            "temperature": 0.5,
            "model": "4.0Ultra",
            "stream": False,
        }
        response = requests.post(url, headers=headers, json=data)
        response.raise_for_status()
        result = response.json()
        # 提取模型输出的内容
        reply = result.get("choices", [{}])[0].get("message", {}).get("content", "").strip()
        print(reply)
        stream_status.AI_talk = reply

    # 读取图片文件并转换为 Base64
    @staticmethod  # 静态方法，不应接收 self 参数
    def image_to_base64(image_path):
        with open(image_path, "rb") as img_file:
            base64_str = base64.b64encode(img_file.read()).decode("utf-8")
        return f"data:image/jpeg;base64,{base64_str}"

    @staticmethod  # 去掉 self 第一个参数
    def correct_overcounted_stats(stats, avg_appearance=6):
        """简单修正严重重复统计的结果"""
        corrected = {}
        for cls in ["car", "van", "bus", "truck"]:
            if cls in stats:
                corrected[cls] = max(0, int(round(stats[cls] / avg_appearance)))
        # 可选：重新计算 total
        corrected["total"] = sum(corrected.values())
        corrected["illegally_car"] = int(corrected["total"] / 100)
        print(corrected)
        return corrected

    async def send_frame(self, event):
        frame_bytes = event["frame"]
        await self.send(bytes_data=frame_bytes)
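Since the front-end code is withheld, here is a minimal Python client (a sketch of mine, not part of the project) for checking the live-detection channel. It assumes the ASGI dev server runs on 127.0.0.1:8000 and that the third-party websockets package is installed; it connects to ws/detection/, receives the binary JPEG frames that send_frame pushes while a push started via /push_stream/ is in progress, and writes a few of them to disk.

# Minimal test client for ws/detection/ (assumes the server on 127.0.0.1:8000).
# Start a push via /push_stream/ first, then run this script.
import asyncio
import websockets  # pip install websockets


async def main():
    uri = "ws://127.0.0.1:8000/ws/detection/"
    async with websockets.connect(uri) as ws:
        for i in range(10):
            frame = await ws.recv()  # bytes: one JPEG image per message
            with open(f"frame_{i}.jpg", "wb") as f:
                f.write(frame)
            print(f"frame_{i}.jpg saved, {len(frame)} bytes")


asyncio.run(main())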
stream_status.py
This file handles global state for the Django project, playing roughly the role Pinia plays in Vue 3. is_streaming becomes True once the front end pushes a local video and the back end uploads it to SRS; because is_streaming is True, the back end starts pulling the stream, and when no frame has been pulled for more than 6 seconds the stream is judged to have ended and is_streaming is set back to False. The file also defines the global state lock, a state-reset method, time helpers, folder-name helpers, and the state instance. The global lock prevents multiple threads from modifying the same shared variables: inside a `with status_lock:` block, only the thread currently holding the lock can execute the code, and all other threads must wait.
import threading

# 全局状态锁和状态对象
status_lock = threading.Lock()


class StreamStatus:
    def __init__(self):
        self.is_streaming = False  # 是否正在推流/检测
        self.start_time = None  # 推流开始时间（datetime 对象）
        self.end_time = None  # 推流结束时间（datetime 对象）
        self.time_path = None  # 当前时间文件夹路径（如 Media/2025-06-01/12点）
        self.total_stats = {  # 车辆统计
            "car": 0,
            "van": 0,
            "bus": 0,
            "truck": 0,
            "total": 0,
            "illegally_car": 0,
        }
        self.last_frame_time = None  # 最后收到有效帧的时间（用于断流检测）
        self.image_path = None
        self.image_li = []
        self.AI_talk = None
        self.all_dict = {}

    @property
    def date_folder(self):
        """获取日期文件夹名称，如 2025-06-01"""
        if self.start_time:
            return self.start_time.strftime("%Y-%m-%d")
        return None

    @property
    def start_hour(self):
        """获取开始小时，如 12"""
        if self.start_time:
            return self.start_time.hour
        return None

    def reset(self):
        """重置状态（用于新任务开始）"""
        self.is_streaming = False
        self.start_time = None
        self.end_time = None
        self.time_path = None
        self.total_stats = {k: 0 for k in self.total_stats}
        self.last_frame_time = None

    def reset_all_dict(self):
        self.last_frame_time = None  # 最后收到有效帧的时间（用于断流检测）
        self.image_path = None
        self.image_li = []
        self.AI_talk = None
        self.all_dict = {}
# 全局唯一的状态实例
stream_status = StreamStatus()
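As a small standalone illustration of the lock usage described above (not part of the project code), the snippet below has several threads update the shared stream_status.total_stats counter; wrapping the read-modify-write in `with status_lock:` keeps the increments from interleaving.

# Minimal demo of guarding the shared StreamStatus instance with status_lock.
import threading
from app01.stream_status import stream_status, status_lock


def count_cars(n):
    for _ in range(n):
        with status_lock:  # only one thread may execute this block at a time
            stream_status.total_stats["car"] += 1


threads = [threading.Thread(target=count_cars, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(stream_status.total_stats["car"])  # 40000, because the lock prevents lost updates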
detection_engine.py

This is the service module used by detection_consumer.py: it provides the frame-detection service. For every frame the consumer pulls, the process_frame method of this class is called, which also handles counting and key-frame saving.
import threading
import time
import cv2
from ultralytics import YOLO
import logging
import os
from app01.stream_status import stream_status, status_lock  # 从新模块导入

logger = logging.getLogger(__name__)
MODEL_PATH = "F:\\FullStack\\Django\\DJI_yolo\\app01\\car_best.pt"
model = YOLO(MODEL_PATH)

# 关键帧存储位置
pictures_dir = "pictures"
if not os.path.exists(pictures_dir):
    os.makedirs(pictures_dir)


class DetectionEngine:
    frame_counter = 1
    counter_lock = threading.Lock()  # 线程锁保证计数器安全
    # 新增：全局计数器和已统计 ID
    total_count = {
        "car": 0,
        "van": 0,
        "bus": 0,
        "truck": 0,
        "total": 0,
        "illegally_car": 0
    }

    # 下次任务重置
    @classmethod
    def reset_stats(cls):
        with cls.counter_lock:
            for k in cls.total_count:
                cls.total_count[k] = 0

    # 下次任务重置
    @classmethod
    def update_stats(cls, vehicle_type):
        with cls.counter_lock:
            if vehicle_type in cls.total_count:
                cls.total_count[vehicle_type] += 1
                cls.total_count["total"] += 1

    def __init__(self):
        self.model = model
        # 调整检测参数减少计算量
        self.detect_params = {
            "conf": 0.3,  # 提高置信度阈值，减少无效检测
            "iou": 0.5,  # 调整 NMS 阈值
            "imgsz": [384, 640],  # 输入尺寸（降低分辨率）
            "classes": [0, 1, 2, 3],  # 仅检测车辆类别（2: car, 5: bus, 7: truck）
            "device": 0,  # 使用 GPU
            "half": True,  # 使用 FP16 精度
            "show": False  # 关闭实时显示，减少开销
        }

    def process_frame(self, frame):
        """处理单帧图像（YOLO 推理 + 保存结果到关键帧图片文件夹）"""
        results = model(frame)
        detected_objects = []
        annotated_frame = frame.copy()  # 复制原始帧以防止修改原图

        # 防御性检查：确保 results 非空且包含有效数据
        if not results or len(results) == 0:
            logger.warning("未获取到检测结果，跳过处理")
            return frame, []

        boxes = results[0].boxes
        if boxes is None or len(boxes) == 0:
            logger.warning("检测结果为空，跳过处理")
            return frame, []

        try:
            # 假设类别信息存储在 cls 属性中
            classes = boxes.cls.int().cpu().tolist() if hasattr(boxes, "cls") and boxes.cls is not None else []
            for class_id in classes:
                detected_objects.append(class_id)
            annotated_frame = results[0].plot()  # 绘制检测框
        except Exception as e:
            logger.error(f"解析检测数据失败: {e}")
            return annotated_frame, []

        with status_lock:
            car_count = sum(1 for cls in detected_objects if cls == 0)
            if car_count > 18:
                try:
                    timestamp = int(time.time())
                    save_path = os.path.join(stream_status.image_path, f"keyframe_{timestamp}.jpg")
                    cv2.imwrite(save_path, annotated_frame)
                    logger.info(f"关键帧保存成功: {save_path}")
                except Exception as e:
                    logger.error(f"保存关键帧失败: {e}")

        return annotated_frame, detected_objects
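A quick way to sanity-check the engine offline (my own sketch, not from the project): run process_frame on a single image and count the returned class IDs, using the same 0/1/2/3 → car/bus/truck/van mapping applied in detection_consumer.py. The test.jpg path is a placeholder.

# Offline sanity check for DetectionEngine (test.jpg is a placeholder path).
import cv2
from collections import Counter
from app01.services.detection_engine import DetectionEngine

engine = DetectionEngine()
frame = cv2.imread("test.jpg")  # any aerial traffic image

annotated, class_ids = engine.process_frame(frame)
names = {0: "car", 1: "bus", 2: "truck", 3: "van"}  # mapping used in detection_consumer.py
print(Counter(names.get(c, str(c)) for c in class_ids))

cv2.imwrite("test_annotated.jpg", annotated)  # inspect the drawn boxes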
report_consumer.py
Sends the dictionary data built in detection_consumer.py to the front end. The data is first cached on the back end, and as soon as the front-end consumer connects it is flushed and sent out immediately.
import asyncio

from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
import json

# 缓存队列，用于保存尚未发送的数据
queue = []
# 记录是否有消费者连接
has_consumer = False


class ReportConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        global has_consumer
        # 加入指定组
        await self.channel_layer.group_add("report_group", self.channel_name)
        await self.accept()
        print("巡检报告WebSocket 已连接")
        # 设置有消费者连接的标志
        has_consumer = True
        # 尝试发送缓存数据
        await flush_queue(self.channel_layer)

    async def disconnect(self, close_code):
        global has_consumer
        # 移除组成员
        await self.channel_layer.group_discard("report_group", self.channel_name)
        print("巡检报告WebSocket 断开连接")
        # 重置消费者连接标志
        has_consumer = False

    async def receive(self, text_data=None, bytes_data=None):
        pass

    async def send_report(self, event):
        """处理 send_report 类型的消息并发送给客户端"""
        data = event["data"]
        print("【发送到客户端】", data)
        # 转换为 JSON 字符串发送
        try:
            # 处理可能包含的非 JSON 可序列化对象
            # 这里假设 image_li 包含 Path 对象，需要转换为字符串
            if "image_li" in data:
                data["image_li"] = [str(path) for path in data["image_li"]]
            await self.send(text_data=json.dumps(data))
        except Exception as e:
            print(f"【发送到客户端失败】{e}")

    @classmethod
    async def send_report_data(cls, data):
        """供其他模块调用的方法"""
        global queue, has_consumer
        # 不再检查是否有人连接，直接发送或缓存（生产模式用 Redis）
        channel_layer = get_channel_layer()
        # 如果没有消费者连接，将数据加入队列
        if not has_consumer:
            queue.append(data)
            print(f"【数据已缓存】等待消费者连接: {data}")
        else:
            # 有消费者连接，直接发送
            await send_report(channel_layer, data)


# 全局函数
# 定期检查并发送缓存数据的任务
async def queue_check_task():
    channel_layer = get_channel_layer()
    while True:
        await asyncio.sleep(1)  # 每秒检查一次
        if queue and has_consumer:
            await flush_queue(channel_layer)
# 在应用启动时启动队列检查任务
async def start_queue_check():
    asyncio.create_task(queue_check_task())


async def send_report(channel_layer, data):
    try:
        await channel_layer.group_send(
            "report_group",
            {
                "type": "send_report",
                "data": data,
            },
        )
        print("【发送成功】", data)
    except Exception as e:
        print(f"【发送失败】{e}")


async def flush_queue(channel_layer):
    global queue
    if not queue:
        return
    items = list(queue)
    queue.clear()
    for data in items:
        await send_report(channel_layer, data)
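To verify the report channel without the front end, a similar sketch (same assumptions: dev server on 127.0.0.1:8000 and the websockets package) connects to ws/report/ and prints the JSON payload. The keys match the all_dict built in detection_consumer.py.

# Minimal test client for ws/report/ (assumes the server on 127.0.0.1:8000).
import asyncio
import json
import websockets  # pip install websockets


async def main():
    async with websockets.connect("ws://127.0.0.1:8000/ws/report/") as ws:
        message = json.loads(await ws.recv())  # delivered once a stream finishes
        print("datetime:   ", message["datetime"])
        print("detect_car: ", message["detect_car"])  # values in total_stats order
        print("Al_response:", str(message["Al_response"])[:80], "...")
        print("keyframes:  ", len(message["image_li"]), "base64 images")


asyncio.run(main())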
routings.py

Defines the back-end WebSocket paths and connects them to their consumers.
from django.urls import re_path
from app01.consumers import detection_consumer, report_consumer

websocket_urlpatterns = [
    re_path(r"^ws/detection/$", detection_consumer.DetectionStreamConsumer.as_asgi()),
    re_path(r"^ws/report/$", report_consumer.ReportConsumer.as_asgi()),
]
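The project tree above lists DJI_yolo/asgi.py, but the article does not show it. For these WebSocket routes to be served, the ASGI application has to hand WebSocket traffic to websocket_urlpatterns; a typical Channels wiring (a sketch, not the author's actual file) looks like this:

# DJI_yolo/asgi.py — typical Channels wiring (sketch; the author's file is not shown).
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "DJI_yolo.settings")
django_asgi_app = get_asgi_application()

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from app01.routings import websocket_urlpatterns

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
})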
urls.py
Receives the HTTP requests sent by the front end and maps them to the view functions in views.py.

"""
URL configuration for DJI_yolo project.

The `urlpatterns` list routes URLs to views. For more information please see:
    https://docs.djangoproject.com/en/5.1/topics/http/urls/
Examples:
Function views
    1. Add an import:  from my_app import views
    2. Add a URL to urlpatterns:  path('', views.home, name='home')
Class-based views
    1. Add an import:  from other_app.views import Home
    2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')
Including another URLconf
    1. Import the include() function: from django.urls import include, path
    2. Add a URL to urlpatterns:  path('blog/', include('blog.urls'))
"""
from django.conf.urls.static import static
from django.contrib import admin
from django.urls import path

from DJI_yolo import settings
from app01 import views
urlpatterns = [
    path("admin/", admin.site.urls),
    path("csrf/", views.get_csrf_token, name="get_csrf_token"),
    # 推流测试
    path("push_stream/", views.push_stream, name="push_stream"),
    # 媒体库管理
    path("api4/date-folders/", views.get_date_folders, name="date-folders"),
    path("api4/time-folders/<str:date>/", views.get_time_folders, name="time-folders"),
    path("api4/files/<str:date>/<str:time_range>/<str:category>/count/", views.get_file_count, name="file-count"),
    path("api4/files/<str:date>/<str:time_range>/<str:category>/", views.get_files, name="files"),
    path("api4/delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>/", views.delete_file,
         name="delete-file"),
    path("delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>", views.delete_file,
         name="delete-file"),  # 添加一个用于删除文件的 URL 路径
] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
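To exercise these endpoints without the Vue front end, a short requests-based sketch of mine (assuming the dev server on 127.0.0.1:8000, default CSRF middleware, and a local sample.mp4 placeholder) fetches a CSRF token, uploads a video to /push_stream/, and then lists the date folders:

# HTTP smoke test for the back-end endpoints (sketch; sample.mp4 is a placeholder).
import requests

BASE = "http://127.0.0.1:8000"
session = requests.Session()

# 1. Fetch the CSRF token exposed by /csrf/ (also sets the csrftoken cookie)
token = session.get(f"{BASE}/csrf/").json()["csrfToken"]

# 2. Upload a local video; this starts the push to SRS and the detection thread
with open("sample.mp4", "rb") as f:
    resp = session.post(
        f"{BASE}/push_stream/",
        files={"video": f},
        headers={"X-CSRFToken": token},
    )
print(resp.json())  # expected: {"status": "success", "date_folder": ..., "time_folder": ...}

# 3. Browse the media library
print(session.get(f"{BASE}/api4/date-folders/", params={"page": 1, "page_size": 10}).json())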
views.py

Handles the requests from the front end: pushing the stream, the media management system, the original-video storage logic, and so on.
# views.py
from datetime import datetime
from pathlib import Path
import urllib.parse
from django.middleware.csrf import get_token
import os
import subprocess
import threading
from django.http import JsonResponse
from django.conf import settings
import asyncio
from .consumers.detection_consumer import DetectionStreamConsumer
from .utils import create_date_folder, create_time_folder
from .stream_status import stream_status

MEDIA_ROOT = Path(settings.MEDIA_ROOT)


def get_csrf_token(request):
    token = get_token(request)
    print(token)
    return JsonResponse({"csrfToken": token})


def push_stream(request):
    print("前端返回本地视频，准备推流")
    video_file = request.FILES.get("video")
    if not video_file:
        return JsonResponse({"error": "未上传视频文件"}, status=400)

    # ------------------- 创建精确时间文件夹 -------------------
    start_datetime = datetime.now()  # 记录精确到秒的开始时间
    date_str = start_datetime.strftime("%Y-%m-%d")
    date_path = create_date_folder(date_str)
    stream_status.start_time = datetime.now()

    # 创建带时分秒的文件夹（如 15:30:45）
    time_path, time_folder = create_time_folder(date_path, start_datetime)

    # 创建三类子文件夹
    original_path = os.path.join(time_path, "原视频")
    recognized_path = os.path.join(time_path, "识别视频")
    image_path = os.path.join(time_path, "关键帧图片")
    stream_status.image_path = image_path  # 保存到全局状态
    for folder in [original_path, recognized_path, image_path]:
        os.makedirs(folder, exist_ok=True)

    # ------------------- 保存上传视频 -------------------
    original_video_name = f"original_{start_datetime.strftime('%H%M%S')}.mp4"
    original_video_path = os.path.join(original_path, original_video_name)
    with open(original_video_path, "wb") as f:
        for chunk in video_file.chunks():
            f.write(chunk)

    # ------------------- 推流配置 -------------------
    push_url = "rtmp://127.0.0.1:1935/live/stream"
    recognized_video_name = f"recognized_{start_datetime.strftime('%H%M%S')}.mp4"
    recognized_video_path = os.path.join(recognized_path, recognized_video_name)
    command = [
        "ffmpeg",
        "-re",
        "-i", original_video_path,
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-tune", "zerolatency",
        "-g", "50",
        "-r", "30",
        "-an",
        "-f", "flv",
        push_url,
        "-f", "mp4",
        recognized_video_path
    ]
    try:
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # 保存完整时间状态
        stream_status.is_streaming = True
        stream_status.start_datetime = start_datetime  # 保存完整 datetime
        stream_status.time_path = time_path
        stream_status.recognized_path = recognized_path
        stream_status.image_path = image_path
        detection_thread = threading.Thread(
            target=run_detection_in_background,
            daemon=True
        )
        detection_thread.start()
        return JsonResponse({
            "status": "success",
            "date_folder": date_str,
            "time_folder": time_folder  # 格式如 15:30:45
        })
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)


def run_detection_in_background():
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(DetectionStreamConsumer().start_detection())
    except Exception as e:
        print(f"后台检测线程异常: {e}")


def get_date_folders(request):
    page = int(request.GET.get("page", 1))
    page_size = int(request.GET.get("page_size", 10))
    MEDIA_ROOT_PATH = Path(r"F:\FullStack\Django\DJI_yolo\media")
    date_folders = []
    for name in os.listdir(MEDIA_ROOT_PATH):
        folder_path = MEDIA_ROOT_PATH / name
        if folder_path.is_dir() and len(name.split("-")) == 3:
            date_folders.append({"date": name})
    date_folders.sort(key=lambda x: x["date"], reverse=True)
    start = (page - 1) * page_size
    end = start + page_size
    return JsonResponse({
        "data": date_folders[start:end],
        "total": len(date_folders)
    })


def get_time_folders(request, date):
    date_path = Path(r"F:\FullStack\Django\DJI_yolo\media") / date
    if not date_path.is_dir():
        return JsonResponse({"error": "日期文件夹不存在"}, status=404)
    page = int(request.GET.get("page", 1))
    page_size = int(request.GET.get("page_size", 10))
    time_folders = []
    for name in os.listdir(date_path):
        time_path = date_path / name
        if time_path.is_dir():
            original_count = len(os.listdir(time_path / "原视频")) if (time_path / "原视频").is_dir() else 0
            recognized_count = len(os.listdir(time_path / "识别视频")) if (time_path / "识别视频").is_dir() else 0
            image_count = len(os.listdir(time_path / "关键帧图片")) if (time_path / "关键帧图片").is_dir() else 0
            time_folders.append({
                "time_range": name,  # 直接使用时间区间名称
                "original_count": original_count,
                "recognized_count": recognized_count,
                "image_count": image_count
            })
    time_folders.sort(key=lambda x: x["time_range"], reverse=True)
    start = (page - 1) * page_size
    end = start + page_size
    return JsonResponse({
        "data": time_folders[start:end],
        "total": len(time_folders)
    })


def get_file_count(request, date, time_range, category):
    category_path = Path(r"F:\FullStack\Django\DJI_yolo\media") / date / time_range / category
    if not category_path.is_dir():
        return JsonResponse({"count": 0})
    count = len(os.listdir(category_path))
    return JsonResponse({"count": count})


def get_files(request, date, time_range, category):
    try:
        decoded_time_range = urllib.parse.unquote(time_range)
        category_path = Path(r"F:\FullStack\Django\DJI_yolo\media") / date / decoded_time_range / category
        if not category_path.is_dir():
            return JsonResponse({"error": "文件类别不存在"}, status=404)
        page = int(request.GET.get("page", 1))
        page_size = int(request.GET.get("page_size", 10))
        files = []
        for filename in os.listdir(category_path):
            file_path = category_path / filename
            if file_path.is_file():
                encoded_time_range = urllib.parse.quote(decoded_time_range)
                files.append({
                    "name": filename,
                    "url": f"http://{request.get_host()}/media/{date}/{encoded_time_range}/{category}/{filename}"
                })
        start = (page - 1) * page_size
        end = start + page_size
        return JsonResponse({
            "data": files[start:end],
            "total": len(files),
            "status": "success"
        })
    except Exception as e:
        print(f"文件列表错误: {str(e)}")
        return JsonResponse({"error": "服务器内部错误", "detail": str(e)}, status=500)


def delete_file(request, date, time_range, category, filename):
    try:
        decoded_time_range = urllib.parse.unquote(time_range)
        file_path = Path(r"F:\FullStack\Django\DJI_yolo\media") / date / decoded_time_range / category / filename
        if not file_path.exists():
            return JsonResponse({"error": "文件不存在"}, status=404)
        os.remove(file_path)
        return JsonResponse({"status": "success"})
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)
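utils.py is imported in several places (create_date_folder, create_time_folder, rename_time_folder) but is not included in the article. Inferred purely from how these helpers are called and from the folder names visible in the media tree, a possible implementation might look like the sketch below; the author's actual code may differ.

# Hypothetical app01/utils.py, reconstructed only from the call sites (not the author's code).
import os

from django.conf import settings


def create_date_folder(date_str):
    """Create media/<YYYY-MM-DD>/ and return its path."""
    date_path = os.path.join(settings.MEDIA_ROOT, date_str)
    os.makedirs(date_path, exist_ok=True)
    return date_path


def create_time_folder(date_path, start_datetime):
    """Create a folder named by the start time and return (path, folder_name)."""
    time_folder = start_datetime.strftime("%H时%M分%S秒")
    time_path = os.path.join(date_path, time_folder)
    os.makedirs(time_path, exist_ok=True)
    return time_path, time_folder


def rename_time_folder(time_path, start_datetime, end_datetime):
    """After the stream ends, rename the folder to <start>-<end> and return the new path."""
    new_name = f"{start_datetime.strftime('%H时%M分%S秒')}-{end_datetime.strftime('%H时%M分%S秒')}"
    new_path = os.path.join(os.path.dirname(time_path), new_name)
    os.rename(time_path, new_path)
    return new_path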
Thank you for reading!
The resources are available here:
https://download.csdn.net/download/2403_83182682/90961392?spm=1001.2014.3001.5501