网站关键词分布,许昌做网站公司报价,wordpress 禁用 提示,溧阳建设局网站流缓冲区
流缓冲区一般用在不同设备或者不同进程间的通讯#xff0c;为了提高数据处理效率和性能#xff0c;设置的一定大小的缓冲区#xff0c;流缓冲区可以用来存储程序中需要处理的数据、对象、报文等信息#xff0c;使程序对可以对这些信息进行预处理、排序、过滤、拆…流缓冲区
流缓冲区一般用在不同设备或者不同进程间的通讯为了提高数据处理效率和性能设置的一定大小的缓冲区流缓冲区可以用来存储程序中需要处理的数据、对象、报文等信息使程序对可以对这些信息进行预处理、排序、过滤、拆分等操作从而提供程序的效率和处理速度。 流缓冲区主要应用于数据输入输出I/O操作中例如读取和写入文件、网络通信等场景。在数据读取方面缓冲区可以让程序一次读取较大量的数据而不是多次读取小量数据在提高读取效率的同时减少了系统调用的次数和内存访问的延迟。在数据写入方面通过缓冲区将一部分或全部数据暂存于缓冲中当缓冲区满或达到一定容量时再一次性写入磁盘或网络从而减少了磁盘或网络的访问次数提高了写入效率。 除了提高数据处理速度和性能外流缓冲区还可以用于降低程序对内存的使用。对于大型数据集或需要大量计算的场景使用流缓冲区可以有效地减少内存分配和释放的开销提高程序的稳定性和运行效率。 总的来说流缓冲区是程序中重要的优化手段之一它可以提高程序执行效率和性能降低内存使用同时也是许多应用和系统的核心部分。
环形缓冲区
其实在FreeRTOS中流缓冲区是使用环形缓冲区Ring buffer或者叫圆形队列Circular queue来实现的。 具体来说环形缓冲区由两个指针来控制读写操作头指针和尾指针。头指针指向缓冲区中当前的读取位置尾指针则指向缓冲区中当前的写入位置。当读取一个数据时头指针将向前移动一步当写入一个数据时尾指针将向前移动一步。当头指针和尾指针重叠时则说明缓冲区已经满了当头指针和尾指针完全相同时则说明缓冲区为空。 通过使用环形缓冲区FreeRTOS 可以高效地处理流数据并确保数据的读取和写入操作不会越界或冲突。此外环形缓冲区还可以避免在缓冲区操作过程中出现复制或移动操作从而提高数据传输的速度和性能。 总的来说FreeRTOS 中的流缓冲区使用环形缓冲区实现既可以提高数据传输效率又可以确保数据的安全性和稳定性。
流缓冲区和消息队列的不同点
在之前学到的消息队列中队列有个长度表示队列中可以存储多少个信息每个信息又有一个固定的长度。 这种设计在寸土寸金的单片机中是非常浪费的而且这种设计对非格式化数据的存储非常不友好。 比如我们从网络下载音乐然后在本地解码播放这个过程中每次下载多少个字节的数据是不可预知的网络快的时候每次下载的数据量会多一些可能一次1k字节当网络状态不好的时候每次下载会少一些比如10字节那对于每次如此悬殊的数据量再用消息队列的话就非常浪费不管我们给队列分配10字节的大小还是1k字节的大小似乎都无法满足我们的需求。 所以缓冲区的概念就诞生了 比如我们要用水烧饭人少的时候每次我们只用半桶水人多的时候一次可能要用3~5桶水如果每次烧饭的时候都去河边打水就显得很麻烦而且每次多打的水就会浪费掉。 于是我们做了一个可以装10桶的大水缸作为缓冲区等没事的时候我们就去河边挑水倒到水缸中每次做饭的时候就从水缸中取得一定量的水使用这样就解决了打水不即时和浪费的问题。 在程序中我们在内存中定义一块区域当这个水缸缓冲区下载器从网上下载MP3数据后有顺序的将数据放入到这块区域中当数据被存满的时候就暂停不再下载而MP3播放器每次取一定量的数据进行解码播放取出后对应的空间被空出来让下载器继续下载。
代码共享位置https://wokwi.com/projects/362958849739503617
#include freertos/stream_buffer.h
/********************************* 一些公共函数不用管 ***********************************/
#define BUZZER_PIN 4 // 蜂鸣器的引脚
#define BUZZER_CHANNEL 0 // PWM通道
/*** brief 音频解码算法* 对输入的字符串进行音频解码并通过PWM控制蜂鸣器播放* param[in] music 音频字符串格式为 音符,八度,间隔时间;下一个音符,* 怕扰民[in] size 长度*/
static void decode(char *notes, size_t size) {ledcAttachPin(BUZZER_PIN, BUZZER_CHANNEL); // 初始化PWM指定使用引脚和PWM通道size_t len size/3;for(int i0; ilen; i){int note notes[i*3];int octave notes[i*31];int rest notes[i*32]*5;ledcWriteNote(BUZZER_CHANNEL, (note_t)note, octave);vTaskDelay(rest);}ledcDetachPin(BUZZER_PIN);
}
/*** brief 随机生成音符* param[out] notes 音符序列* return 返回字符串长度*/
static size_t randomMusic(char **notes) {uint8_t len random(1, 21); // 生成1~20 组音符每组有3位char *p (char *)pvPortMalloc(sizeof(char)*len*3); // 动态分配内存// 随机生成数据第一个字节数据是0~12第二个是5~9第三个是20~200的5倍for(int i0; ilen; i){p[i*3] random(0,13);p[i*31] random(5,10);p[i*32] random(20,200);}*notes p;return len*3;
}
/*** brief 测试用打印音乐* param[in] notes 音乐数据序列* param[in] size 音乐长度3个一组打印**/
static void printMusic(char *notes, size_t size){// taskENTER_CRITICAL();printf([);for(int i0; isize; i){if(i % 3 0){printf( );}printf(%d,, notes[i]);}printf(]\n);// taskEXIT_CRITICAL();
}
/********************************* 留缓冲区例程 ***********************************/
#define BUFFER_SIZE 600 // 留缓冲区大小
#define TRIGGER_LEVEL 3 // 最小读出单位
#define READ_SIZE 90 // 最大读出宽度
StreamBufferHandle_t xStreamMusic NULL; // 声明一个流缓冲区指针
uint16_t read_counter0, write_counter0; // 读出和写入次数计数器纯计数没卵用
// 模拟下载线程
void download_task(void* param_t){printf([DOWN] 下载器启动...\n);char *music NULL; // 下载待写入的数据size_t music_size; // 随机生成的音乐大小size_t size; // 存放实际写入数据的大小while(1){// 模拟从网上下载随机长度的音乐music_size randomMusic(music);printf([DOWN] 随机生成音乐长度 %d \n, music_size);// printMusic(music, music_size);// 并写入到流缓冲区中并返回实际写入的数据长度size xStreamBufferSend(xStreamMusic, // 缓冲区句柄(void *)music, // 要写入的数据music_size, // 写入的数据大小portMAX_DELAY); // 当缓冲区满后的等待时间vPortFree(music);if(size ! music_size){// 如果实际写入的大小与音乐本身大小不一致说明缓冲区写入满了无法再继续写入printf([DOWN] 音乐写入失败缓冲区已满%d - %d\n,size, music_size);}else{printf([DOWN] 第 %d 次写入大小 %d\n, write_counter, size);}vTaskDelay(pdMS_TO_TICKS(random(100,500)));}
}
// 模拟播放线程
void paly_task(void* param_t){printf([PLAY] 播放器启动...\n);size_t size; // 存放实际读出数据的大小char music[READ_SIZE]; // 读出数据的存放位置while(1){printf([PLAY] 准备读取音乐\n);size xStreamBufferReceive(xStreamMusic, // 流缓冲区指针(void *)music, // 读出数据的存放变量指针READ_SIZE, // 预读出大小portMAX_DELAY); // 等待时间if(size0){// 解码播放音乐printf([PLAY] 第 %d 次读出大小 %d\n, read_counter, size);// printMusic(music, size);decode(music, size);printf([PLAY] 第 %d 段播放完毕\n,read_counter);}}
}
// 数据监控线程
void monitor_task(void* param_t){while(1){if (xStreamBufferIsFull(xStreamMusic) pdTRUE){printf([MONI] 缓冲区已满!\n);}else{printf([MONI] 已用 : %d , 剩余 : %d\n, xStreamBufferBytesAvailable(xStreamMusic),xStreamBufferSpacesAvailable(xStreamMusic));}vTaskDelay(1000);}
}
void setup() {Serial.begin(115200);Serial.println(Hello, ESP32-S3!);pinMode(BUZZER_PIN, OUTPUT);ledcSetup(0, 5000, 16); // 初始化PWMxStreamMusic xStreamBufferCreate(BUFFER_SIZE, TRIGGER_LEVEL);if ( xStreamMusic NULL ){printf(流缓冲区初始化失败请检查内存是否够用\n);}// // 创建线程xTaskCreate(download_task, Downloader, 10240, NULL, 1, NULL);xTaskCreate(monitor_task, Monitor, 10240, NULL, 1, NULL);xTaskCreate(paly_task, Player, 10240, NULL, 1, NULL);
}
void loop() {delay(10);
}例程代码中一共三个线程download_task 用于模拟数据下载paly_task 模拟播放器让蜂鸣器发声monitor_task 则用于数据监控。 代码中 decode 和 randomMusic 两个方法我们完全不用关心他是如何实现的与本例程无关。
在 setup 函数中首先通过 xStreamBufferCreate 方式创建了一个指定大小的数据缓冲区该函数包含有两个参数第一个表示该缓冲区的实际大小单位是字节第二个表示最小的读出大小也就是说缓冲区中必须包含 TRIGGER_LEVEL 个字节后才允许读取另外该函数还有个变形 xStreamBufferCreateWithCallback 改变形中包含四个参数前两个和 xStreamBufferCreate 参数一直后两个参数分别需要传入两个函数指针表示数据发送完和数据接收完的回调具体请文档。 当缓冲区创建成功后会返回这个缓冲区的句柄如果返回NULL则表示创建失败这时候需要检查内存是否够用。
download_task 任务模拟了从网上下载数据首先使用 randomMusic 函数随机创建了一些音符播放数据然后通过xStreamBufferSend 函数将数据发送到缓冲区中该函数原型如下
size_t xStreamBufferSend( StreamBufferHandle_t xStreamBuffer,const void *pvTxData,size_t xDataLengthBytes,TickType_t xTicksToWait );xStreamBuffer缓冲区句柄 pvTxData一个指向缓冲区的指针 该缓冲区用于保存要复制到流缓冲区的字节 xDataLengthBytes本次要发送的数据大小单位为字节 xTicksToWait超时等待时间当流缓冲区的空间太小 无法 容纳 另一个 xDataLengthBytes 的字节时任务应保持在阻塞状态以等待流缓冲区中出现足够空间的最长时间。 该函数会返回一个size_t的数据表示实际发送到数据缓冲区中的数据量如果发现返回值和xDataLengthBytes不一致时则有可能是pvTxData 指向数据所产生的问题因为如果缓冲区剩余空间不足的时候Send函数会等待还有一种可能就是数据发送等待超时引起。 有兴趣的同学可以修改https://wokwi.com/projects/362983694485624833 在该函数调用时如果 pvTxData 所指向的数据长度大于 xDataLengthBytes 则只会发送 xDataLengthBytes 个字节的数据之后的数据降不发送。 在编写此例程的时候最初用的是String类型数据后来测试发现问题频出还没来得及解决所以就改成了char型数据收发。 但随之产生一系列修改比如在动态分配空间时C语言中使用malloc分配使用free释放但是在FreeRTOS中存在内存管理单元所以需要特有的函数 pvPortMalloc 来分配内存空间使用 vPortFree 释放存储空间如果仍然使用malloc和freee进行操作一定几率上会出现问题取决于内存管理单元的配置项。 在 paly_task 中模拟了从数据缓冲区中读取数据并播放的流程我们预设每次最多读取 READ_SIZE 个字节的数据所以首先晟敏一个 READ_SIZE 大小的char数组作为通用接收器通过 xStreamBufferReceive 函数等待接收数据该函数原型如下
size_t xStreamBufferReceive( StreamBufferHandle_t xStreamBuffer,void *pvRxData,size_t xBufferLengthBytes,TickType_t xTicksToWait );xStreamBuffer缓冲区句柄 pvTxData指向缓冲区的指针接收的字节将被复制到该缓冲区 xDataLengthBytes这会设置一次调用中 接收的最大字节数。 xStreamBufferReceive 将返回尽可能多的字节数 直到达到由 xBufferLengthBytes 设置的最大字节数为止。 xTicksToWait超时等待时间当流缓冲区的空间太小 无法 容纳 另一个 xDataLengthBytes 的字节时任务应保持在阻塞状态以等待流缓冲区中出现足够空间的最长时间。 该函数返回 size_t表示实际读出数据大小改大小有可能小于 xDataLengthBytes。 该函数如果在读取的时候缓冲区内不足 TRIGGER_LEVEL 个数据创建缓冲区时候设置的数据则会等待一旦大于或等于该数值是数据被读出。 在缓冲区内数据足够的情况下如果 pvRxData 长度大于 xBufferLengthBytes 则只会读出 xBufferLengthBytes 个字节的数据但如果 pvRxData 长度小于 xBufferLengthBytes 时读出有可能会因为数据溢出而报错所以在定义 pvRxData 大小是一定要考虑大于等于 xBufferLengthBytes。 当缓冲区内数据长度小于 xBufferLengthBytes 时只会读取剩余所有的数据此时返回值比 xBufferLengthBytes 小。 为了确保数据正常在读取前还可以调用 xMessageBufferNextLengthBytes 函数来查看缓冲区内下次可以读出多少个字节的数据。
monitor_task 是一个数据监控程序每间隔一段时间就输出缓冲区的已用空间大小和可用空间大小分别使用 xStreamBufferBytesAvailable 和 xStreamBufferSpacesAvailable 获取或使用 xStreamBufferIsFull 查看缓冲区是否已经满了但这个函数需要注意当使用Send函数向缓冲区发送数据时发送数据的大小大于缓冲区剩余空间大小时Send会等待并不会发送但其实此时缓冲区中扔剩余空间调用 xStreamBufferIsFull 返回也是false。
流缓冲区一般用在一对一的数据传输中很少使用多对一、一对多和多对多因为数据并非格式化的有前后顺序多对一容易造成写入数据混乱一对多容易造成读取数据混乱。
关于流缓冲区的所有API可以参考https://www.freertos.org/zh-cn-cmn-s/RTOS-stream-buffer-API.html
消息缓冲区
在 FreeRTOS 中消息缓冲区Message buffer是一种用于任务间通信的机制。它可以让任务之间传递各种类型的数据例如整数、浮点数、字符、结构体等等。消息缓冲区是由一块预定义大小的内存缓冲区和一些管理该缓冲区的数据结构组成。 在流缓冲区中每次放入消息的大小和读出的大小是可以不一致的因为要存储和传递的内容是非结构化的数据更像是水流所以在存入和读取的时候只需要关注顺序不需要关注大小。 在大多时候的数据通讯中我们都采用非固定长度的结构化数据进行传说也就是报文或者数据包的模式在这种数据传说中每次存入的数据大小是不同的 但读取是会根据存入的大小进行读取。 所以消息缓冲区在流缓冲区的基础上每次发送数据的时候在之前加入了4个字节的内容表示要传输数据的大小之后才是真正的消息。每次读取的时候总是先读取这四个字节当缓冲区中在这4个字节之后存在一个与四字节表达长度一致的消息时才会可以读取。
数据包
在互联网数据传输中我们一般收发指令都采用标准格式的数据包即数据传输协议每个软件都有自己的数据传输协议以本例程用到的数据规范定义协议如下
协议包含包头和包体两部分包头的长度固定8字节包体长度做大255个字节包头前三个字节表示一个正确包的开始依次是 0x55 0xAA 0x89包头第四个字节表示数据包的类型包头第五个字节为包体的大小根据ESP32数据规范数据包要求4字节对齐不足部分在最后使用char数组补齐
在数据传输过程中我们每一帧的数据都是以8字节的包头开始后面跟一组不定长的数据数据长度存放在包头第四个字节中。
场景
在智慧农业中经常会用到温度、湿度、光照、二氧化的浓度等等数据以此需求为例本次例程我们将完成一个集合温度、湿度、光照三项数据采集的设备其中温湿度采集设备位DHT22可同时采集温度和湿度两路数据LDR采集光照度。当传感器采集设备后通过数据包的方式发送到处理器中假设传感器和处理器都是网络中的两台设备本次例程中使用两个任务模拟网络上的两台设备两个传感器采集数据后封装标准的数据包并发送到消息缓冲区中DHT22传感器采集的是温度和湿度两种数据都是float类型的每个占4字节而LDR采集器最后传说的是一个float数据所以两个数据包的长度是不想同的。在读取阶段消息缓冲区会自动识别包长度进行读取。 本例程传输的数据结构只有两个比较简单理论上可以不加包头直接用长度确定数据类型但为了对数据报文结构明确演示我们将采用完整的方式编程。 注意在数据包的定义中尽可能的根据需要传输的协议简化包结构尽可能的少传输数据这样才能提升效率。
代码共享位置https://wokwi.com/projects/363029703756578817
#include freertos/message_buffer.h
#include DHTesp.h
#define DHT_PIN 15
#define LDR_PIN 5
#define BUFFER_SIZE 512 // 缓冲区大小
#define PH {0X55, 0XAA, 0X89} // 包头固定字符串
const char HEAD[3]PH;
typedef struct{ // 数据包结构体char head[3]; // 数据表其实标志包头包花uint8_t data_type; // 数据包类型uint8_t data_length; // 数据包包体长度char reserve[3]; // 无实际用途用于数据对齐
}Package_Head;
typedef struct{ // DHP22 数据结构体Package_Head head; // 包头float temperature; // 温度float humidity; // 湿度
}DHT22_Package;
typedef struct{ // LDR 数据结构体Package_Head head; // 包头float lux; // 光照度
}LDR_Package;
MessageBufferHandle_t xMessageBuffer NULL; // 消息缓冲区句柄
// 温湿度传感器数据获取线程
void dht22_task(void* param_t){DHTesp dhtSensor;dhtSensor.setup(DHT_PIN, DHTesp::DHT22);size_t psize sizeof(DHT22_Package); // 数据包大小Package_Head head {PH, 1, psize- sizeof(Package_Head)};DHT22_Package pck {head, 0, 0};size_t size;while(1){TempAndHumidity data dhtSensor.getTempAndHumidity();// 灌装数据pck.temperature data.temperature;pck.humidity data.humidity;size xMessageBufferSend(xMessageBuffer, // 消息缓冲句柄(void*)pck, // 传输的消息体首地址指针psize, // 本次传输的数据大小portMAX_DELAY); // 传输等待超时时间if(size ! psize){printf([DHTP] 数据发送失败可能造成数据不完整的混乱\n);}else{// printf([DHTP] 数据发送成功大小 : %d\n,size);}vTaskDelay(random(1000,3000));}
}
// 光照度传感器数据获取线程
void ldr_task(void* param_t){const float GAMMA 0.7;const float RL10 50;size_t psize sizeof(LDR_Package); // 数据包大小Package_Head head {PH, 2, psize- sizeof(Package_Head)};LDR_Package pck {head, 0};size_t size;while(1){int analogValue analogRead(LDR_PIN); // 读取引脚的模拟量// 一下是一顿猛如虎的操作具体做了些什么请参照LDR的文档8191是精度表示13位精度float voltage analogValue / 8191. * 5;float resistance 2000 * voltage / (1 - voltage / 5);float lux pow(RL10 * 1e3 * pow(10, GAMMA) / resistance, (1 / GAMMA));pck.lux lux;size xMessageBufferSend(xMessageBuffer, // 消息缓冲句柄(void*)pck, // 传输的消息体首地址指针psize, // 本次传输的数据大小portMAX_DELAY); // 传输等待超时时间if(size ! psize){printf([LDRP] 数据发送失败可能造成数据不完整的混乱\n);}else{// printf([DHTP] 数据发送成功大小 : %d\n,size);}vTaskDelay(random(1000,3000));}
}
// 消息处理器
void processor(char *data, size_t size){/* 正确的消息处理方式如下* 1. 首先判断包头是否正确如果不正确则直接扔掉数据包* 2. 判断长度是否正确如果不正确则扔掉数据包* 3. 取出数据*/if(data[0] HEAD[0] data[1] HEAD[1] data[2] HEAD[2]){Package_Head *head (Package_Head *)data;if(head-data_length size-sizeof(Package_Head)){// printf([PROC] 数据包到达类型 :%02X , 长度 : %d\n, data[3], head-data_length);switch(data[3]){case 0x01: // DHT22类型数据{DHT22_Package *pak (DHT22_Package *)data;printf([PROC] DHT22数据到达Temperature : %.2f°C Humidity : %.2f%%\n,pak-temperature, pak-humidity);break;}case 0x02: // LDR类型数据{LDR_Package *pak (LDR_Package *)data;printf([PROC] LDR数据到达光照 : %.2f\n,pak-lux);break;}default:{printf([PROC] 未知数据包 : %02X\n,data[3]);}}}else{printf([PROC] 数据长度检测失败 : %d\n, data[4]);}}else{printf([PROC] 包头检测失败 : %02X , %02X , %02X\n, data[0], data[1], data[2]);}
}
/*** brief 数据处理器线程* 该函数优先级建议比数据存入线程高保证数据先处理视具体情况而定* 当发现缓冲区中有数据时先处理缓冲区数据确保所有数据处理完成之后再进行下一次等待**/
void processor_task(void* param_t){size_t msize; // 消息的长度size_t size; // 接收的长度while(1){msize xMessageBufferNextLengthBytes(xMessageBuffer); // 获得下一条消息的长度while(msize0){char *data (char *)pvPortMalloc(msize); // 开辟空间准备接收消息size xMessageBufferReceive(xMessageBuffer, // 消息缓冲区句柄(void *)data, // 接收数据的首地址msize, // 最大接收数据长度portMAX_DELAY); // 等待时间if(size msize){// 正确收到了消息processor(data, size);}else{printf([PROC] 消息接收错误应接收 : %d , 实际接收 : %d\n, msize, size);}vPortFree(data); // 释放空间msize xMessageBufferNextLengthBytes(xMessageBuffer); // 获得下一条消息的长度}vTaskDelay(1000);}
}
void setup() {Serial.begin(115200);Serial.println(Hello, ESP32-S3!);xMessageBuffer xMessageBufferCreate(BUFFER_SIZE); // 初始化消息缓冲区if (xMessageBuffer NULL){printf(创建缓冲区失败内存不足!\n);}else{// 创建线程xTaskCreate(dht22_task, DHT22, 1024 * 4, NULL, 1, NULL);xTaskCreate(ldr_task, LDR, 1024 * 4, NULL, 1, NULL);xTaskCreate(processor_task, PROC, 1024 * 4, NULL, 1, NULL);}
}
void loop() {delay(10);
}在 setup 函数中首先通过 xMessageBufferCreate 对消息缓冲区进行格式化这个函数中只需要传入一个函数即缓冲区的大小单位是字节这里的字节数包含了消息自身和4个字节的数据长度。
dht22_task 用于操作DHT22硬件设备首先在程序的开头对设备进行初始化并对DHT22的数据包结构进行了格式化包含包头和包体两部分包头8字节前三个字节是包花用于识别一个标准完整的数据包第四个字节是数据类型DHT22数据类型是0x01第五个字节是包体长度因为DHT33返回2个float型数据理论上长度应该是8字节。 这里需要注意在 Package_Head 中最后预留了三个字节的数据及时这里不写系统仍然会对 Package_Head 进行4字节对齐包头原本是5字节4字节对齐后是8字节如果不写着三个字节的预留sizeof(Package_Head)时候得到的结果是5而sizeof(DHT22_Package)的时候得到的确实16会让初学者造成困扰而且在跨系统数据传递的时候也容易造成困扰和数据错乱所以遵循ESP32的规矩加入3个字节保留位。 数据构造完毕后通过 xMessageBufferSend 函数发出这个函数的原型如下
size_t xMessageBufferSend( MessageBufferHandle_t xMessageBuffer,const void *pvTxData,size_t xDataLengthBytes,TickType_t xTicksToWait );xMessageBuffer消息缓冲区句柄 pvTxData要发送数据的首地址指针 xDataLengthBytes发送数据的最大长度 xTicksToWait等待时间 返回实际发送的数据大小与流缓冲区发送数据时相同xDataLengthBytes 一定要比实际pvTxData的数据长度小或者相等否则会造成不可预估的数据溢出错误。 返回值是实际发送到缓冲区的数据如果发现xDataLengthBytes与返回值不一致的情况有可能是数据错误非常严重必定会导致系统崩溃或者是因为超时没有发送成功此时返回值应该为0。
ldr_task 和 dht22_tas 做所的事基本无异只是发送的数据长度不同LDR_Package 的数据长度是12正好也是四字节对齐的。 这里需要注意在 LDR_Package 和 DHT22_Package 中head 成员变量已经做了4字节对齐所以这里不会出现数据错乱但如果结构体本身没有进行四字节对齐那在其他结构体或作为其他结构成员使用的时候系统会自动进行四字节对齐与其让系统自己补充不如我们自己先补充上这是一个好习惯省得以后读代码的时候搞得自己很费解。 单片机编程中类似的坑还有很多猜的次数多了也就踏平了
在 processor_task 任务中有两层循环这也是物联网消息处理中比较合理的结构当缓冲区中没有数据的时候让出 CPU 所以在外层循环中有 delay 函数但如果缓冲区中存在消息则应该先处理缓冲区的消息在考虑让出 CPU以确保消息处理的及时性如果还有其他重要任务在同意核心中运行三种建议一种是提升该任务的优先级在就是在内循环中 delay 1ms 短暂让出 CPU或在内循环中通过 taskYIELD 函数手动强制调度。 另外在大多数程序中消息处理的优先级要比数据接收的优先级要高所以可以考虑提升数据处理任务的优先级。 但在一些程序中比如蓝牙和WiFi等流数据的接收应该考虑先接收数据所以数据接收任务的优先级要比事件处理的优先级要高。 我们应该因程序设定优先级不能生搬硬套死记硬背。 首先在外层循环调用 xMessageBufferNextLengthBytes 获取下一条消息的长度如果缓冲区中有消息长度必定是大于8的因为包头的长度为8而我们的数据中没有只含包头的数据包这是将进入内层循环调用 xMessageBufferReceive 获取消息具体内容该函数原型如下
size_t xMessageBufferReceive( MessageBufferHandle_t xMessageBuffer,void *pvRxData,size_t xBufferLengthBytes,TickType_t xTicksToWait );xMessageBuffer消息缓冲区句柄 pvRxData指向缓冲区的指针收到的信息 将被复制到该缓冲区 xBufferLengthBytes接收消息的最大长度可以大于消息本身这样仍然会返回下一条完整的消息该函数的返回值是消息本身的实际长度但如果 xBufferLengthBytes 小于本条消息的长度则消息不回被读出仍然会保留在缓冲区中该函数返回值为0。 xTicksToWait如果缓冲区中没有消息则会等待至超时。 该函数返回值为本条消息的实际长度和 xBufferLengthBytes 可能不一致。
如果消息长度是合法的任务将消息扔给 processor 进行处理。 注意在实际项目开发中消息是从外部WiFi、蓝牙、Zigbee、485等到达的我们需要完成三个线程的开发
数据接收线程该线程以流的方式接收二进制数据并存入到流缓冲区中消息构造线程该线程从流缓冲区中读取并验证消息的合法性并封装成数据包发送到消息缓冲区中消息处理线程该线程从消息缓冲区中读取消息并分发给处理函数进行处理但一般这个处理线程不会对消息进行同步处理根据消息类型可分为独立线程处理或进入指定类型的队列中进行处理。
在 数据接收线程 中任务只需要收取来自外部的二进制数据流并发送到缓冲区不做多余的动作也有些项目中将 数据接收线程 和 消息构造线程 合并的这取决于突发数据量的大小建议分开。
消息构造线程 需要分析的事项依次如下
流缓冲区中是否有一个完成的包头也就是我们构造流缓冲区时候的 TRIGGER_LEVEL 参数如果不够则等待如果够则执行步骤 2包花是否一致如果不一致删除流缓冲区第一位读取以为即可重新执行步骤 1 如果符合则执行步骤 3读取包头并从中获取数据长度查看缓冲区中是否仍然存在符合数据长度的数据如果不符合则等待如果符合则进入步骤 4从流缓冲区中读取剩余包体数据并构造完整的数据包送入 消息缓冲区中 等待消息处理线程的处理。
消息处理线程 需要做的事项如下
通过传入的二进制数据构造完成的数据包根据包类型可以做以下选择 a. 本地同步处理消息一般针对应答消息适用比如所有需要有 ACK 回应的消息或是要求重新发送的消息都再次进行处理 b. 放入指定类型消息的消息队列中等待其他任务处理一般针对普适性消息比如显示数据、保存数据等 c. 启用高优先级新线程对此消息进行及时处理一般针对突发命令比如紧急停止 d. 丢弃对于不认识的数据包直接丢弃。
而在例程中我们已经把2和3合并成了一个线程消息处理函数 processor 对消息合法性进行判断并且在本线程中同步处理了所有的消息。例程仅做演示正式项目开发中尽可能分开
消息缓冲区适合用在多对一的消息传递中多个发送者一个接收者对于多对多的方式可以使用但一般接受者都为负载线程执行相同或相似的任务。
关于消息缓冲区的所有API可以参考https://wokwi.com/projects/363029703756578817