Begin
The holiday season often brings the joy of family gatherings and a recurring role as the family IT support. This year's project was integrating a Kaiser Nienhaus Wi-Fi roller blind into Home Assistant. Since no official integration exists and documentation about the communication protocol is lacking, the task required reverse engineering, crafting a proof of concept, and building a custom Home Assistant component.
Using the Official Tools (KN Connect)
The first step was to use the official app. After installing the KN Connect app, registering an account, and adding the roller blind to the Wi-Fi network, basic operations (open/close) worked seamlessly. Testing showed that the roller blind could still be controlled locally after blocking internet access via a firewall. This confirmed the possibility of a local solution and set the stage for further exploration.
Reverse Engineering the KN Connect App
Attempts to gather information about the roller blind using 'nmap' scans failed. However, the KN Connect app revealed two communication mechanisms: MQTT and a multicast-based protocol. The multicast service operates over UDP on ports '32100' and '32101', broadcasting to the group '238.0.0.18'. By running a simple listener on these ports, messages between the app and the roller blind were intercepted.
#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024
"""Print every UDP datagram received on the KN Connect multicast group."""
import socket

MCAST_GRP = "238.0.0.18"
MCAST_PORT = 32100
# Largest possible UDP payload; a smaller buffer silently truncates longer
# datagrams.
RECV_BUFFER = 65535

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
try:
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("", MCAST_PORT))
    # Membership request: group address followed by the local interface
    # address (0.0.0.0).
    mreq = socket.inet_aton(MCAST_GRP) + socket.inet_aton("0.0.0.0")
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    while True:
        data, addr = sock.recvfrom(RECV_BUFFER)
        # errors="replace" keeps the listener alive when a datagram is not
        # valid UTF-8 instead of crashing with UnicodeDecodeError.
        print(f"Message from {addr}: {data.decode(errors='replace')}")
finally:
    # Reached on Ctrl-C or any socket error: release the port.
    sock.close()
Proof of Concept
With the protocol understood, a script was written to send commands directly to the roller blind using the multicast service. The 'AccessToken' is critical for authenticating commands. Below is a Python script for closing the blind:
#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024
"""Send a single "close" command to a KN Connect roller blind."""
import socket
import struct
import datetime

# Multicast group and port
multicast_group = '238.0.0.18'
port = 32100

now = datetime.datetime.now()
# msgID is a millisecond timestamp. Every field is zero-padded so the ID has a
# fixed width (17 digits), is unambiguous, and sorts chronologically; the
# unpadded form maps different instants to the same string.
# NOTE(review): the device's exact msgID requirements are not documented
# here -- confirm the padded form is accepted.
datetime_string = str.encode(
    now.strftime("%Y%m%d%H%M%S") + f"{now.microsecond // 1000:03d}"
)

# Value of the "operation" field that closes the blind.
CLOSE = b"0"

message = b"".join((
    b'{"msgType":"WriteDevice","mac":"XXXXXXXXXXXX","deviceType":"22000002",',
    b'"AccessToken":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX","msgID":"',
    datetime_string,
    b'","data":{"operation":',
    CLOSE,
    b'}}',
))

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
    group = socket.inet_aton(multicast_group)
    mreq = struct.pack('4s4s', group, socket.inet_aton('0.0.0.0'))
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    try:
        sock.sendto(message, (multicast_group, port))
    finally:
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, mreq)
finally:
    # Close the socket even if joining the group or sending fails.
    sock.close()
This script successfully commands the roller blind to close. Operations for opening and stopping can be implemented by modifying the 'operation' field.
Integrating the Proof of Concept into Home Assistant
To integrate the roller blind into Home Assistant, a custom component was created. The Docker container running Home Assistant couldn't use UDP multicast, so direct IP commands were used instead. Assigning a static IP to the roller blind in the DHCP server resolved this issue.
Steps to Create the Integration
- Create a directory: custom_components/kn_cover.
 - Add the following files:
 - manifest.json
 
{
  "domain": "kn_cover",
  "name": "KN Cover",
  "codeowners": ["Friedrich Hust"],
  "dependencies": [],
  "documentation": "https://airmack.de",
  "iot_class": "local_polling",
  "requirements": [],
  "version": "0.9.0"
}
- __init__.py
 - knconnect.py
 
#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024
from __future__ import annotations
import logging
import struct
import datetime
import socket
from datetime import timedelta
from homeassistant.components.sensor import (
    SensorDeviceClass,
    SensorEntity,
    SensorStateClass,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.util.hass_dict import HassKey
from homeassistant.components.cover import (
    ATTR_POSITION,
    ATTR_TILT_POSITION,
    DOMAIN as COVER_DOMAIN,
    CoverDeviceClass,
    CoverEntity,
    CoverState,
    CoverEntityFeature,
)
from homeassistant.const import (  # noqa: F401
    SERVICE_CLOSE_COVER,
    SERVICE_CLOSE_COVER_TILT,
    SERVICE_OPEN_COVER,
    SERVICE_OPEN_COVER_TILT,
    SERVICE_SET_COVER_POSITION,
    SERVICE_SET_COVER_TILT_POSITION,
    SERVICE_STOP_COVER,
    SERVICE_STOP_COVER_TILT,
    SERVICE_TOGGLE,
    SERVICE_TOGGLE_COVER_TILT,
    STATE_CLOSED,
    STATE_CLOSING,
    STATE_OPEN,
    STATE_OPENING,
)
# Domain name handed to the EntityComponent created in async_setup.
DOMAIN = "cover"
# Typed hass.data key under which async_setup stores that EntityComponent.
DATA_COMPONENT: HassKey[EntityComponent[CoverEntity]] = HassKey(DOMAIN)
# Scan interval passed to the EntityComponent in async_setup.
SCAN_INTERVAL = timedelta(seconds=15)
_LOGGER = logging.getLogger(__name__)
# Address the commands are sent to; setup_platform passes it to KNCover.
IP = "192.168.0.2"  # <--- ADD IP HERE
# AccessToken embedded in every command message; bytes because the messages
# are assembled as bytes.
TOKEN = b"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"  # <--- ADD TOKEN HERE
def setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    add_entities: AddEntitiesCallback,
    discovery_info: DiscoveryInfoType | None = None
) -> None:
    """Set up the cover platform.

    Builds one KNCover for the module-level IP on port 32100 and hands it to
    ``add_entities``. ``hass``, ``config`` and ``discovery_info`` are unused.
    """
    blind = KNCover(ip=IP, port=32100)
    add_entities([blind])
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Create the cover EntityComponent, store it in ``hass.data`` and set it up.

    Args:
        hass: Home Assistant instance whose ``data`` mapping receives the
            component under ``DATA_COMPONENT``.
        config: Configuration forwarded to the component's ``async_setup``.

    Returns:
        True once the component has been set up.
    """
    component = hass.data[DATA_COMPONENT] = EntityComponent[CoverEntity](
        _LOGGER, DOMAIN, hass, SCAN_INTERVAL
    )
    await component.async_setup(config)
    # The signature promises a bool; report success explicitly rather than
    # falling off the end and returning None.
    return True
class KNCover(CoverEntity):
    """Assumed-state cover entity that drives a roller blind over UDP.

    Each command is a JSON "WriteDevice" datagram sent to ``ip``/``port``.
    Nothing is ever read back from the device, so the reported state is simply
    the last command that was sent.
    """

    _attr_device_class = CoverDeviceClass.BLIND
    _attr_supported_features = (
        CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
    )
    _attr_is_closing = False
    _attr_is_opening = False
    _attr_assumed_state = True

    # Values of the "operation" field in a WriteDevice message.
    CLOSE = b"0"
    OPEN = b"1"
    STOP = b"2"

    def __init__(self, ip: str = "238.0.0.18", port: int = 32100):
        """Create the UDP socket and send an initial "open" command.

        Args:
            ip: IPv4 address the commands are sent to.
            port: UDP port the commands are sent to.

        Raises:
            OSError: If ``ip`` is not a valid IPv4 address or the initial
                command cannot be sent.
        """
        self._attr_name = "KN Cover"
        self._attr_has_entity_name = True
        self._attr_entity_registry_enabled_default = True
        self._attr_assumed_state = True
        self._attr_available = True
        self._attr_is_closed = None
        self._state = None
        # Doubles as the "mac" field of every command message.
        self._attr_unique_id = "xxxxxxxxxxxx"  # <--- ADD MAC HERE
        self.ip = ip
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.group = socket.inet_aton(ip)
        # 0.0.0.0 means "all interfaces". The request is only stored here;
        # this class never joins a multicast group with it.
        self.mreq = struct.pack('4s4s', self.group, socket.inet_aton('0.0.0.0'))
        self.accessToken = TOKEN
        # NOTE(review): this moves the blind every time the entity is created,
        # presumably to start from a known state -- confirm this is intended.
        self.open_cover()

    @property
    def is_closed(self) -> bool:
        """Return True if the last command sent was "close"."""
        return self._state == CoverState.CLOSED

    @property
    def assumed_state(self) -> bool:
        """Return True if unable to access real state of the entity."""
        return True

    def __del__(self):
        """Close the socket, tolerating a partially constructed instance."""
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def send(self, message):
        """Send ``message`` (bytes) as one datagram to the configured address."""
        self.sock.sendto(message, (self.ip, self.port))

    @staticmethod
    def _as_bytes(value) -> bytes:
        """Return ``value`` as bytes, encoding it first when it is text."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return str(value).encode()

    def _build_message(self, operation: bytes) -> bytes:
        """Assemble the WriteDevice datagram for ``operation``."""
        return b"".join((
            b'{"msgType":"WriteDevice","mac":"',
            # The unique id is a str; it must be encoded before it can be
            # joined with the bytes fragments.
            self._as_bytes(self._attr_unique_id),
            b'","deviceType":"22000002","AccessToken":"',
            self._as_bytes(self.accessToken),
            b'","msgID":"',
            self.getDate(),
            b'","data":{"operation":',
            operation,
            b'}}',
        ))

    def _command(self, operation: bytes, state, is_closed: bool) -> None:
        """Send ``operation`` and then record the assumed state.

        The state is only updated after the datagram was handed to the socket,
        so a failed send does not leave a state that was never commanded.
        """
        self.send(self._build_message(operation))
        self._state = state
        self._attr_is_closed = is_closed

    def open_cover(self, **kwargs):
        """Open the cover."""
        self._command(KNCover.OPEN, CoverState.OPEN, False)

    def stop_cover(self, **kwargs):
        """Stop the cover."""
        # The position after a stop is unknown; "open" is a best guess.
        self._command(KNCover.STOP, CoverState.OPEN, False)

    def close_cover(self, **kwargs):
        """Close cover."""
        self._command(KNCover.CLOSE, CoverState.CLOSED, True)

    def getDate(self):
        """Return the current local time as a 17-digit msgID in bytes.

        The layout is year, month, day, hour, minute, second, millisecond with
        every field zero-padded, so IDs are fixed-width, unambiguous and sort
        chronologically. Without padding, different instants can collapse to
        the same string.
        NOTE(review): the device's exact msgID requirements are not documented
        here -- confirm the padded form is accepted.
        """
        now = datetime.datetime.now()
        stamp = now.strftime("%Y%m%d%H%M%S") + f"{now.microsecond // 1000:03d}"
        return stamp.encode()
- Hardcode roller blind details (MAC, IP, 'AccessToken') in the component.
 - Enable the platform in 'configuration.yaml' by adding:
 
# Enable the cover platform provided by the kn_cover custom component.
cover:
   - platform: kn_cover
- Restart Home Assistant to register the new entity.