Begin
Experimenting with ChatGPT
Background: A Technical Family Tradition
Every holiday season, I inevitably become the designated IT support for my family. This year, my challenge was integrating a Kaiser Nienhaus Wi-Fi roller blind into Home Assistant. Unfortunately, no official integration existed, and there was no documentation on the communication protocol. Through a combination of reverse engineering, protocol analysis, and proof-of-concept scripting, I managed to control the blinds via direct UDP multicast commands.
In the process, I discovered that the blinds also accept direct IP-based commands, which circumvented multicast restrictions in Home Assistant's Docker setup. Eventually, I built a custom Home Assistant component to seamlessly control the blinds.
Taking It a Step Further
As someone who frequently deals with software specifications in my daily work, I was curious: How well can ChatGPT handle a technical specification and translate it into working code? Given my experience with the Wi-Fi blinds, I decided to create a structured specification for their integration, including implementation details such as:
- Networking: How the blinds communicate over Wi-Fi (UDP multicast and direct IP communication).
- Threading: How command execution should be handled in a non-blocking way.
- Logging: Best practices for debugging and monitoring blind activity.
- Class Design: A structured approach to encapsulating the roller blind’s functionality.
Writing the Specification
A good specification not only outlines the requirements but also provides sufficient guidance on implementation. Here’s an excerpt from my spec:
Schreibe ein Pythonprogramm, nach folgender Spezifikation.
= Ziel =
Das Ziel dieses Progamm ist es mit dem nativen KN Connect system für Heimautomatisierung zu interagieren (Sensoren abfragen und Aktuatoren zu steuern), nachdem ein Einlernen bzw. setup über die KN Connect App durchgeführt wurde.
= Zwischenziel =
1. Erfassen aller Sensoren und Aktuatoren im Netzwerk.
2. Erfassen des AccessTokens um Aktuatoren zu steuern.
== Sensorik ==
=== Hearbeat ===
Sensoren sollen sich alle 60 Sekunden mit einem Hearbeat melden.
Aus dem Heartbeat( "msgType":"Heartbeat" ) sollen die wichtigsten Eigenschaften des Geräts ersichtlich werden.
Beispiel für einen Heartbeat:
{"msgType":"Heartbeat","mac":"acff4eef53b1","deviceType":"22000002","msgID":"20250101095635407","token":"XXXXXXXXXXXXXXXX","data":{"currentAngle":0,"currentPosition":0,"RSSI":-69}}
UDP-Port := 32101
mac := des Sensors oder Aktuators
deviceType := Der Typ des Geräts. Aus diesem werden auch die bereitgestellten Funktionen abgeleitet.
Folgende deviceType sind definiert:
DoubleRoller = "10000002"
Host = "02000001"
SubSet = "10000000"
TDBU = "10000001"
WifiCurtain = "22000000"
WifiModule = "22000005"
WifiReceiver = "22000001"
WifiSwitch = "22000007"
WifiTubularMotor = "22000002"
msgID := Ist der aktuelle Timestamp im Format yyyyMMddHHmmssSSS.
token := Wird gebraucht um Geräte zu steuern. Hat 16 alphanummerische Zeichen.
data := (Optional) Enthält die aktuellen Sensorwerte inkl. RSSI des WLAN-Geräts.
Optionale Parameter können, aber müssen nicht vorhanden sein.
=== GetDeviceList ===
Über die Nachricht, msgType GetDeviceList ({"msgType":"GetDeviceList","msgID":"20241231164135490"}), können alle erreichbaren Devices abgefragt werden.
UDP-Port := 32100
Die steuernde Einheit lädt am Anfang alle bekannten Geräte ein.
Bekannte Geräte können z.B. in der Datei "~/.config/KN_Connect/known_devices" gespeichert werden.
Falls der Speicherort nicht existiert, soll dieser angelegt werden.
Beim herunterfahren der steuernden Einheit werden alle bekannten Geräte gespeichert. Beim speichern wird das Flag "offline" für das jeweilige Gerät gesetzt.
Werden optionale Parameter beim Heartbeat bereitgestellt, so sollen diese zusätzlich auch gespeichert werden.
Diese Daten sollen dabei als eigener Eintrag hinzugefügt werden.
Beim Laden sollen diese Daten wieder eingelesen werden.
Die steuernde Einheit erfragt nach dem Starten per GetDeviceList alle vorhanden Geräte im Netzwerkt.
Dazu verschickt die steuernde Einheit Nachrichten auf UDP-Port 32100 versendet und gehen per UDP an die Multicast-Gruppe 238.0.0.18. Wichtig ist dabei, dass die steuernde Einheit als source UDP-port 32101 nutzt.
Die steuernede Einheit empfängt Nachrichten auf dem UDP-Port 32101. Diese Nachrichten können entweder über die Multicast-Gruppe eingehen oder direkt an die IP des steuernden Geräts.
Als erste soll die Logik zum empfangen von Nachrichten gestartet werden, damit alle Nachrichten empfangen werden können.
Nachrichten sollen dabei ohne extra whitespaces versendet werden.
Nach dem Empfang eines Hearbeats erstellt die steuernde Einheit einen Mapping Eintrag.
Sollte bereits ein Mapping für eine Gerät vorhanden sein, wird dieser aktualisiert.
Das Mapping enthält einen benutzerfreundlichen Namen zu mac, deviceType und token.
Der benutzerfreundliche Name setzt sich beim starten aus mac + deviceType zusammen und kann später angepasst werden.
Die steuernde Einheit bietet eine Funktion zur Ausgabe des Mappings im JSON format an.
=== GetDeviceListAck ===
Die Antwort auf GetDeviceList sieht wie folgt aus:
b'{"msgType":"GetDeviceListAck","mac":"acff4eef53b1","deviceType":"22000002","fwVersion":"A1.0.5_B0.0.1","ProtocolVersion":"0.9","token":"XXXXXXXXXXXXXXXX","data":[{"mac":"acff4eef53b1","deviceType":"22000002"}]}'
UDP-Port := 32101
msgType := GetDeviceListAck
mac := siehe oben
deviceType := siehe oben
fwVersion := aktuelle Firmware Version des devices
ProtocolVersion := genutzte Protocol Version
token := siehe oben
data := (Optional) zusätzliche Information
Optionale Parameter können, aber müssen nicht vorhanden sein.
Beim speichern sollen dazugehörige Daten aus GetDeviceListAck ebenfalls gespeichert werden.
Geräte die per Heartbeat oder per GetDeviceListAck erkannt wurden, werden mit dem Flag "online" markiert
Beim Verarbeiten von Heartbeat- und GetDeviceListAck-Nachrichten dürfen bestehende Daten im Mapping nicht überschrieben werden. Stattdessen sollen nur zusätzliche Daten hinzugefügt werden, wenn diese noch nicht vorhanden sind.
Bestehende Daten im Mapping werden nur aktualisiert, wenn neue Werte bereitgestellt werden.
== Aktuatorik ==
=== Kommandierung ===
Nach einer Kommandierung durch eine steuernde Einheit wird ein Report geschickt.
{'msgType': 'Report', 'mac': 'bcff4ddb53b1', 'deviceType': '22000002', 'msgID': '20250101160319815', 'data': {'currentPosition': 13, 'currentAngle': 180, 'RSSI': -68}}
Nach Empfang einer Report-Nachricht werden die Werte des Geräts (mac) im Mapping aktualisiert.
Nachrichten die Empfangen werden und deren msgType nicht bekannt sind sollen ausgegeben werden.
Steuernde Einheiten haben die Möglichkeit Aktuatoren zu über den msgType "WriteDevice" zu triggern.
Eine Nachricht könnte so aussehen
=== WriteDevice ===
WriteDevice erlaubt es Aktuatoren zu steuern.
Beispiel
{"msgType":"WriteDevice","mac":"bcff4ddb53b1","deviceType":"22000002","AccessToken":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX","msgID":"20250101161039800","data":{"operation":0}}
UDP-Port := 32100
msgType := WriteDevice
mac := die MAC-adresse des Aktuators
deviceType := Der Typ des Geräts. Aus diesem werden auch die bereitgestellten Funktionen abgeleitet.
AccessToken := Token der genutzt wird um das Gerät zu steuern.
data := Keine optinalen Daten. Diese müssen für den deviceType 22000002 gesetzt sein mit operation.
operation := kann drei Werte annehmen: 0 := Schließen, 1 := Öffnen, 2 := Stop
alternativ zu operation kann auch 'targetPosition' angegeben werden.
targetPosition := 0 := Schließen ... 100 := Offen . Werte dazwischen stellen den Öffnungsgrad dar.
Aus WriteDevice-Nachrichten soll das AccessToken zum Mapping hinzugefügt werden.
= Logging =
Fehler und unerwartetes Verhalten soll geloggt werden.
Logeinträge sollen unter ~/.cache/KN_Connect/logfile erstellt werden.
Logeinträge sollen alle 7 Tagen rotiert werden oder wenn mehr als 1024 Einträge erstellt wurden.
Logeinträge sollen nicht überschrieben werden.
Es soll zwischen kritischen, unkritischen Fehlern und unerwartetem Verhalten unterschieden werden.
Unerwartetes verhalten liegt dann vor, wenn z.B. in übertragenen Daten zusätzliche Informationen enthalten sind.
Optionale Parameter sind keine zusätzlichen Informationen.
Daher sollen alle erhaltenen Daten überprüft werden.
Eine Datagramm soll vollständig geloggt werden, wenn diese unerwarteten Daten beinhaltet.
Dazu soll zusätzlich auch die Absender-IP und der Absender-Port geloggt werden.
Der Logeintrag soll mit dem Loglevel WARNING erfolgen.
Kritische Fehler sollen mit dem Loglevel CRITICAL erfolgen.
Unkritische Fehler sollen mit dem Loglevel ERROR erfolgen.
Alle Lognachrichten mit dem Loglevel INFO sollen zusätzlich auf der Konsole ausgegeben werden.
Der Empfang von AccessToken soll mit dem Loglevel Info erfolgen.
Im Logeintrag soll der Dateiname und auch Zeile enthalten sein
= Implementierung =
Sockets sollen einen timeout von 0.5 Sekunden haben.
Nachrichten im JSON format sollen ohne zusätzliche Whitespace verschickt werden.
Es soll Threading genutzt werden.
After several iterations and some small changes I got the following results:
#!/usr/bin/env python3
import socket
import json
from datetime import datetime, timedelta
import os
import threading
import time
import logging
from logging.handlers import RotatingFileHandler
# Define device type mappings
DEVICE_TYPES = {
"10000002": "DoubleRoller",
"02000001": "Host",
"10000000": "SubSet",
"10000001": "TDBU",
"22000000": "WifiCurtain",
"22000005": "WifiModule",
"22000001": "WifiReceiver",
"22000007": "WifiSwitch",
"22000002": "WifiTubularMotor"
}
# UDP configuration
UDP_PORT_LISTEN = 32101
UDP_PORT_SEND = 32100
MULTICAST_GROUP = "238.0.0.18"
BUFFER_SIZE = 1024
CONFIG_DIR = os.path.expanduser("~/.config/KN_Connect")
CONFIG_FILE = os.path.join(CONFIG_DIR, "known_devices")
LOG_DIR = os.path.expanduser("~/.cache/KN_Connect")
LOG_FILE = os.path.join(LOG_DIR, "logfile.log")
# Logging configuration
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
RotatingFileHandler(LOG_FILE, maxBytes=1024 * 1024, backupCount=7),
logging.StreamHandler()
]
)
logger = logging.getLogger()
# Device mapping
device_mapping = {}
# Ensure configuration directory exists
def ensure_config_dir():
try:
os.makedirs(CONFIG_DIR, exist_ok=True)
logger.info(f"Ensured configuration directory exists: {CONFIG_DIR}")
except Exception as e:
logger.critical(f"Error ensuring configuration directory: {e}")
# Function to load known devices from config file
def load_known_devices():
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, "r") as file:
known_devices = json.load(file)
for mac, details in known_devices.items():
device_mapping[mac] = details
logger.info("Loaded known devices from config file.")
except Exception as e:
logger.error(f"Error loading config file: {e}")
# Function to save known devices to config file
def save_known_devices():
try:
# Set all devices to offline before saving and include optional details
for mac, details in device_mapping.items():
details["online"] = False
with open(CONFIG_FILE, "w") as file:
json.dump(device_mapping, file, indent=4)
logger.info("Saved known devices to config file.")
except Exception as e:
logger.error(f"Error saving config file: {e}")
# Start UDP listener
def start_udp_listener():
logger.info(f"Starting UDP listener on port {UDP_PORT_LISTEN}")
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", UDP_PORT_LISTEN))
# Join multicast group
group = socket.inet_aton(MULTICAST_GROUP)
mreq = group + socket.inet_aton("0.0.0.0")
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
while True:
try:
data, addr = sock.recvfrom(BUFFER_SIZE)
message = json.loads(data.decode("utf-8"))
# Check for unexpected data
unexpected_keys = set(message.keys()) - {"msgType", "mac", "deviceType", "msgID", "token", "data", "AccessToken"}
if unexpected_keys:
logger.warning(f"Unexpected keys in message: {unexpected_keys} from {addr}")
if message.get("msgType") == "GetDeviceListAck":
process_get_device_list_ack(message)
elif message.get("msgType") == "Heartbeat":
process_heartbeat(message)
elif message.get("msgType") == "Report":
process_report(message)
elif message.get("msgType") == "WriteDevice":
process_write_device(message)
else:
logger.warning(f"Unknown message type received: {message} from {addr}")
except Exception as e:
logger.error(f"Error receiving or processing message: {e}")
# Function to send GetDeviceList message
def send_get_device_list():
message = {
"msgType": "GetDeviceList",
"msgID": datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3]
}
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", UDP_PORT_LISTEN))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
sock.sendto(json.dumps(message, separators=(",", ":")).encode("utf-8"), (MULTICAST_GROUP, UDP_PORT_SEND))
logger.info("Sent GetDeviceList message.")
# Function to handle received Heartbeat messages
def process_heartbeat(message):
try:
if message.get("msgType") != "Heartbeat":
return
mac = message.get("mac")
device_type_code = message.get("deviceType")
device_type = DEVICE_TYPES.get(device_type_code, "Unknown")
token = message.get("token")
data = message.get("data", {})
# Extract optional parameters
current_angle = data.get("currentAngle")
current_position = data.get("currentPosition")
rssi = data.get("RSSI")
# Generate user-friendly name
user_friendly_name = f"{mac}_{device_type}"
# Add or update device mapping
if mac not in device_mapping:
device_mapping[mac] = {
"name": user_friendly_name,
"deviceType": device_type,
"deviceTypeCode": device_type_code,
"token": token,
"fwVersion": None,
"ProtocolVersion": None,
"currentAngle": current_angle,
"currentPosition": current_position,
"RSSI": rssi,
"online": True
}
else:
existing_entry = device_mapping[mac]
existing_entry.update({
"currentAngle": current_angle if current_angle is not None else existing_entry.get("currentAngle"),
"currentPosition": current_position if current_position is not None else existing_entry.get("currentPosition"),
"RSSI": rssi if rssi is not None else existing_entry.get("RSSI"),
"online": True
})
logger.info(f"Processed Heartbeat for {user_friendly_name}")
except Exception as e:
logger.error(f"Error processing Heartbeat: {e}")
# Function to handle GetDeviceListAck messages
def process_get_device_list_ack(message):
try:
if message.get("msgType") != "GetDeviceListAck":
return
mac = message.get("mac")
device_type_code = message.get("deviceType")
device_type = DEVICE_TYPES.get(device_type_code, "Unknown")
token = message.get("token")
fw_version = message.get("fwVersion")
protocol_version = message.get("ProtocolVersion")
# Add or update device mapping
if mac not in device_mapping:
device_mapping[mac] = {
"name": f"{mac}_{device_type}",
"deviceType": device_type,
"deviceTypeCode": device_type_code,
"token": token,
"fwVersion": fw_version,
"ProtocolVersion": protocol_version,
"currentAngle": None,
"currentPosition": None,
"RSSI": None,
"online": True
}
else:
existing_entry = device_mapping[mac]
existing_entry.update({
"fwVersion": fw_version if fw_version is not None else existing_entry.get("fwVersion"),
"ProtocolVersion": protocol_version if protocol_version is not None else existing_entry.get("ProtocolVersion"),
"online": True
})
logger.info(f"Processed GetDeviceListAck for {mac}")
except Exception as e:
logger.error(f"Error processing GetDeviceListAck: {e}")
# Function to handle Report messages
def process_report(message):
try:
if message.get("msgType") != "Report":
return
mac = message.get("mac")
data = message.get("data", {})
# Extract optional parameters
current_angle = data.get("currentAngle")
current_position = data.get("currentPosition")
rssi = data.get("RSSI")
# Update device mapping
if mac in device_mapping:
existing_entry = device_mapping[mac]
existing_entry.update({
"currentAngle": current_angle if current_angle is not None else existing_entry.get("currentAngle"),
"currentPosition": current_position if current_position is not None else existing_entry.get("currentPosition"),
"RSSI": rssi if rssi is not None else existing_entry.get("RSSI"),
"online": True
})
logger.info(f"Processed Report for {mac}")
else:
logger.warning(f"Report received for unknown device: {mac}")
except Exception as e:
logger.error(f"Error processing Report: {e}")
# Function to handle WriteDevice messages
def process_write_device(message):
try:
if message.get("msgType") != "WriteDevice":
return
mac = message.get("mac")
device_type = message.get("deviceType")
access_token = message.get("AccessToken")
data = message.get("data", {})
if "operation" not in data:
logger.error(f"WriteDevice message missing 'operation': {message}")
return
operation = data.get("operation")
if operation not in [0, 1, 2]:
logger.error(f"Invalid operation value in WriteDevice message: {operation}")
return
if mac in device_mapping:
logger.info(f"Triggering device {mac} of type {device_type} with operation: {operation}")
else:
logger.warning(f"WriteDevice command received for unknown device: {mac}")
except Exception as e:
logger.error(f"Error processing WriteDevice: {e}")
# Function to output mapping as JSON
def get_device_mapping_json():
return json.dumps(device_mapping, indent=4)
if __name__ == "__main__":
try:
# Ensure config directory exists
ensure_config_dir()
# Load known devices
load_known_devices()
# Start the UDP listener first
logger.info("Initializing listener...")
listener_thread = threading.Thread(target=start_udp_listener, daemon=True)
listener_thread.start()
# Delay to ensure listener starts before sending GetDeviceList
time.sleep(1)
# Send GetDeviceList message
send_get_device_list()
# Keep the main thread alive
listener_thread.join()
except KeyboardInterrupt:
logger.info("Shutting down UDP listener.")
logger.info("Final device mapping:")
logger.info(get_device_mapping_json())
# Save known devices
save_known_devices()
I call this a success.
side note
On a side note, a specification with classes was not as successful and it seems like the training of ChatGPT is not as consistent for classes.