这是我用来查看 EMQ 服务器里数据的工具 MQTTX,挺好用的。首先,我们需要用到这些库(从下文代码看,应为 paho-mqtt、pymysql,以及标准库的 threading 和 time),下载起来也很简单,直接百度一下安装方法即可,安装好之后 import 进来。下面以主题“weight_pub”为例,首先获取 MQTT 的数据,函数 API 使用实例如下:
## MQTT operations
def MQTTWeight(host='**.**.***.***', port=1883, keepalive=600):
    """Connect to the MQTT broker, subscribe to ``weight_pub`` and block.

    Meant to be used as a thread target: ``loop_forever()`` does not return
    while the client stays connected.

    Args:
        host: Broker address. The default is the redacted placeholder from
            the original listing -- replace it with your own server address.
        port: Broker TCP port. The original value was redacted; 1883 is the
            conventional plain-TCP MQTT port -- adjust it to your setup.
        keepalive: Keepalive interval in seconds handed to ``connect``.
    """
    client = mqtt.Client()
    client.on_connect = on_connect_weight
    client.on_message = on_message_weight
    client.connect(host, port, keepalive)
    client.subscribe('weight_pub', qos=0)
    client.loop_forever()  # keep the connection open and dispatch callbacks
下面这个回调函数会在客户端连接 EMQ 服务器(weight_pub 主题所在的服务器)之后被调用,并打印连接结果码,告诉用户已经成功连上服务器:
def on_connect_weight(client, userdata, flags, rc):
    """Report the connection result and (re)subscribe to ``weight_pub``.

    Subscribing from this callback means the subscription is renewed whenever
    the client reconnects, instead of existing only for the first session.

    Args:
        client: The MQTT client instance that connected.
        userdata: User data attached to the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; 0 means the connection was accepted.
    """
    print("Connected with weight result code: " + str(rc))
    # Only subscribe when the broker actually accepted the connection.
    if rc == 0:
        client.subscribe('weight_pub', qos=0)
接下来,在 on_message_weight(client, userdata, msg) 函数中处理我们获取到的数据:
def on_message_weight(client, userdata, msg):
    """Store the weight carried by an incoming message in ``total_data``.

    The reading is recorded only while ``total_data['WriteEnable']`` is 1.
    Readings above 10 are reduced by 2.75 before being stored.

    NOTE(review): the original listing lost its indentation; the offset
    branch is assumed to be nested under the ``WriteEnable`` check -- confirm.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; ``msg.payload`` holds the raw weight text.
    """
    if get_value('WriteEnable') != 1:
        return

    payload = msg.payload
    # Decode bytes explicitly: str(b'12.5') would yield the repr "b'12.5'",
    # and escape sequences in that repr could leak stray digits.
    if isinstance(payload, (bytes, bytearray)):
        text = payload.decode('utf-8', errors='replace')
    else:
        text = str(payload)

    try:
        weight = GetWeight(text)
    except ValueError:
        # A malformed payload must not escape the callback and end the loop.
        print("Ignoring malformed weight payload: %r" % (payload,))
        return

    # Presumably a tare/calibration offset -- TODO confirm the meaning of 2.75.
    if weight > 10:
        weight -= 2.75
    # Publish the final value in one step so readers never see the raw one.
    set_value('weight', weight)
至此数据获取完成,接下来进行数据库记录修改操作,我已经在数据库里面建好记录了,所以只要修改对应记录里面的数据即可,python通过id寻找相应的记录。
## Database operations
def updateHandPart():
    """Write heart rate and blood pressure to the database, then reset them.

    Does nothing unless ``total_data['heartRate']`` is greater than 0.
    Otherwise the ``bodyhealthmonitor`` row whose id equals
    ``total_data['id']`` is updated with the current ``heartRate``,
    ``diastolicPressure`` and ``systolicPressure``, and those three entries
    are set back to 0 once the commit has succeeded.

    Raises:
        Any exception raised by the database driver; the connection is
        closed before it propagates and the readings are left untouched.
    """
    heart_rate = get_value('heartRate')
    if not (heart_rate > 0):
        return

    sql = ('UPDATE bodyhealthmonitor '
           'SET heartRate=%s, diastolicPressure=%s, systolicPressure=%s '
           'WHERE id = %s')
    args = (
        heart_rate,
        get_value('diastolicPressure'),
        get_value('systolicPressure'),
        total_data['id'],
    )

    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            # One statement updates all three columns of the same row.
            cur.execute(sql, args)
        finally:
            cur.close()
        conn.commit()
    finally:
        # Always release the connection, even when the update fails.
        conn.close()

    # Clear the readings only after they were committed.
    total_data['heartRate'] = 0
    total_data['diastolicPressure'] = 0
    total_data['systolicPressure'] = 0
下面这个函数用于连接数据库:
def get_conn(host='*****', port=3306, user='*****', password='*****',
             database='*****'):
    """Open and return a new PyMySQL connection.

    The ``'*****'`` defaults are the redacted placeholders from the original
    listing -- replace them with your own settings or pass real values.

    Args:
        host: Database server address.
        port: Database server port. The original value was redacted; 3306 is
            the conventional MySQL port -- adjust it to your setup.
        user: Account name.
        password: Account password.
        database: Name of the database to use.

    Returns:
        The connection object returned by ``pymysql.connect``; the caller is
        responsible for closing it.
    """
    return pymysql.connect(host=host, port=port, user=user,
                           password=password, database=database)
最后,在 main 入口里开启线程获取数据,然后在主循环中处理数据。完整代码如下:
# "*****" stands for the user's own data (fill in your own values).
# Shared state: the latest readings plus the id of the database row to update.
total_data = {
    'WriteEnable': 0,
    'weight': 0.00,
    'diastolicPressure': 0,
    'heartRate': 0,
    'systolicPressure': 0,
    'height': 0.00,
    'bloodOxygen': 0.00,
    'BMI': 0.00,
    'idealWeight': 0.00,
    'temperature': 0.00,
    'id': 1000,
}
def set_value(name, value):
    """Store *value* under key *name* in the shared ``total_data`` dict."""
    total_data[name] = value
def get_value(name, defValue=None):
    """Return ``total_data[name]``, or *defValue* when the key is absent."""
    return total_data.get(name, defValue)
# Extract the weight
def GetWeight(str):
    """Parse a number out of *str* by keeping only ASCII digits and dots.

    Every other character is dropped and the kept characters are joined in
    order, so ``"b'12.5'"`` yields ``12.5`` and ``"1 2"`` yields ``12.0``.

    Args:
        str: Text containing the reading. The name shadows the builtin but
            is kept so existing keyword callers keep working.

    Returns:
        The parsed value as a float.

    Raises:
        ValueError: If the text has no digits or dots, or the kept
            characters do not form a valid float (e.g. more than one dot).
    """
    digits = ''.join(ch for ch in str if '0' <= ch <= '9' or ch == '.')
    if not digits:
        raise ValueError('no numeric characters found in %r' % (str,))
    return float(digits)
def on_connect_weight(client, userdata, flags, rc):
    """Report the connection result and (re)subscribe to ``weight_pub``.

    Subscribing from this callback means the subscription is renewed whenever
    the client reconnects, instead of existing only for the first session.

    Args:
        client: The MQTT client instance that connected.
        userdata: User data attached to the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; 0 means the connection was accepted.
    """
    print("Connected with weight result code: " + str(rc))
    # Only subscribe when the broker actually accepted the connection.
    if rc == 0:
        client.subscribe('weight_pub', qos=0)
def on_message_weight(client, userdata, msg):
    """Store the weight carried by an incoming message in ``total_data``.

    The reading is recorded only while ``total_data['WriteEnable']`` is 1.
    Readings above 10 are reduced by 2.75 before being stored.

    NOTE(review): the original listing lost its indentation; the offset
    branch is assumed to be nested under the ``WriteEnable`` check -- confirm.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; ``msg.payload`` holds the raw weight text.
    """
    if get_value('WriteEnable') != 1:
        return

    payload = msg.payload
    # Decode bytes explicitly: str(b'12.5') would yield the repr "b'12.5'",
    # and escape sequences in that repr could leak stray digits.
    if isinstance(payload, (bytes, bytearray)):
        text = payload.decode('utf-8', errors='replace')
    else:
        text = str(payload)

    try:
        weight = GetWeight(text)
    except ValueError:
        # A malformed payload must not escape the callback and end the loop.
        print("Ignoring malformed weight payload: %r" % (payload,))
        return

    # Presumably a tare/calibration offset -- TODO confirm the meaning of 2.75.
    if weight > 10:
        weight -= 2.75
    # Publish the final value in one step so readers never see the raw one.
    set_value('weight', weight)
## MQTT operations
def MQTTWeight(host='**.**.***.***', port=1883, keepalive=600):
    """Connect to the MQTT broker, subscribe to ``weight_pub`` and block.

    Meant to be used as a thread target: ``loop_forever()`` does not return
    while the client stays connected.

    Args:
        host: Broker address. The default is the redacted placeholder from
            the original listing -- replace it with your own server address.
        port: Broker TCP port. The original value was redacted; 1883 is the
            conventional plain-TCP MQTT port -- adjust it to your setup.
        keepalive: Keepalive interval in seconds handed to ``connect``.
    """
    client = mqtt.Client()
    client.on_connect = on_connect_weight
    client.on_message = on_message_weight
    client.connect(host, port, keepalive)
    client.subscribe('weight_pub', qos=0)
    client.loop_forever()  # keep the connection open and dispatch callbacks
def get_conn(host='*****', port=3306, user='*****', password='*****',
             database='*****'):
    """Open and return a new PyMySQL connection.

    The ``'*****'`` defaults are the redacted placeholders from the original
    listing -- replace them with your own settings or pass real values.

    Args:
        host: Database server address.
        port: Database server port. The original value was redacted; 3306 is
            the conventional MySQL port -- adjust it to your setup.
        user: Account name.
        password: Account password.
        database: Name of the database to use.

    Returns:
        The connection object returned by ``pymysql.connect``; the caller is
        responsible for closing it.
    """
    return pymysql.connect(host=host, port=port, user=user,
                           password=password, database=database)
## Database operations
def updateHandPart():
    """Write heart rate and blood pressure to the database, then reset them.

    Does nothing unless ``total_data['heartRate']`` is greater than 0.
    Otherwise the ``bodyhealthmonitor`` row whose id equals
    ``total_data['id']`` is updated with the current ``heartRate``,
    ``diastolicPressure`` and ``systolicPressure``, and those three entries
    are set back to 0 once the commit has succeeded.

    Raises:
        Any exception raised by the database driver; the connection is
        closed before it propagates and the readings are left untouched.
    """
    heart_rate = get_value('heartRate')
    if not (heart_rate > 0):
        return

    sql = ('UPDATE bodyhealthmonitor '
           'SET heartRate=%s, diastolicPressure=%s, systolicPressure=%s '
           'WHERE id = %s')
    args = (
        heart_rate,
        get_value('diastolicPressure'),
        get_value('systolicPressure'),
        total_data['id'],
    )

    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            # One statement updates all three columns of the same row.
            cur.execute(sql, args)
        finally:
            cur.close()
        conn.commit()
    finally:
        # Always release the connection, even when the update fails.
        conn.close()

    # Clear the readings only after they were committed.
    total_data['heartRate'] = 0
    total_data['diastolicPressure'] = 0
    total_data['systolicPressure'] = 0
if __name__ == '__main__':
    # Threads started so far; the interrupt handler only touches these, so an
    # early Ctrl-C cannot hit a thread variable that was never assigned.
    workers = []
    try:
        # One daemon thread per MQTT subscriber. MQTTHeight and MQTTBoold are
        # not part of this listing and are expected to be defined elsewhere.
        for target in (MQTTWeight, MQTTHeight, MQTTBoold):
            worker = threading.Thread(target=target, daemon=True)
            worker.start()
            workers.append(worker)

        while True:
            time.sleep(1)  # poll the shared data once per second
            if get_value('WriteEnable') == 1:
                # A full measurement is ready: store it and clear the flag.
                update()
                total_data['WriteEnable'] = 0
            else:
                updateHandPart()
    except KeyboardInterrupt:
        for worker in workers:
            stop_thread(worker)
        print('end')
(づ ̄3 ̄)づ╭❤~一键三连,这次一定(๑•̀ㅂ•́)و✧