做3D打印样品用什么外贸网站好,网站设计哪里公司好,wordpress链接自定义结构404,制作个人网站主页在现代 Web 开发中#xff0c;异步任务处理和用户通知是两个重要的功能。由于老旧测试平台【测试用例生成平台#xff0c;源码分享】进行智能化升级后#xff0c;未采用异步任务处理#xff0c;大模型推理时间较长#xff0c;导致任务阻塞#xff0c;无法处理其他任务异步任务处理和用户通知是两个重要的功能。由于老旧测试平台【测试用例生成平台源码分享】进行智能化升级后未采用异步任务处理大模型推理时间较长导致任务阻塞无法处理其他任务体验较差。本文将以 Django 框架为基础结合 Celery 和 Redis完成一个完整的异步任务系统并实现用户通知功能。我们还会对接阿里云百炼 DeepSeek-R1 模型通过大模型生成测试用例并提供数据的增删改查功能。以下是详细的实现步骤和完整代码示例。 一、需求分析与功能拆解
1.1 功能需求
本项目的主要需求如下
用户通过 API 输入任务名称和字段信息系统为任务生成唯一的 UUID并提示用户任务已创建。异步任务系统调用阿里云百炼 DeepSeek-R1 模型传递字段信息生成 JSON 格式的测试用例。将模型生成的测试用例存储到数据库的测试用例预览表中。通知用户任务完成支持通过 WebSocket 或邮件通知。提供一个页面对生成的测试用例数据进行增删改查操作。
1.2 技术选型
Django作为后端框架负责处理 API 请求、任务状态管理和数据库操作。Celery用于实现任务的异步处理。Redis作为 Celery 的消息队列。阿里云百炼 DeepSeek-R1调用大模型接口生成测试用例。Django Channels实现 WebSocket 通知功能。前端页面用于展示和管理测试用例数据。 二、项目创建与环境配置
2.1 安装依赖
在开始开发之前需要安装以下依赖工具和库
pip install django celery redis requests channels2.2 项目目录结构
项目的目录结构如下
your_project_name/
├── your_project_name/
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ ├── asgi.py
│ ├── celery.py
├── your_app_name/
│ ├── __init__.py
│ ├── models.py
│ ├── views.py
│ ├── tasks.py
│ ├── signals.py
│ ├── urls.py
├── manage.py三、系统实现
3.1 配置 Celery 和 Redis
3.1.1 在 settings.py 中配置 Celery
# Celery 配置
CELERY_BROKER_URL redis://localhost:6379/0
CELERY_ACCEPT_CONTENT [json]
CELERY_TASK_SERIALIZER json# Django Channels 配置
ASGI_APPLICATION your_project_name.asgi.application
CHANNEL_LAYERS {default: {BACKEND: channels_redis.core.RedisChannelLayer,CONFIG: {hosts: [(127.0.0.1, 6379)],},},
}3.1.2 创建 celery.py
在项目根目录下创建 celery.py 文件用于初始化 Celery
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery# 设置 Django 默认的 settings 模块
os.environ.setdefault(DJANGO_SETTINGS_MODULE, your_project_name.settings)app Celery(your_project_name)# 从 Django 的 settings.py 加载配置
app.config_from_object(django.conf:settings, namespaceCELERY)# 自动发现任务模块
app.autodiscover_tasks()3.1.3 配置 __init__.py
在项目目录下的 __init__.py 文件中添加以下内容
from __future__ import absolute_import, unicode_literals# Celery 应用
from .celery import app as celery_app__all__ (celery_app,)3.1.4 配置 asgi.py
在项目根目录下的 asgi.py 文件中配置 WebSocket 支持
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStackos.environ.setdefault(DJANGO_SETTINGS_MODULE, your_project_name.settings)application ProtocolTypeRouter({http: get_asgi_application(),websocket: AuthMiddlewareStack(URLRouter([# WebSocket 路由])),
})3.2 数据模型设计
在 models.py 中定义两张表任务表 和 测试用例预览表。
from django.db import models
import uuid# 测试用例预览表
class TestCasePreview(models.Model):id models.UUIDField(primary_keyTrue, defaultuuid.uuid4, editableFalse)field_name models.CharField(max_length255)field_type models.CharField(max_length255)field_value models.TextField()generated_test_case models.JSONField() # 存储生成的 JSON 测试用例created_at models.DateTimeField(auto_now_addTrue)# 异步任务表
class AsyncTask(models.Model):TASK_STATUS ((PENDING, Pending),(PROCESSING, Processing),(COMPLETED, Completed),(FAILED, Failed),)id models.UUIDField(primary_keyTrue, defaultuuid.uuid4, editableFalse)name models.CharField(max_length255, uniqueTrue)status models.CharField(max_length20, choicesTASK_STATUS, defaultPENDING)created_at models.DateTimeField(auto_now_addTrue)updated_at models.DateTimeField(auto_nowTrue)result_message models.TextField(nullTrue, blankTrue)3.3 异步任务实现
在 tasks.py 中实现异步任务逻辑
import requests
from celery import shared_task
from .models import AsyncTask, TestCasePreview
from channels.layers import get_channel_layer
from asgiref.sync import async_to_syncshared_task
def generate_test_cases(task_id, field_data):try:# 获取任务task AsyncTask.objects.get(idtask_id)task.status PROCESSINGtask.save()# 调用阿里云百炼 DeepSeek-R1 APIclient OpenAI(# 如果没有配置环境变量请用百炼API Key替换api_keysk-xxx# api_keysk-xxx,api_keysk-712a634dbaa7444d838d20b25eb938xx, # todo 此处需更换base_urlhttps://dashscope.aliyuncs.com/compatible-mode/v1)reasoning_content # 定义完整思考过程answer_content # 定义完整回复is_answering False # 判断是否结束思考过程并开始回复# 创建聊天完成请求completion client.chat.completions.create(modeldeepseek-r1, # 此处以 deepseek-r1 为例可按需更换模型名称messages[{role: user,content: prompt_param}],streamTrue,# 解除以下注释会在最后一个chunk返回Token使用量# stream_options{# include_usage: True# })print(\n * 20 思考过程 * 20 \n)for chunk in completion:# 如果chunk.choices为空则打印usageif not chunk.choices:print(\nUsage:)print(chunk.usage)else:delta chunk.choices[0].delta# 打印思考过程if hasattr(delta, reasoning_content) and delta.reasoning_content ! None:print(delta.reasoning_content, end, flushTrue)reasoning_content delta.reasoning_contentelse:# 开始回复if delta.content ! and not is_answering:print(\n * 20 完整回复 * 20 \n)is_answering True# 打印回复过程print(delta.content, end, flushTrue)answer_content delta.content# 存储测试用例到数据库for field in field_data:TestCasePreview.objects.create(field_namefield[field_name],field_typefield[field_type],field_valuefield[field_value],generated_test_caseextract_json_objects(answer_content))# 更新任务状态task.status COMPLETEDtask.result_message 测试用例生成成功task.save()# 通过 WebSocket 通知用户channel_layer get_channel_layer()async_to_sync(channel_layer.group_send)(ftask_{task_id},{type: task_status, message: 任务已完成})except Exception as e:task.status FAILEDtask.result_message str(e)task.save()3.4 用户接口
在 views.py 中提供创建任务的接口
from django.http import JsonResponse
from .models import AsyncTask
from .tasks import generate_test_casesdef create_task(request):if request.method POST:task_name request.POST.get(task_name)field_data request.POST.get(field_data) # JSON 格式的字段列表# 检查任务名称是否唯一if AsyncTask.objects.filter(nametask_name).exists():return JsonResponse({error: 任务名称已存在}, status400)# 创建任务task AsyncTask.objects.create(nametask_name)# 调用异步任务generate_test_cases.delay(task.id, field_data)return JsonResponse({message: 任务已创建, task_id: task.id})3.5 WebSocket 通知
WebSocket 消费者 consumers.py
from channels.generic.websocket import AsyncWebsocketConsumer
import jsonclass TaskConsumer(AsyncWebsocketConsumer):async def connect(self):self.task_id self.scope[url_route][kwargs][task_id]self.task_group_name ftask_{self.task_id}await self.channel_layer.group_add(self.task_group_name,self.channel_name)await self.accept()async def disconnect(self, close_code):await self.channel_layer.group_discard(self.task_group_name,self.channel_name)async def task_status(self, event):message event[message]await self.send(text_datajson.dumps({message: message}))3.6 前端页面展示
创建一个简单的页面用于展示测试用例
!DOCTYPE html
html langen
headtitle测试用例预览/title
/head
bodyh1测试用例预览/h1table idtest-casestheadtrth字段名称/thth字段类型/thth字段值/thth测试用例/th/tr/theadtbody!-- 数据通过 Ajax 加载 --/tbody/table
/body
/html四、总结
本项目从需求分析出发采用 Django 框架结合 Celery、Redis 和 Django Channels完整实现了一个基于异步任务的测试用例生成系统。系统支持任务状态管理、用户通知以及前端数据的增删改查功能逻辑清晰功能完善非常适合在实际项目中推广应用。在老旧测试平台智能化中异步是不可或缺的整改步骤调试好异步任务后后续优化将进入快速迭代阶段。加油吧~