Begin
The holiday season often brings the joy of family gatherings—and a recurring role as the family IT support. This year’s project was integrating a Kaiser Nienhaus Wi-Fi roller blind into Home Assistant. Since no official integration exists and documentation about the communication protocol is lacking, the task required reverse engineering, crafting a proof of concept, and building a custom Home Assistant component.
Using the Official Tools (KN Connect)
The first step was to use the official app. After installing the KN Connect app, registering an account, and adding the roller blind to the Wi-Fi network, basic operations (open/close) worked seamlessly. Testing showed that the roller blind could still be controlled locally after blocking internet access via a firewall. This confirmed the possibility of a local solution and set the stage for further exploration.
Reverse Engineering the KN Connect App
Attempts to gather information about the roller blind using `nmap` scans failed. However, the KN Connect app revealed two communication mechanisms: MQTT and a multicast-based protocol. The multicast service operates over UDP on ports `32100` and `32101`, broadcasting to the group `238.0.0.18`. By running a simple listener on these ports, messages between the app and the roller blind were intercepted.
#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024
import socket
MCAST_GRP = "238.0.0.18"
MCAST_PORT = 32100
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", MCAST_PORT))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(MCAST_GRP) + socket.inet_aton("0.0.0.0"))
while True:
data, addr = sock.recvfrom(1024)
print(f"Message from {addr}: {data.decode()}")
Proof of Concept
With the protocol understood, a script was written to send commands directly to the roller blind using the multicast service. The `AccessToken` is critical for authenticating commands. Below is a Python script for closing the blind:
#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024
import socket
import struct
import datetime
# Multicast group and port
multicast_group = '238.0.0.18'
port = 32100
now = datetime.datetime.now()
datetime_string = str.encode(
f"{now.year}{now.month}{now.day}{now.hour}{now.minute}{now.second}{int(now.microsecond / 1000)}"
)
CLOSE = b"0"
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
group = socket.inet_aton(multicast_group)
mreq = struct.pack('4s4s', group, socket.inet_aton('0.0.0.0'))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
message = (
b'{"msgType":"WriteDevice","mac":"XXXXXXXXXXXX","deviceType":"22000002",'
b'"AccessToken":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX","msgID":"'
+ datetime_string
+ b'","data":{"operation":'
+ CLOSE
+ b'}}'
)
try:
sock.sendto(message, (multicast_group, port))
finally:
sock.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, mreq)
sock.close()
This script successfully commands the roller blind to close. Operations for opening and stopping can be implemented by modifying the `operation` field.
Integrating the Proof of Concept into Home Assistant
To integrate the roller blind into Home Assistant, a custom component was created. The Docker container running Home Assistant couldn't use UDP multicast, so direct IP commands were used instead. Assigning a static IP to the roller blind in the DHCP server resolved this issue.
Steps to Create the Integration
- Create a directory: custom_components/kn_cover.
- Add the following files:
- manifest.json
- __init__.py
- knconnect.py
{
"domain": "kn_cover",
"name": "KN Cover",
"codeowners": ["Friedrich Hust"],
"dependencies": [],
"documentation": "https://airmack.de",
"iot_class": "local_polling",
"requirements": [],
"version": "0.9.0"
}
#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024
from __future__ import annotations
import logging
import struct
import datetime
import socket
from datetime import timedelta
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.util.hass_dict import HassKey
from homeassistant.components.cover import (
ATTR_POSITION,
ATTR_TILT_POSITION,
DOMAIN as COVER_DOMAIN,
CoverDeviceClass,
CoverEntity,
CoverState,
CoverEntityFeature,
)
from homeassistant.const import ( # noqa: F401
SERVICE_CLOSE_COVER,
SERVICE_CLOSE_COVER_TILT,
SERVICE_OPEN_COVER,
SERVICE_OPEN_COVER_TILT,
SERVICE_SET_COVER_POSITION,
SERVICE_SET_COVER_TILT_POSITION,
SERVICE_STOP_COVER,
SERVICE_STOP_COVER_TILT,
SERVICE_TOGGLE,
SERVICE_TOGGLE_COVER_TILT,
STATE_CLOSED,
STATE_CLOSING,
STATE_OPEN,
STATE_OPENING,
)
DOMAIN = "cover"
DATA_COMPONENT: HassKey[EntityComponent[CoverEntity]] = HassKey(DOMAIN)
SCAN_INTERVAL = timedelta(seconds=15)
_LOGGER = logging.getLogger(__name__)
IP = "192.168.0.2" # <--- ADD IP HERE
TOKEN = b"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" # <--- ADD TOKEN HERE
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None
) -> None:
"""Set up the sensor platform."""
add_entities([KNCover(IP, 32100)])
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Track states and offer events for covers."""
component = hass.data[DATA_COMPONENT] = EntityComponent[CoverEntity](
_LOGGER, DOMAIN, hass, SCAN_INTERVAL
)
await component.async_setup(config)
class KNCover(CoverEntity):
_attr_device_class = CoverDeviceClass.BLIND
_attr_supported_features = CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
_attr_is_closing = False
_attr_is_opening = False
_attr_assumed_state = True
CLOSE = b"0"
OPEN = b"1"
STOP = b"2"
def __init__(self, ip: str = "238.0.0.18", port: int = 32100):
# self.state = None
self._attr_name = "KN Cover"
self._attr_has_entity_name = True
self._attr_entity_registry_enabled_default = True
self._attr_assumed_state = True
self._attr_available = True
self._attr_is_closed = None
self._state = None
self._attr_unique_id = "xxxxxxxxxxxx" # <--- ADD MAC HERE
self.ip = ip
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.group = socket.inet_aton(ip)
self.mreq = struct.pack('4s4s', self.group, socket.inet_aton('0.0.0.0')) # 0.0.0.0 bedeutet "alle Schnittstellen"
self.accessToken = TOKEN
self.open_cover()
@property
def is_closed(self) -> None:
return self._state == CoverState.CLOSED
@property
def assumed_state(self) -> bool:
"""Return True if unable to access real state of the entity."""
return True
def __del__(self):
self.sock.close()
def send(self, message):
self.sock.sendto(message, (self.ip, self.port))
def open_cover(self, **kwargs):
"""Open the cover."""
self._state = CoverState.OPEN
self._attr_is_closed = False
message = b'{"msgType":"WriteDevice","mac":"' + self._attr_unique_id + b'","deviceType":"22000002","AccessToken":"' + self.accessToken + b'","msgID":"' + self.getDate() + b'","data":{"operation":' + KNCover.OPEN + b'}}'
self.send(message)
def stop_cover(self, **kwargs):
"""Stop the cover."""
self._state = CoverState.OPEN # guesstimation
self._attr_is_closed = False
message = b'{"msgType":"WriteDevice","mac":"' + self._attr_unique_id + b'","deviceType":"22000002","AccessToken":"' + self.accessToken + b'","msgID":"' + self.getDate() + b'","data":{"operation":' + KNCover.STOP + b'}}'
self.send(message)
def close_cover(self, **kwargs):
"""Close cover."""
self._state = CoverState.CLOSED
self._attr_is_closed = True
message = b'{"msgType":"WriteDevice","mac":"' + self._attr_unique_id + b'","deviceType":"22000002","AccessToken":"' + self.accessToken + b'","msgID":"' + self.getDate() + b'","data":{"operation":' + KNCover.CLOSE + b'}}'
self.send(message)
def getDate(self):
now = datetime.datetime.now()
year = now.year
month = now.month
day = now.day
hour = now.hour
minute = now.minute
second = now.second
millisecond = int(now.microsecond / 1000) # Convert microseconds to milliseconds
datetime_string = str.encode(f"{year}{month}{day}{hour}{minute}{second}{millisecond}")
return datetime_string
Hardcode roller blind details (MAC, IP, `AccessToken`) in the component.
Enable the platform in `configuration.yaml` by adding:
cover:
- platform: kn_cover
Restart Home Assistant to register the new entity.
hzgf. am 03. January 2025