Latest articles:

Integrating Kaiser Nienhaus Roller Blinds into Home Assistant

aus der Kategorie Hacking

Begin

The holiday season often brings the joy of family gatherings—and a recurring role as the family IT support. This year’s project was integrating a Kaiser Nienhaus Wi-Fi roller blind into Home Assistant. Since no official integration exists and documentation about the communication protocol is lacking, the task required reverse engineering, crafting a proof of concept, and building a custom Home Assistant component.

Using the Official Tools (KN Connect)

The first step was to use the official app. After installing the KN Connect app, registering an account, and adding the roller blind to the Wi-Fi network, basic operations (open/close) worked seamlessly. Testing showed that the roller blind could still be controlled locally after blocking internet access via a firewall. This confirmed the possibility of a local solution and set the stage for further exploration.

Reverse Engineering the KN Connect App

Attempts to gather information about the roller blind using `nmap` scans failed. However, the KN Connect app revealed two communication mechanisms: MQTT and a multicast-based protocol. The multicast service operates over UDP on ports `32100` and `32101`, broadcasting to the group `238.0.0.18`. By running a simple listener on these ports, messages between the app and the roller blind were intercepted.
#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024
import socket

MCAST_GRP = "238.0.0.18"
MCAST_PORT = 32100

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", MCAST_PORT))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(MCAST_GRP) + socket.inet_aton("0.0.0.0"))

while True:
    data, addr = sock.recvfrom(1024)
    print(f"Message from {addr}: {data.decode()}")

Proof of Concept

With the protocol understood, a script was written to send commands directly to the roller blind using the multicast service. The `AccessToken` is critical for authenticating commands. Below is a Python script for closing the blind:
#!/usr/bin/env python
# Author: Friedrich Hust
# 27.12.2024

import socket
import struct
import datetime

# Multicast group and port
multicast_group = '238.0.0.18'
port = 32100

now = datetime.datetime.now()
datetime_string = str.encode(
    f"{now.year}{now.month}{now.day}{now.hour}{now.minute}{now.second}{int(now.microsecond / 1000)}"
)

CLOSE = b"0"

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
group = socket.inet_aton(multicast_group)
mreq = struct.pack('4s4s', group, socket.inet_aton('0.0.0.0'))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

message = (
    b'{"msgType":"WriteDevice","mac":"XXXXXXXXXXXX","deviceType":"22000002",'
    b'"AccessToken":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX","msgID":"'
    + datetime_string
    + b'","data":{"operation":'
    + CLOSE
    + b'}}'
)

try:
    sock.sendto(message, (multicast_group, port))
finally:
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, mreq)
    sock.close()
This script successfully commands the roller blind to close. Operations for opening and stopping can be implemented by modifying the `operation` field.

Integrating the Proof of Concept into Home Assistant

To integrate the roller blind into Home Assistant, a custom component was created. The Docker container running Home Assistant couldn't use UDP multicast, so direct IP commands were used instead. Assigning a static IP to the roller blind in the DHCP server resolved this issue.

Steps to Create the Integration

  • Create a directory: custom_components/kn_cover.
  • Add the following files:
    • manifest.json
    • __init__.py
    • knconnect.py
    {
      "domain": "kn_cover",
      "name": "KN Cover",
      "codeowners": ["Friedrich Hust"],
      "dependencies": [],
      "documentation": "https://airmack.de",
      "iot_class": "local_polling",
      "requirements": [],
      "version": "0.9.0"
    }
    
    
    #!/usr/bin/env python
    # Author: Friedrich Hust
    # 27.12.2024
    from __future__ import annotations
    import logging
    import struct
    import datetime
    import socket
    from datetime import timedelta
    
    from homeassistant.components.sensor import (
        SensorDeviceClass,
        SensorEntity,
        SensorStateClass,
    )
    from homeassistant.core import HomeAssistant
    from homeassistant.helpers.entity_platform import AddEntitiesCallback
    from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
    from homeassistant.helpers.entity_component import EntityComponent
    from homeassistant.util.hass_dict import HassKey
    
    from homeassistant.components.cover import (
        ATTR_POSITION,
        ATTR_TILT_POSITION,
        DOMAIN as COVER_DOMAIN,
        CoverDeviceClass,
        CoverEntity,
        CoverState,
        CoverEntityFeature,
    )
    
    from homeassistant.const import (  # noqa: F401
        SERVICE_CLOSE_COVER,
        SERVICE_CLOSE_COVER_TILT,
        SERVICE_OPEN_COVER,
        SERVICE_OPEN_COVER_TILT,
        SERVICE_SET_COVER_POSITION,
        SERVICE_SET_COVER_TILT_POSITION,
        SERVICE_STOP_COVER,
        SERVICE_STOP_COVER_TILT,
        SERVICE_TOGGLE,
        SERVICE_TOGGLE_COVER_TILT,
        STATE_CLOSED,
        STATE_CLOSING,
        STATE_OPEN,
        STATE_OPENING,
    )
    DOMAIN = "cover"
    
    
    DATA_COMPONENT: HassKey[EntityComponent[CoverEntity]] = HassKey(DOMAIN)
    SCAN_INTERVAL = timedelta(seconds=15)
    
    _LOGGER = logging.getLogger(__name__)
    IP = "192.168.0.2"  # <--- ADD IP HERE
    TOKEN = b"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"  # <--- ADD TOKEN HERE
    
    
    def setup_platform(
        hass: HomeAssistant,
        config: ConfigType,
        add_entities: AddEntitiesCallback,
        discovery_info: DiscoveryInfoType | None = None
    ) -> None:
        """Set up the sensor platform."""
        add_entities([KNCover(IP, 32100)])
    
    
    async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
        """Track states and offer events for covers."""
        component = hass.data[DATA_COMPONENT] = EntityComponent[CoverEntity](
            _LOGGER, DOMAIN, hass, SCAN_INTERVAL
        )
    
        await component.async_setup(config)
    
    
    class KNCover(CoverEntity):
    
        _attr_device_class = CoverDeviceClass.BLIND
        _attr_supported_features = CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
        _attr_is_closing = False
        _attr_is_opening = False
        _attr_assumed_state = True
    
        CLOSE = b"0"
        OPEN = b"1"
        STOP = b"2"
    
        def __init__(self, ip: str = "238.0.0.18", port: int = 32100):
            # self.state = None
            self._attr_name = "KN Cover"
            self._attr_has_entity_name = True
            self._attr_entity_registry_enabled_default = True
            self._attr_assumed_state = True
            self._attr_available = True
            self._attr_is_closed = None
    
            self._state = None
            self._attr_unique_id = "xxxxxxxxxxxx"  # <--- ADD MAC HERE
            self.ip = ip
            self.port = port
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.group = socket.inet_aton(ip)
            self.mreq = struct.pack('4s4s', self.group, socket.inet_aton('0.0.0.0'))  # 0.0.0.0 bedeutet "alle Schnittstellen"
            self.accessToken = TOKEN
            self.open_cover()
    
        @property
        def is_closed(self) -> None:
            return self._state == CoverState.CLOSED
    
        @property
        def assumed_state(self) -> bool:
            """Return True if unable to access real state of the entity."""
            return True
    
        def __del__(self):
            self.sock.close()
    
        def send(self, message):
            self.sock.sendto(message, (self.ip, self.port))
    
        def open_cover(self, **kwargs):
            """Open the cover."""
            self._state = CoverState.OPEN
            self._attr_is_closed = False
            message = b'{"msgType":"WriteDevice","mac":"' + self._attr_unique_id + b'","deviceType":"22000002","AccessToken":"' + self.accessToken + b'","msgID":"' + self.getDate() + b'","data":{"operation":' + KNCover.OPEN + b'}}'
    
            self.send(message)
    
        def stop_cover(self, **kwargs):
            """Stop the cover."""
            self._state = CoverState.OPEN  # guesstimation
            self._attr_is_closed = False
            message = b'{"msgType":"WriteDevice","mac":"' + self._attr_unique_id + b'","deviceType":"22000002","AccessToken":"' + self.accessToken + b'","msgID":"' + self.getDate() + b'","data":{"operation":' + KNCover.STOP + b'}}'
    
            self.send(message)
    
        def close_cover(self, **kwargs):
            """Close cover."""
            self._state = CoverState.CLOSED
            self._attr_is_closed = True
            message = b'{"msgType":"WriteDevice","mac":"' + self._attr_unique_id + b'","deviceType":"22000002","AccessToken":"' + self.accessToken + b'","msgID":"' + self.getDate() + b'","data":{"operation":' + KNCover.CLOSE + b'}}'
    
            self.send(message)
    
        def getDate(self):
            now = datetime.datetime.now()
            year = now.year
            month = now.month
            day = now.day
            hour = now.hour
            minute = now.minute
            second = now.second
            millisecond = int(now.microsecond / 1000)  # Convert microseconds to milliseconds
            datetime_string = str.encode(f"{year}{month}{day}{hour}{minute}{second}{millisecond}")
            return datetime_string
    
  • Hardcode roller blind details (MAC, IP, `AccessToken`) in the component.
  • Enable the platform in `configuration.yaml` by adding: cover: - platform: kn_cover
  • Restart Home Assistant to register the new entity.

hzgf. am 03. January 2025