Konecne jsem se dokopal k tomu, abych trochu ucesal svuj Python skript na export dat z elektromeru (CEZ ADX12A), aby ho mohl pouzit i nekdo jiny.
Co budete potrebovat:
- USB hlavici pro cteni dat z elektromeru. Ja mam tuto: https://www.elektromery.com/product/pri ... ice-usb/87
- Prevodnik USB=>ETH=>USB, pokud neplanujete mit zarizeni primo u elektromeru. Neco jako toto: Aliexpress
Moje zapojeni vypada takto: opticka hlavice => USB2ETH => 20m UTP kabel => ETH2USB => USB port na stroji, kde mi bezi HA.
Python skript pro cteni dat
Kód: Vybrat vše
#!/bin/python3
#This script reads data from a CEZ electricity meter and publishes it to an MQTT broker. It uses a USB reading device (magnetic head) attached to the meter. This script is messy, I know, but it works for my purpose :) Feel free to modify it.
#You need to install the MQTT package like this: pip3 install paho-mqtt. The serial module comes from pyserial (pip3 install pyserial). The other packages are part of the Python standard library.
import serial
import re
import csv
import time
import logging
from datetime import datetime
from paho.mqtt import client as mqtt_client
# --- User configuration ------------------------------------------------------
# Serial device of the reading head, e.g. /dev/ttyUSB1.
PORT = "/dev/ttyUSB1"

# OBIS codes whose values are sent to the MQTT broker,
# e.g. obis_codes = ["1.8.0", "1.8.1", "1.8.2"].
obis_codes = ["1.8.0", "1.8.1", "1.8.2"]

# --- MQTT configuration ------------------------------------------------------
broker = "mqtt.example.org"
port = 1883  # 1883 is the default MQTT port
topic = "home/electricity_meter"
# Client id handed to the MQTT client; it should be unique,
# e.g. 5f897aa8-4574-11ea-b878-0242ac120002.
client_id = ""
username = ""
password = ""

# --- Output files ------------------------------------------------------------
logfile_name = "electricity_meter.log"
store_csv = False  # set to True to also dump every readout to a CSV file
csv_filename = "data.csv"  # a date prefix is added to this name

# --- Internal settings: DO NOT CHANGE ----------------------------------------
# Append timestamped INFO (and higher) records to the log file.
logging.basicConfig(
    filename=logfile_name,
    filemode="a",
    format="%(asctime)s - %(message)s",
    level=logging.INFO,
)
# Speeds allowed by IEC; the digit reported by the meter indexes this tuple.
baudrates = (300, 600, 1200, 2400, 4800, 9600, 19200)
def searchfor(matrix, obis_code):
    """Return the first row of *matrix* whose first cell equals *obis_code*.

    Args:
        matrix: Sequence of rows (lists of cells); the first cell of each
            row is compared against *obis_code*.
        obis_code: Code to look for, e.g. ``"1.8.0"``.

    Returns:
        The whole matching row, or ``None`` when no row matches.
    """
    for row in matrix:
        # Empty rows are skipped so that row[0] cannot raise IndexError.
        if row and row[0] == obis_code:
            return row
    return None
def publish(client, topic, msg, delay=1):
    """Send *msg* to *topic* on the MQTT broker and report the outcome.

    Args:
        client: MQTT client object exposing ``publish(topic, msg)``.
        topic: Full topic the message is published to.
        msg: Payload to publish.
        delay: Seconds to wait before publishing (default 1, as before).
            NOTE(review): presumably this gives the background network loop
            time to finish connecting - confirm before lowering it.

    Returns:
        ``True`` when the client accepted the message (status 0),
        ``False`` otherwise.
    """
    time.sleep(delay)
    result = client.publish(topic, msg)
    status = result[0]  # first item of the publish result is the status code
    if status == 0:
        print(f"Send `{msg}` to topic `{topic}`")
        return True
    print(f"Failed to send message to topic {topic}")
    # Also record the failure in the log file so unattended runs leave a trace.
    logging.error("Failed to send message to topic %s (status %s)", topic, status)
    return False
def connect_mqtt():
    """Create an MQTT client and connect it to the configured broker.

    Uses the module-level settings ``client_id``, ``username``, ``password``,
    ``broker`` and ``port``.

    Returns:
        The client object on which ``connect`` has been called. The caller is
        responsible for running its network loop.
    """
    def on_connect(client, userdata, flags, rc):
        # rc == 0 means the broker accepted the connection.
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            # The original passed rc as a second print() argument, so the
            # "%d" placeholder was never filled in.
            print(f"Failed to connect, return code {rc}")
            logging.error("Failed to connect to MQTT broker, return code %s", rc)

    # paho-mqtt 2.x requires the callback API version as the first argument;
    # 1.x has no such enum. VERSION1 keeps the 4-argument on_connect signature.
    callback_api = getattr(mqtt_client, "CallbackAPIVersion", None)
    if callback_api is None:
        client = mqtt_client.Client(client_id)
    else:
        client = mqtt_client.Client(callback_api.VERSION1, client_id)

    # Only send credentials when a username is configured; an empty string
    # would otherwise be transmitted as an (empty) username.
    if username:
        client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client
def read_serial_data(device=None):
    """Read the meter readout from the serial port up to the closing "!".

    Bytes are read one at a time until "!" (0x21), the end-of-message
    indicator, is received. The port is closed before returning, and also
    when reading fails.

    Args:
        device: Open serial port object providing ``read(n)``, ``close()``
            and a ``port`` attribute. Defaults to the module-level ``ser``.

    Returns:
        The received text (ASCII-decoded), including the final "!".

    Raises:
        TimeoutError: A read returned no data (port timeout) before "!" was
            seen. The original code looped forever or raised IndexError here.
    """
    if device is None:
        device = ser  # serial port opened by the main script
    received = bytearray()
    print("Start reading data from " + str(device.port))
    print("This take up to 30s")
    try:
        while not received.endswith(b"!"):
            chunk = device.read(1)
            if not chunk:
                # An empty result means the port timeout expired.
                raise TimeoutError(
                    "meter stopped sending data before the end-of-message '!'"
                )
            received += chunk
    finally:
        device.close()  # always release the serial port
    print("! detected, end of transmission")
    print(len(received), " characters transsmited")
    # Decode once at the end instead of byte by byte.
    return received.decode("ascii")
def write_to_csv(result, csv_name):
    """Write the rows in *result* to *csv_name* as a ";"-delimited CSV file.

    Args:
        result: Iterable of rows, each row an iterable of cell values.
        csv_name: Path of the file to create; an existing file is overwritten.
    """
    # newline="" is what the csv module requires: it lets the writer control
    # line endings itself and avoids blank lines between rows on Windows.
    with open(csv_name, "w", newline="", encoding="utf-8") as my_csv:
        csv.writer(my_csv, delimiter=';').writerows(result)
# ---------------------------------------------------------------------------
# Main flow: handshake with the meter, read the data block, parse it into
# rows, optionally store it as CSV and publish the selected OBIS codes.
# ---------------------------------------------------------------------------

# Open the serial port with the initial settings: 300 baud, 7 data bits,
# even parity, 1 stop bit.
ser = serial.Serial(
    port=PORT,
    baudrate=300,
    bytesize=serial.SEVENBITS,
    parity=serial.PARITY_EVEN,
    stopbits=serial.STOPBITS_ONE,
    timeout=5,
)
time.sleep(0.2)  # mandatory delay
print("Sending request for initial data")
ser.write(b'/?!\r\n')  # request for data - standard format
response = ser.readline()
print("Done")
response = response.decode("ascii")  # convert response from binary to ASCII

# The 5th character of the reply is the index of the fastest supported speed.
# An empty reply (timeout) or an unexpected character used to surface as a
# bare IndexError/ValueError; fail with a clear message instead.
try:
    speed = int(response[4])
    negotiated_baudrate = baudrates[speed]
except (IndexError, ValueError):
    ser.close()
    logging.error("Unexpected identification reply from meter: %r", response)
    raise SystemExit(f"Unexpected identification reply from meter: {response!r}")

print("Setting speed to:", negotiated_baudrate)
# Request for data readout mode: <ACK> 0 <speed digit> 0 <CR><LF>
s = bytes([6, 48, 48 + speed, 48, 13, 10])
ser.write(s)
time.sleep(0.2)  # mandatory delay
ser.flush()  # wait until the request has been written out
ser.baudrate = negotiated_baudrate  # switch to the negotiated speed
serial_data = read_serial_data()  # reads the data block from the meter

# Parse the readout into a 2D list; the first row holds the column headers.
structured_data = [["code", "value", "unit", "date"]]
field_separator = re.compile(r"\(|\*|;")  # raw string: "\(" is not a valid str escape
for line in serial_data.split('\n'):
    # NOTE(review): IEC 62056-21 data blocks should start with an STX (0x02)
    # control character, which would otherwise stick to the first code -
    # confirm against a real readout.
    line = line.lstrip("\x02")
    line = line.replace("kW,", "kW;")  # second "," becomes a field separator
    line = line.replace(" imp", ";imp")  # space before "imp" becomes a separator
    line = line.replace(")", "").rstrip()  # drop ")" and the trailing \r
    structured_data.append(field_separator.split(line))

# show data in matrix
#print('\n'.join(['\t'.join([str(cell) for cell in row]) for row in structured_data]))

# Convert the meter's date strings (4th column) to datetime objects.
# index > 1 is kept from the original: it skips the header row and the first
# data row.
for index, row in enumerate(structured_data):
    if len(row) == 4 and index > 1:
        try:
            row[3] = datetime.strptime(row[3], '%y%m%d%H%M')
        except ValueError:
            # Leave the raw text in place rather than aborting the whole run.
            logging.warning("Unparseable date %r for code %s", row[3], row[0])

# Optionally write the whole readout to a date-prefixed CSV file.
if store_csv:
    date_string = datetime.now().strftime("%Y%m%d-%H%M%S")
    write_to_csv(structured_data, date_string + "_" + csv_filename)

client = connect_mqtt()
client.loop_start()
try:
    # Publish every configured OBIS code that is present in the readout.
    for obis_code in obis_codes:
        row = searchfor(structured_data, obis_code)
        if row is None or len(row) < 2:
            # A missing code used to crash the tuple unpacking.
            print(f"OBIS code {obis_code} not found in meter data")
            logging.warning("OBIS code %s not found in meter data", obis_code)
            continue
        # Only code and value are used, so rows that also carry a unit
        # and/or a date are handled too.
        code, value = row[0], row[1]
        try:
            # Decimal "," -> "." so the text converts to float (this also
            # drops leading zeroes).
            reading = float(value.replace(',', '.'))
        except ValueError:
            logging.warning("Non-numeric value %r for OBIS code %s", value, obis_code)
            continue
        publish(client, topic + "/" + code, reading)
        logging.info(obis_code + ":" + str(reading))
finally:
    # The network loop runs in a background thread; disconnect and stop it so
    # queued messages are sent before the interpreter exits.
    client.disconnect()
    client.loop_stop()
Konfigurace senzoru pro Home Assistant (configuration.yaml)
Kód: Vybrat vše
# Home Assistant MQTT sensors for the values published by the Python script.
# The state topics are "<topic>/<OBIS code>", where <topic> is the script's
# `topic` setting ("home/electricity_meter").
# NOTE(review): recent Home Assistant releases may no longer accept
# `last_reset_topic` (and may restrict `last_reset_value_template` to
# state_class "total") - verify against the MQTT sensor documentation.
mqtt:
  sensor:
    # 1.8.0 - total consumption
    - name: "Elektromer - celkova spotreba"
      unique_id: "home-elektromer-1.8.0"
      device_class: "energy"
      state_class: "total_increasing"
      unit_of_measurement: "kWh"
      state_topic: "home/electricity_meter/1.8.0"
      last_reset_topic: "home/electricity_meter/1.8.0"
      last_reset_value_template: '1970-01-01T00:00:00+00:00'
    # 1.8.1 - high tariff
    - name: "Elektromer - vysoký tarif"
      unique_id: "home-elektromer-1.8.1"
      device_class: "energy"
      state_class: "total_increasing"
      unit_of_measurement: "kWh"
      state_topic: "home/electricity_meter/1.8.1"
      last_reset_topic: "home/electricity_meter/1.8.1"
      last_reset_value_template: '1970-01-01T00:00:00+00:00'
    # 1.8.2 - low tariff
    - name: "Elektromer - nízký tarif"
      unique_id: "home-elektromer-1.8.2"
      device_class: "energy"
      state_class: "total_increasing"
      unit_of_measurement: "kWh"
      state_topic: "home/electricity_meter/1.8.2"
      last_reset_topic: "home/electricity_meter/1.8.2"
      last_reset_value_template: '1970-01-01T00:00:00+00:00'
Snad to nekomu pomuze. Pokud to budete zkouset i s jinym druhem elektromeru a bude to fungovat, urcite dejte vedet ve vlaknu. Dopredu rikam, ze nejsem programator a muj kod je takovy slepenec ze stackoverflow a ruznych navodu na internetu. Takze pokud se toho nekdo ujme a ten kod ucese a bude udrzovat, budu jen rad.