网站备案 信息查询,长沙移动网站建设,高效网站推广费用,免费开源分类信息系统这里我想完成的是 制作消息#xff08;多个协程制造#xff09;——》推送到rabbitmq——》订阅消息队列——》消费消息#xff08;ws协程客户端【一次消费多条】/ws前端#xff09; 利用 WebSocket 协议让客户端和服务器端保持有状态的长链接#xff0c;保存链接上来的客…这里我想完成的是 制作消息多个协程制造——》推送到rabbitmq——》订阅消息队列——》消费消息ws协程客户端【一次消费多条】/ws前端 利用 WebSocket 协议让客户端和服务器端保持有状态的长链接保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。
一安装rabbitmq
pull镜像
docker search rabbitmqdocker pull rabbitmq构建rabbitmq容器并加入hyperf环境组网 这里我们构建容器的的就将他加入到了hyperf环境的网络lnmp_default使用docker inspect hyperf 查看
docker run -d --networklnmp_default --hostname my-rabbit --name rabbit -p 15672:15672 -p 5673:5672 rabbitmq-d 后台运行容器--name 指定容器名-p 指定服务运行的端口5672应用访问端口15672控制台Web端口号-v 映射目录或文件--network 网络名称--hostname 主机名RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据默认为主机名-e 指定环境变量RABBITMQ_DEFAULT_VHOST默认虚拟机名RABBITMQ_DEFAULT_USER默认的用户名RABBITMQ_DEFAULT_PASS默认用户名的密码进入容器并运行
docker exec -it rabbit /bin/bashrabbitmq-plugins enable rabbitmq_management这里须要修改配置文件不然有的地方会报错 docker rabbitmq Management API returned status code 500
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector false management_agent.disable_metrics_collector.conf用户名和密码 guest
二rabbitmq配置
生成配置文件 php bin/hyperf.php vendor:publish hyperf/amqp 并修改配置文件这里的AMQP_HOST 就是lnmp_default 网络组中 rabbitmq的ip地址可使用docker inspect rabbit 查看端口即为容器内部映射ip这里默认是5672
三测试代码
php bin/hyperf.php gen:amqp-producer DemoProducer生产者
?phpdeclare(strict_types1);namespace App\Amqp\Producer;use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
use function Hyperf\Coroutine\co;//routingKey是指定路由到哪里可以决定到哪个队列。队列通过路由键绑定到交换器
#[Producer(exchange: hyperf, routingKey: hyperf-queue.sj)]
//生产者方指定了exchange交换机和routing key但是不指定queue队列也不将queue队列绑定到exchange
//队列声明和绑定队列到exchange的工作由消费者方完成
//费者在消费消息时需要声明队列队列名字随便并将声明的队列通过routing key绑定到exchange这样才能接收到数据
class DemoProducer extends ProducerMessage
{public function __construct($id){$pdata[id]$id;$pdata[time]date(Y-m-d H:i:s,time());$pdata[name]array_rand([张三,李四,王虎,陆风,牛犇,冯晨,丁酉,郑和]);;$pdata[say]bin2hex(random_bytes(10));$pdata[score]array_rand([61,80,75,77,99,88,97,81]);$this-payload $pdata;}
}
消费者
?phpdeclare(strict_types1);namespace App\Amqp\Consumer;use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;#[Consumer(exchange: hyperf, routingKey: hyperf-queue.sj, queue: hyperf, name: DemoConsumer, nums: 1)]
class DemoConsumer extends ConsumerMessage
{public function consumeMessage($data, AMQPMessage $message): Result{print_r($data);return Result::ACK;}
}
控制器 public function productMessage(){try {//这里使用协程创建100条消息$wg new \Hyperf\Coroutine\WaitGroup();$wg-add(100);for($i0;$i100;$i){//创建协程co(function () use ($wg){$messagenew DemoProducer(1);//获取一个生产者实例$producer ApplicationContext::getContainer()-get(Producer::class);//传递消息$producer-produce($message);$wg-done();});}$wg-wait();} catch (\Exception $exception) {throw new \Swoole\Exception($exception-getMessage());}return [info 100条消息创建成功,];}保存后重启服务 php hyperf.php start 定义的消费者会自动消费详见文档。 ps:这里有个小问题时间不对。这里实际上在composer hyperf的时候第一步就是可以设置时区Asia/Shanghai 都是我没设置现在可以偷个懒在入口文件添加 date_default_timezone_set(‘Asia/Shanghai’); 即可。或修改php.ini