当前位置: 首页 > news >正文

做摄影网站的目的是什么意思自己怎么做网站的聚合页面

做摄影网站的目的是什么意思,自己怎么做网站的聚合页面,彩投网站建设,网页游戏排行榜2024文章目录链接参数设置logger 日志redis 链接mysql 链接emqx 链接mssql 链接tdengine 链接采集OPCUA的点表的配置信息设备点表OPCUA 采集 数据程序数据采集逻辑链接参数 import randomtdengine_connection_params {username: root,password: taosdata,host: 127.0.0.1,port: 6… 文章目录链接参数设置logger 日志redis 链接mysql 链接emqx 链接mssql 链接tdengine 链接采集OPCUA的点表的配置信息设备点表OPCUA 采集 数据程序数据采集逻辑链接参数 import randomtdengine_connection_params {username: root,password: taosdata,host: 127.0.0.1,port: 6030,database: test }mysql_connection_params {username: root,password: root,host: 127.0.0.1,port: 3306,database: th }mssql_connection_params {username: sa,password: Z,host: 127.0.0.1,port: 1433,database: SistarData,driver: ODBCDriver17forSQLServer }redis_connection_params {host: localhost, # Redis 服务器的地址port: 6379, # Redis 服务器的端口db: 0, # 使用的 Redis 数据库编号max_connections: 10, # 连接池的最大连接数decode_responses: True }emqx_connection_params {broker: 127.0.0.1,port: 1883,client_id: fpython_mqtt_client_{random.randint(1, 10)},keep_alive_interval: 60,password: None,username: None,topic_sub: None,topic_pub: None }logger_params {logger_name: opcua_adapter_logger,total_level: 20,file_handler_level: 10,control_handler_level: 20,file_name: rE:\TH\core\basic_service\opcua_adapter\logs\opcua_adapter_logger.txt,mode: a,max_bytes: 10485760,backup_count: 10,encoding: UTF-8,format: %(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s }opcua_adapter_params {alarm_consumer_queue_length: 100,archive_consumer_queue_length: 100,emqx_consumer_queue_length: 100,acquisition_frequency:1, # 采集周期monitor_frequency: 2, # 监控周期alarm_table_name: alarm, # 报警的表格archive_table_name: test, # 归档的数据库名称emqx_worker: 1,alarm_worker: 5,archive_worker:5 }设置logger 日志 import logging from concurrent_log_handler import ConcurrentRotatingFileHandlerfrom config import logger_paramsdef get_logger(logger_name, total_level, file_name, mode, max_bytes, backup_count, encoding, file_handler_level,control_handler_level, format) - logging.getLogger():logger logging.getLogger(logger_name)logger.setLevel(total_level) # 设置日志级别file_handler ConcurrentRotatingFileHandler(filenamefile_name,modemode,maxBytesmax_bytes,backupCountbackup_count,encodingencoding) # 设置输出文件file_handler.setLevel(file_handler_level)control_handler logging.StreamHandler()control_handler.setLevel(control_handler_level)formatter logging.Formatter(format)file_handler.setFormatter(formatter)control_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.addHandler(control_handler)return loggerlogger get_logger(**logger_params) redis 链接 import redisfrom basic_service.opcua_adapter.config import redis_connection_paramspool redis.ConnectionPool(hostredis_connection_params[host],portredis_connection_params[port],dbredis_connection_params[db],max_connectionsredis_connection_params[max_connections],decode_responsesredis_connection_params[decode_responses],)# 获取连接对象 def get_redis_connection():while True:try:redis_conn redis.Redis(connection_poolpool)response redis_conn.ping()if response:return redis_connelse:continueexcept Exception as e:print.error(str(e))conn get_redis_connection() mysql 链接 用于连接tdengineimport time from basic_service.opcua_adapter.get_logger import logger from basic_service.opcua_adapter.config import mysql_connection_paramsfrom sqlalchemy import create_engineengine create_engine(fmysqlpymysql://{mysql_connection_params[username]}:{mysql_connection_params[password]}{mysql_connection_params[host]}:{mysql_connection_params[port]}/{mysql_connection_params[database]})def get_mysql_connection():while True:try:conn engine.connect()return connexcept Exception:logger.error(fMySQL连接建立失败,请检查网络是否畅通(ping {mysql_connection_params[host]}),服务器是否正常,是否开启远程连接)time.sleep(3)continueget_mysql_connection() emqx 链接 import paho.mqtt.client as mqtt import timefrom basic_service.opcua_adapter.config import emqx_connection_params from get_logger import loggerclass MQTTClient:def __init__(self, broker, port, client_id, keep_alive_interval, passwordNone, usernameNone, topic_subNone,topic_pubNone):self.broker brokerself.port portself.client_id client_idself.keep_alive_interval keep_alive_intervalself.topic_sub topic_subself.topic_pub topic_pubself.password passwordself.username username# 创建 MQTT 客户端self.client mqtt.Client(client_idself.client_id)if self.username and self.password:self.client.username_pw_set(usernameself.username, passwordself.password) # 如果有需要设置用户名密码self.client.on_connect self.on_connectself.client.on_disconnect self.on_disconnectself.client.on_message self.on_messageself.client.on_publish self.on_publishself.connect()self.client.loop_start()def on_connect(self, client, userdata, flags, rc):if rc 0:logger.info(fConnected to MQTT Broker! Returned code{rc})if self.topic_sub:client.subscribe(self.topic_sub) # 连接成功后订阅主题else:logger.error(fFailed to connect, return code {rc})def on_disconnect(self, client, userdata, rc):logger.info(Disconnected from MQTT Broker with code: str(rc))# 这里可以实现重连逻辑self.reconnect()def on_message(self, client, userdata, msg):# logger.info(fReceived message: {msg.topic} - {msg.payload.decode()})passdef on_publish(self, client, userdata, mid):# print(fMessage {mid} has been published.)pass# 重连机制def reconnect(self):try:logger.error(Attempting to reconnect...)self.client.reconnect() # 尝试重连except Exception as e:logger.error(fReconnection failed: {e})time.sleep(5) # 延时后再尝试def connect(self):try:self.client.connect(self.broker, self.port, self.keep_alive_interval)except Exception as e:logger.error(fConnection failed: {e})self.reconnect()get_emqx_connection MQTTClient(**emqx_connection_params)# 主循环发布消息 # try: # while True: # message Hello MQTT # result mqtt_client.client.publish(/test, message, qos0) # status result[0] # if status 0: # print(fSent {message} to topic /test) # else: # print(fFailed to send message to topic /test) # time.sleep(10) # 每10秒发布一次 # except KeyboardInterrupt: # print(Interrupted by user, stopping...) # finally: # mqtt_client.client.loop_stop() # mqtt_client.client.disconnect() mssql 链接 用于连接tdengineimport time #from basic_service.opcua_adapter.get_logger import logger from basic_service.opcua_adapter.config import mysql_connection_params, mssql_connection_paramsfrom sqlalchemy import create_engine, textengine create_engine(fmssqlpyodbc://sa:Z#127.0.0.1:1433/sistarData?driverODBCDriver17forSQLServer)def get_mssql_connection():while True:try:conn engine.connect()return connexcept Exception:logger.error(fMSSQL连接建立失败,请检查网络是否畅通(ping {mssql_connection_params[host]}),服务器是否正常,是否开启远程连接)time.sleep(3)continueconn get_mssql_connection()result conn.execute(text(SELECT TABLE_NAME,COLUMN_NAME,COLUMN_DEFAULT,IS_NULLABLE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME sistar_eng_areas;))print(result.fetchall()) tdengine 链接 用于连接tdengineimport time from basic_service.opcua_adapter.get_logger import logger from basic_service.opcua_adapter.config import tdengine_connection_paramsfrom sqlalchemy import create_engineengine create_engine(ftaos://{tdengine_connection_params[username]}:{tdengine_connection_params[password]}{tdengine_connection_params[host]}:{tdengine_connection_params[port]}/{tdengine_connection_params[database]})def get_tdengine_connection():while True:try:conn engine.connect()return connexcept Exception:logger.error(fTDEngine连接建立失败,请检查网络是否畅通(ping {tdengine_connection_params[host]}),服务器是否正常,是否开启远程连接,是否安装与服务器相同版本的客户端)time.sleep(3)continue 采集OPCUA的点表的配置信息 设备 [{No.: 1,device_name: device1,url: opc.tcp:\/\/192.168.10.132:4862},{No.: 2,device_name: device2,url: opc.tcp:\/\/192.168.10.132:4863} ]点表 [{No: 1,tag_uuid: tag0001,node_id: ns1;st|tag1,interval: 1,active: true,active_alarm: true,alarm_up: 100,alarm_down: 10,alarm_up_info: \u4e0a\u9650\u62a5\u8b66,alarm_down_info: \u4e0b\u9650\u62a5\u8b66,alarm_up_change: null,alarm_down_change: null,active_archive: true,archive_onchange: true,archive_interval: 1,active_scale: true,scale_sign: add,scale_factor: 1,mqtt_topic_name: topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n,unit: m\u00b3\/H,comments: \u704c\u88c5\u673a\u901f\u5ea61},{No: 2,tag_uuid: tag0002,node_id: ns1;st|tag2,interval: 1,active: true,active_alarm: true,alarm_up: 100,alarm_down: 10,alarm_up_info: \u4e0a\u9650\u62a5\u8b66,alarm_down_info: \u4e0b\u9650\u62a5\u8b66,alarm_up_change: null,alarm_down_change: null,active_archive: true,archive_onchange: true,archive_interval: 1,active_scale: true,scale_sign: sub,scale_factor: 2,mqtt_topic_name: topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n,unit: m\u00b3\/H,comments: \u704c\u88c5\u673a\u901f\u5ea62}]用户可以支持从excel导入数据。 从excel 上传数据到 系统中 用于初始化创建数据库创建数据库表import randomimport pandas as pd from sqlalchemy import text from connectors.tdengine import get_tdengine_connection from get_logger import loggerxlsx rE:\TH\core\basic_service\opcua_adapter\static\opcua_template.xlsxconnection get_tdengine_connection()db_name test stable metersdf pd.read_excel(xlsx, sheet_namedevice1)tag_uuid df.loc[:, [tag_uuid, unit]]create_table_sql for item in tag_uuid.values:_sql fCREATE TABLE IF NOT EXISTS {db_name}.{item[0]} USING {db_name}.{stable} TAGS ({item[1]}, {random.randint(1, 10)});create_table_sql _sqlcreate_db_stable_sql fCREATE DATABASE IF NOT EXISTS {db_name} KEEP 3650; use {db_name}; CREATE STABLE IF NOT EXISTS {db_name}.{stable} (ts TIMESTAMP, val FLOAT) TAGS ( unit BINARY(20), s_type BINARY(20));total_sql create_db_stable_sql create_table_sql print(total_sql) try:connection.execute(text(total_sql))connection.commit()logger.info(init success) except Exception as e:print(str(e))connection.rollback()logger.error(init failed) OPCUA 采集 数据程序 import datetime import json import os import queue import threading import time from threading import Thread from typing import List, Any import opcua.client.client import psutil as psutil from opcua.client.client import Client from opcua.common.node import Node from opcua.ua import UaStatusCodeError from config import opcua_adapter_params from connectors._redis import get_redis_connection from connectors.emqx import get_emqx_connection from connectors.mysql import get_mysql_connection from connectors.tdengine import get_tdengine_connection from get_logger import logger from sqlalchemy import textemqx_connection get_emqx_connection.clientclass OPCUAAdapter(object):OPCUA数据采集类def __init__(self, device_info, tag_id_to_node_id_map, tag_id_to_detail_map, use_subscribeTrue):self.url: str device_info.get(device_url)self._ua: opcua.client.client.Client Client(urlself.url, timeout5)self.connected: bool Falseself.thread_list: List[Thread] []self.raw_dict {}self.tag_id_to_node_id_map tag_id_to_node_id_mapself.tag_id_to_detail_map tag_id_to_detail_mapself.device_info device_infoself.alarm_consumer_queue queue.Queue(maxsizeopcua_adapter_params[alarm_consumer_queue_length])self.archive_consumer_queue queue.Queue(maxsizeopcua_adapter_params[archive_consumer_queue_length])self.emqx_consumer_queue queue.Queue(maxsizeopcua_adapter_params[emqx_consumer_queue_length])def connect(self):连接函数:return:try:if self.connected:returnelse:self._ua.connect()self.connected Truelogger.info(f初次连接{self.url}成功)returnexcept Exception as e:self.disconnect()self.connected Falselogger.error(初次连接失败,失败原因, e)# 开始重连self.reconnect()def disconnect(self):断开连接:return:try:if self._ua and self.connected:self._ua.disconnect()self.connected Falselogger.info(主动断开连接成功)except Exception as e:logger.error(主动断开连接失败失败原因, str(e))def reconnect(self):重连:return:index 0while True:try:self._ua.connect()self.connected Trueindex 0logger.info(f重连{self.url}成功!)returnexcept AttributeError as e:index 1logger.error(f第{index}次重连失败,失败原因{str(e)}!)self.connected Falsetime.sleep(index * 1)continueexcept ConnectionRefusedError as e:index 1logger.error(f第{index}次重连失败,失败原因{str(e)}!)self.connected Falsetime.sleep(index * 1)continueexcept OSError as e:index 1logger.error(f与OPCUA服务器未能建立连接,失败原因:{str(e)}!)self.connected Falsetime.sleep(index * 1)continueexcept Exception as e:index 1logger.error(f第{index}次重连失败,失败原因{str(e)}!)self.connected Falsetime.sleep(index * 1)continuedef interval_read(self, interval: int) - None:按照采集频率定时去采集:param interval::return:connection get_redis_connection()thread_name threading.current_thread().namenodes []while True:# 每分钟采集多少次采集超时多少次采集node数量多少个node是Nonestart_time time.time()if not self.connected:# 如果没有连接成功开启重连self.reconnect()else:try:nodes_str_list self.tag_id_to_node_id_map.keys()nodes [self._ua.get_node(node) for node in nodes_str_list]values self._ua.get_values(nodes)self.raw_dict dict(zip(nodes_str_list, values))except AttributeError as e:logger.error(f属性读取错误:{str(e)}!)except TimeoutError:logger.error(f接收服务端报文超时)except ConnectionRefusedError as e:self.disconnect()self.reconnect()logger.error(f数据获取失败,失败原因:{str(e)}!)except ConnectionAbortedError as e:self.disconnect()self.reconnect()logger.error(f数据获取失败,失败原因:{str(e)}!)except UaStatusCodeError as e:self.disconnect()self.reconnect()logger.error(f数据获取失败,失败原因:{str(e)}!)except OSError as e:self.disconnect()self.reconnect()logger.error(f数据获取失败,失败原因:{str(e)}!)except RuntimeError as e:self.disconnect()self.reconnect()logger.error(f运行错误,失败原因:{str(e)})except Exception as e:self.disconnect()self.reconnect()logger.error(f未捕获到的异常{str(e)})finally:end_time time.time()try:connection.hmset(performance,mapping{nodes: len(nodes), f{thread_name}_use_time: f{(end_time - start_time):.2f}})except Exception:if connection:connection.close()connection get_redis_connection()time.sleep(interval)def node_write(self, nodes: List[Node], values: List[Any]):写入node:param nodes::param values::return:try:self._ua.set_values(nodes, values)except Exception as e:logger.error(f数据写入失败,失败原因{str(e)})def monitor_thread(self):监视线程:return:redis_connection get_redis_connection()while True:try:current_process_id os.getpid()process psutil.Process(current_process_id)# 获取进程的基本信息process_name process.name()cpu_usage process.cpu_percent(interval1) # 进程的 CPU 使用率间隔 1 秒memory_info process.memory_info() # 进程的内存使用情况io_counters process.io_counters() # 进程的 IO 计数disk_usage psutil.disk_usage(/) # 获取根目录的磁盘使用情况thread_list []for thread in threading.enumerate():thread_list.append((thread.ident, thread.name, thread.is_alive()))if thread.name continuous_thread and thread.is_alive() False:logger.error(f读取线程出错请尽快联系管理员处理)redis_connection.hmset(nameperformance,mapping{process_name: process_name, cpu_usage: cpu_usage,memory_info_RSS: f{memory_info.rss / (1024 * 1024):.2f} MB,memory_info_VMS: f{memory_info.vms / (1024 * 1024): .2f} MB,io_read: f{io_counters.read_bytes / (1024 * 1024):.2f} MB,io_write: f{io_counters.write_bytes / (1024 * 1024):.2f} MB,disk_usage: f{disk_usage.percent}%,threads: json.dumps(thread_list),pid: current_process_id,archive_consumer_length: self.archive_consumer_queue.qsize(),alarm_consumer_length: self.alarm_consumer_queue.qsize(),emqx_consumer_length: self.emqx_consumer_queue.qsize(),})except Exception as e:logger.error(f监控子线程出错:{str(e)})if redis_connection:redis_connection.close()redis_connection get_redis_connection()finally:time.sleep(int(opcua_adapter_params[monitor_frequency]))def change_data_notifier(self, timestamp, node_id, new_data, old_data)::param timestamp::param node::param new_data::param old_data::return:try:tag_id self.tag_id_to_node_id_map[node_id]except KeyError:passelse:content {timestamp: timestamp,tag_id: tag_id,new_data: new_data,old_data: old_data}self.alarm_consumer_queue.put(content)self.emqx_consumer_queue.put(content)self.archive_consumer_queue.put(content)def consumer_alarm_info(self):处理报警信息:return:redis_connection get_redis_connection()alarm_table_name opcua_adapter_params[alarm_table_name]connection get_mysql_connection()thread_name threading.current_thread().namealarm_count 0while True:try:start_time time.time()content self.alarm_consumer_queue.get()tag_detail self.tag_id_to_detail_map.get(content[tag_id])active_alarm tag_detail.get(active_alarm)if active_alarm:up_limit tag_detail.get(alarm_up)down_limit tag_detail.get(alarm_down)if float(content[new_data]) float(up_limit):sql finsert into {alarm_table_name} (device_name, tag_uuid, tag_name,alarm_message,alarm_limit,value) values ({self.device_info[device_name]},{content[tag_id]}, {tag_detail.get(comments)}, {tag_detail.get(alarm_up_info)}, {tag_detail.get(alarm_up)}, {content[new_data]})try:connection.execute(text(sql))connection.commit()alarm_count 1except Exception as e:logger.error(f数据插入错误,错误原因:{str(e)}!)connection.rollback()elif float(content[new_data]) float(down_limit):sql finsert into {alarm_table_name} (device_name, tag_uuid, tag_name,alarm_message,alarm_limit,value) values ({self.device_info[device_name]},{content[tag_id]}, {tag_detail.get(comments)}, {tag_detail.get(alarm_down_info)}, {tag_detail.get(alarm_down)}, {content[new_data]})try:connection.execute(text(sql))connection.commit()alarm_count 1except Exception as e:logger.error(f数据插入错误,错误原因:{str(e)}!)connection.rollback()except Exception as e:logger.error(str(e))if connection:connection.close()connection get_mysql_connection()finally:end_time time.time()try:redis_connection.hmset(performance,mapping{f{thread_name}_use_time: f{(end_time - start_time):.2f},alarm_count: alarm_count})except Exception:if redis_connection:redis_connection.close()redis_connection get_redis_connection()time.sleep(2)def consumer_archive_info(self):thread_name threading.current_thread().nameredis_connection get_redis_connection()connection get_tdengine_connection()exception_buffer {}buffer {}db_name opcua_adapter_params[archive_table_name]exception_time 0total_time 0for k in self.tag_id_to_node_id_map.values():buffer.setdefault(k, [])# 异常buffer当数据没有被正常插入时数据不被丢弃放到异常队列中一旦恢复了连接先将异常队列中的数据恢复exception_buffer.setdefault(k, [])while True:try:start_time time.time()for k, v in exception_buffer.items():if len(v) 0:sql fINSERT INTO {db_name}.{k} VALUES {str(v).replace([, ).replace(], )}try:connection.execute(sql)connection.commit()exception_buffer.setdefault(k, [])except Exception:exception_time 1connection.rollback()content self.archive_consumer_queue.get()tag_detail self.tag_id_to_detail_map.get(content[tag_id])if tag_detail.get(active_archive):timestamp datetime.datetime.strftime(datetime.datetime.now(), %Y-%m-%d %H:%M:%S)try:data float(content[new_data])except ValueError:data 0.0except TypeError:data 0.0except Exception:data 0.0if len(buffer[content[tag_id]]) 100:buffer[content[tag_id]].append((timestamp, data))else:sql fINSERT INTO {db_name}.{content[tag_id]} VALUES {str(buffer[content[tag_id]]).replace([, ).replace(], )};try:connection.execute(text(sql))connection.commit()buffer[content[tag_id]] []except Exception:logger.error(finsert error:{sql})connection.rollback()exception_time 1if len(exception_buffer) 10000:exception_buffer[content[tag_id]].extend(buffer[content[tag_id]])buffer[content[tag_id]] []else:# 如果超过设定的缓存值滞后将旧值丢弃掉exception_buffer[content[tag_id]] exception_buffer[content[tag_id]][100:]total_time 1except Exception as e:logger.error(str(e))exception_time 1if connection:connection.close()connection get_tdengine_connection()finally:end_time time.time()try:redis_connection.hmset(performance,mapping{buffer: len(buffer), exception_buffer: len(exception_buffer),f{thread_name}_use_time: f{(end_time - start_time):.2f},total_time: total_time,exception_time: exception_time})except Exception:if redis_connection:redis_connection.close()redis_connection get_redis_connection()time.sleep(2)def consumer_emqx_info(self):while True:try:content self.emqx_consumer_queue.get()tag_detail self.tag_id_to_detail_map.get(content[tag_id])mqtt_topic_str tag_detail.get(mqtt_topic_name)topic_list []for topic in mqtt_topic_str.split(;):topic_name, qos topic.split(,)topic_list.append({topic_name: topic_name.split(:)[1].strip().replace(\n, ),qos: qos.split(:)[1].strip().replace(\n, )})payload json.dumps({content[tag_id]: content[new_data]})for topic in topic_list:emqx_connection.publish(topictopic[topic_name], payloadpayload, qosint(topic[qos]))except Exception as e:logger.error(str(e))def subscribe_data_change(self):copy_raw_dict self.raw_dict.copy()flag Falsewhile True:d1_keys self.raw_dict.keys()d2_keys copy_raw_dict.keys()if _ : d1_keys - d2_keys:flag Truefor k in list(_):self.change_data_notifier(datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S), k,self.raw_dict[k],0)if _ : d2_keys - d1_keys:flag Truefor k in list(_):self.change_data_notifier(datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S), k, 0,self.raw_dict[k])commen_keys d1_keys d2_keysfor key in commen_keys:if copy_raw_dict[key] ! self.raw_dict[key]:self.change_data_notifier(datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S), key,copy_raw_dict[key], self.raw_dict[key])flag Trueif flag:copy_raw_dict self.raw_dict.copy()flag Falsetime.sleep(0.5)def run(self):启动服务:return:try:self.connect()interval_acqusition_task Thread(targetself.interval_read, namecontinuous_thread, args(1,))monitor_thread_task Thread(targetself.monitor_thread, namemonitor_thread)subscribe_thread_task Thread(targetself.subscribe_data_change, namesubscribe_thread)for i in range(opcua_adapter_params[alarm_worker]):consumer_alarm_info_task Thread(targetself.consumer_alarm_info, namefconsumer_alarm_info_{i1})self.thread_list.append(consumer_alarm_info_task)for i in range(opcua_adapter_params[archive_worker]):consumer_archive_info_task Thread(targetself.consumer_archive_info, namefconsumer_archive_info_{i1})self.thread_list.append(consumer_archive_info_task)for i in range(opcua_adapter_params[emqx_worker]):consumer_emqx_info_task Thread(targetself.consumer_emqx_info, namefconsumer_emqx_info_{i1})self.thread_list.append(consumer_emqx_info_task)self.thread_list.append(interval_acqusition_task)self.thread_list.append(monitor_thread_task)self.thread_list.append(subscribe_thread_task)for th in self.thread_list:th.start()for th in self.thread_list:th.join()except Exception as e:logger.error(str(e))finally:get_emqx_connection.client.loop_stop()get_emqx_connection.client.disconnect()def init():conn get_redis_connection()while True:try:device_info json.loads(conn.get(device_info))tag_id_to_node_id_map json.loads(conn.get(tag_id_to_node_id_map))tag_id_to_detail_map json.loads(conn.get(tag_id_to_detail_map))if device_info and tag_id_to_detail_map and tag_id_to_node_id_map:return device_info, tag_id_to_node_id_map, tag_id_to_detail_mapelse:logger.error(init error)time.sleep(3)continueexcept Exception:logger.error(Init Failed!)time.sleep(3)continuedef main():device_info, tag_id_to_node_id_map, tag_id_to_detail_map init()opcua_adapter OPCUAAdapter(device_infodevice_info, tag_id_to_node_id_maptag_id_to_node_id_map,tag_id_to_detail_maptag_id_to_detail_map)opcua_adapter.run()if __name__ __main__:main() 数据采集逻辑 # 断线重连机制 多进程中# 多进程模型 一个设备一个进程# 多线程模型 不同的采集频率一个线程一个进程中的多个线程共享一个链接进程# 客户端采用订阅的方式# 客户端采用轮旋的方式# 字段的含义 字段名称 字段含义 数值 No 自增id tag_uuid 变量的唯一ID node_id UA node_id interval 采集周期以秒为单位 active 是否激活TURE, FALSE 必须大写 active_alarm 是否激活报警 alarm_up 报警上限 alarm_down 报警下限 alarm_up_info 报警上限值 alarm_down_info 报警下限值 alarm_up_change 达到报警上限的修改值 alarm_down_change 达到报警下限的修改值 active_archive 激活归档 archive_onchange 变化时归档 为FALSE archive_interval有效 archive_interval 轮训归档周期 active_scale 激活变换 scale_sign 符号 scale_factor 因子 mqtt_topic_name unit comments
http://www.zqtcl.cn/news/123447/

相关文章:

  • 网站推广seo是什么上海市人力资源网官网
  • 玉溪做网站的公司delphi xe10网站开发
  • 使用vue做的网站有哪些企业门为什么要建设门户网站
  • 上海移动云网站建设在门户网站上爆光怎么做
  • 网站建设开票内容百度浏览器广告怎么投放
  • 深圳公司网站建立小程序商店制作
  • 网站建设知识网犀牛云做网站多少钱
  • 东莞seo优化推广重庆做网络优化公司电话
  • 网站建设的设计思路高校建设网站的特色
  • 宁波网站建设八宝山做网站的公司
  • 哪里有网站建设多少钱网站建设哪家服务态度好
  • 白云区网站开发公司备案不关闭网站的方法
  • 男的做那个视频网站家用电脑可以做网站服务器
  • 网站建设的行业客户烟台市未成年思想道德建设网站
  • 设计个网站要多少钱鼓楼网站开发
  • 东莞外贸网站搭建制作北京app开发制作
  • 优化网站公司外包微信商城怎么开店
  • 网站设计的导航栏怎么做东莞seo网络优化
  • wordpress直接上传视频网站吗做网站软件
  • 电脑维修网站模板下载来个网站吧好人一生平安2021
  • 做公益选哪个网站好网站建设方案多少钱
  • 丰台做网站的公司vs2015 手机网站开发
  • 宝思哲手表网站qq官网登录入口网页版
  • 二手书网站开发设计太原建设网站的公司
  • 江门网站seo推广qq代挂网站建设
  • 合肥制作网站企业做文字logo的网站
  • php 网站提速有没有帮人做简历的网站
  • 九江建网站报价比特币网站做任务
  • 电子商务网站开发目的和意义网站建设湖南岚鸿建设
  • 网站改版提交给百度个人定做衣服店