"""Meross devices platform loader"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import List, Tuple, Dict, Optional, Collection

import homeassistant.helpers.config_validation as cv
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady, ConfigEntryAuthFailed
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from meross_iot.controller.device import BaseDevice
from meross_iot.http_api import MerossHttpClient, ErrorCodes
from meross_iot.manager import MerossManager
from meross_iot.model.credentials import MerossCloudCreds
from meross_iot.model.enums import OnlineStatus, Namespace
from meross_iot.model.exception import CommandTimeoutError
from meross_iot.model.http.device import HttpDeviceInfo
from meross_iot.model.http.exception import (
    TokenExpiredException,
    TooManyTokensException,
    UnauthorizedException,
    HttpApiError,
    BadLoginException,
)

from .common import (
    ATTR_CONFIG,
    CLOUD_HANDLER,
    DOMAIN,
    HA_CLIMATE,
    HA_COVER,
    HA_FAN,
    HA_LIGHT,
    HA_SENSOR,
    HA_SWITCH,
    MANAGER,
    MEROSS_PLATFORMS,
    SENSORS,
    dismiss_notification,
    notify_error,
    log_exception,
    CONF_STORED_CREDS,
    LIMITER,
    CONF_HTTP_ENDPOINT,
    CONF_MQTT_SKIP_CERT_VALIDATION,
    HTTP_API_RE,
    HTTP_UPDATE_INTERVAL,
    DEVICE_LIST_COORDINATOR,
    calculate_id,
    DEFAULT_USER_AGENT,
    CONF_OPT_CUSTOM_USER_AGENT,
    CONF_OVERRIDE_MQTT_ENDPOINT,
    CONF_OPT_LAN,
    CONF_OPT_LAN_MQTT_ONLY,
    TRANSPORT_MODES_TO_ENUM,
    MEROSS_DEFAULT_CLOUD_API_URL
)
from .version import MEROSS_IOT_VERSION

_LOGGER = logging.getLogger(__name__)

# Schema for the optional YAML configuration of this integration.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Required(CONF_HTTP_ENDPOINT): cv.string,
                vol.Required(CONF_MQTT_SKIP_CERT_VALIDATION): cv.boolean,
                vol.Optional(CONF_STORED_CREDS): cv.string,
                vol.Optional(CONF_OVERRIDE_MQTT_ENDPOINT): cv.string
            }
        )
    },
    extra=vol.ALLOW_EXTRA,
)


def print_startup_message(http_devices: Collection[HttpDeviceInfo]):
    """Log a startup banner listing the devices reported by the HTTP API.

    The banner is emitted at WARNING level so that it is visible with the
    default logging configuration.

    :param http_devices: devices returned by the HTTP API device listing
    """
    http_info = "\n".join(
        f"- {x.dev_name} ({x.device_type}) - {x.online_status}" for x in http_devices
    )
    start_message = (
        f"\n"
        f"===============================\n"
        f"Meross Cloud Custom component\n"
        f"Developed by Alberto Geniola\n"
        f"Low level library version: {MEROSS_IOT_VERSION}\n"
        f"-------------------------------\n"
        f"This custom component is under development and not yet ready for production use.\n"
        f"In case of errors/misbehave, please report it here: \n"
        f"https://github.com/albertogeniola/meross-homeassistant/issues\n"
        f"\n"
        f"If you like this extension and you want to support it, please consider donating.\n"
        f"-------------------------------\n"
        f"List of devices reported by HTTP API:\n"
        f"{http_info}"
        f"\n==============================="
    )
    _LOGGER.warning(start_message)


class MerossCoordinator(DataUpdateCoordinator):
    """Coordinator that polls the Meross HTTP API for the device list.

    Besides the periodic polling, the coordinator owns the HTTP client and the
    MerossManager, both of which are created by :meth:`initial_setup`.
    The published data is a dictionary mapping device uuid -> HttpDeviceInfo.
    """

    def __init__(self,
                 hass: HomeAssistant,
                 config_entry: ConfigEntry,
                 http_api_endpoint: str,
                 creds: MerossCloudCreds,
                 mqtt_skip_cert_validation: bool,
                 mqtt_override_address: Optional[Tuple[str, int]],
                 update_interval: timedelta,
                 ua_header: str):
        self._entry = config_entry
        self._http_api_endpoint = http_api_endpoint
        self._cached_creds = creds
        self._skip_cert_validation = mqtt_skip_cert_validation
        self._mqtt_override_address = mqtt_override_address
        self._setup_done = False
        self._ua_header = ua_header

        # Objects not to be initialized here
        self._client = None
        self._manager = None

        super().__init__(hass=hass,
                         logger=_LOGGER,
                         name="meross_http_coordinator",
                         update_interval=update_interval,
                         update_method=self._async_fetch_http_data)

    async def _async_fetch_http_data(self):
        """Fetch the device list from the HTTP API, keyed by device uuid.

        :raises ConfigEntryAuthFailed: when the API rejects the credentials
        :raises UpdateFailed: on any other HTTP API error
        """
        try:
            async with asyncio.timeout(10):
                # Fetch devices and compose a quick-access dictionary
                devices = await self._client.async_list_devices()
                return {device.uuid: device for device in devices}
        except (BadLoginException, TokenExpiredException, UnauthorizedException) as err:
            # Raising ConfigEntryAuthFailed will cancel future updates
            # and start a config flow with SOURCE_REAUTH (async_step_reauth)
            raise ConfigEntryAuthFailed from err
        except HttpApiError as err:
            # Chain the cause so the original API error stays in the traceback
            raise UpdateFailed(f"Error communicating with API: {err}") from err

    async def initial_setup(self):
        """Log in to the HTTP API, create and start the manager, run a first discovery.

        :raises ValueError: if the coordinator was already set up
        :raises ConfigEntryAuthFailed: when the stored credentials are rejected
        :raises ConfigEntryNotReady: on any other HTTP API error
        """
        if self._setup_done:
            raise ValueError("This coordinator was already set up")

        # Test the stored credentials if any. In case the credentials are invalid
        # try to retrieve a new token
        try:
            self._client, http_devices, creds_renewed = await get_or_test_creds(
                http_api_url=self._http_api_endpoint,
                creds=self._cached_creds,
                ua_header=self._ua_header
            )
        except (BadLoginException, TokenExpiredException, UnauthorizedException) as err:
            raise ConfigEntryAuthFailed from err
        except HttpApiError as err:
            raise ConfigEntryNotReady(f"Error communicating with API: {err}") from err

        # If a new token was issued, store it into the current entry
        if creds_renewed:
            # Override the new credentials and store them into HA entry
            self._cached_creds = self._client.cloud_credentials
            self.hass.config_entries.async_update_entry(
                entry=self._entry,
                data={
                    CONF_HTTP_ENDPOINT: self._cached_creds.domain,
                    CONF_STORED_CREDS: {
                        "token": self._cached_creds.token,
                        "key": self._cached_creds.key,
                        "user_id": self._cached_creds.user_id,
                        "user_email": self._cached_creds.user_email,
                        "issued_on": self._cached_creds.issued_on.isoformat(),
                        "domain": self._cached_creds.domain,
                        "mqtt_domain": self._cached_creds.mqtt_domain
                    },
                },
            )

        # Now that we are logged in at HTTP api level, instantiate the manager.
        self._manager = MerossManager(
            http_client=self._client,
            mqtt_override_server=self._mqtt_override_address,
            auto_reconnect=True,
            mqtt_skip_cert_validation=self._skip_cert_validation,
        )

        # Since we already have fetched for the DeviceList, publish it right away
        self.async_set_updated_data({device.uuid: device for device in http_devices})

        # Print startup message, start the manager and issue a first discovery
        print_startup_message(http_devices=self.data.values())
        _LOGGER.info("Starting meross manager")
        await self._manager.async_init()
        _LOGGER.info("Discovering Meross devices...")
        await self._manager.async_device_discovery()

        # If no exception is thrown so far, it means setup was successful
        self._setup_done = True

    @property
    def manager(self) -> MerossManager:
        """The MerossManager created by initial_setup (None before setup)."""
        return self._manager

    @property
    def client(self) -> MerossHttpClient:
        """The HTTP client created by initial_setup (None before setup)."""
        return self._client


class MerossDevice(Entity):
    """Base entity that binds a Meross device channel to the HTTP device-list coordinator.

    The online state is derived from the HTTP device list published by the
    coordinator; push notifications from the device trigger state refreshes.
    """

    def __init__(self,
                 device: BaseDevice,
                 channel: int,
                 device_list_coordinator: DataUpdateCoordinator[Dict[str, HttpDeviceInfo]],
                 platform: str,
                 supplementary_classifiers: Optional[List[str]] = None,
                 override_channel_name: Optional[str] = None):
        self._coordinator = device_list_coordinator
        self._device = device
        self._channel_id = channel
        self._last_http_state = None
        self._cb_async_remove_listener = None

        base_name = f"{device.name} ({device.type})"
        if supplementary_classifiers is not None:
            self._id = calculate_id(platform=platform,
                                    uuid=device.internal_id,
                                    channel=channel,
                                    supplementary_classifiers=supplementary_classifiers)
            base_name += " " + " ".join(supplementary_classifiers)
        else:
            self._id = calculate_id(platform=platform, uuid=device.internal_id, channel=channel)

        # Pick the channel name: explicit override first, then the device channel data
        if override_channel_name:
            channel_name = override_channel_name
        elif device.channels is not None and len(device.channels) > 0:
            channel_data = device.channels[channel]
            channel_name = channel_data.name
        else:
            channel_name = None

        self._entity_name = f"{base_name} - {channel_name}" if channel_name is not None else base_name

    @property
    def should_poll(self) -> bool:
        return False

    async def async_update(self):
        """Refresh the device state, but only while the device is online."""
        if self.online:
            try:
                await self._device.async_update()
            except CommandTimeoutError:
                log_exception(logger=_LOGGER, device=self._device)

    def _lookup_http_info(self) -> Optional[HttpDeviceInfo]:
        """Return the coordinator's HTTP info for this device, or None when unavailable."""
        data = self._coordinator.data
        if data is None:
            return None
        return data.get(self._device.uuid)

    def _http_data_changed(self) -> None:
        """Coordinator listener: record the new HTTP state and refresh the entity.

        A full refresh is forced only when the device moves from a non-online
        state to ONLINE. A device that is missing from the HTTP list is stored
        as None and never counts as "back online".
        """
        new_data = self._lookup_http_info()
        previous = self._last_http_state
        came_back_online = (
            previous is not None
            and previous.online_status != OnlineStatus.ONLINE
            and new_data is not None
            and new_data.online_status == OnlineStatus.ONLINE
        )
        self._last_http_state = new_data
        self.async_schedule_update_ha_state(force_refresh=came_back_online)

    @property
    def online(self) -> bool:
        """True when the last successful HTTP poll reports this device as ONLINE."""
        if not self._coordinator.last_update_success:
            return False

        http_info = self._last_http_state
        if http_info is None:
            http_info = self._lookup_http_info()
        if http_info is None:
            # The device is not listed by the HTTP API: treat it as offline
            return False
        return http_info.online_status == OnlineStatus.ONLINE

    @property
    def unique_id(self) -> str:
        return self._id

    @property
    def name(self) -> str:
        return self._entity_name

    @property
    def device_info(self):
        return {
            'identifiers': {(DOMAIN, self._device.internal_id)},
            'name': self._device.name,
            'manufacturer': 'Meross',
            'model': self._device.type + " " + self._device.hardware_version,
            'sw_version': self._device.firmware_version
        }

    @property
    def available(self) -> bool:
        return self._coordinator.last_update_success and self.online

    async def _async_push_notification_received(self, namespace: Namespace, data: dict, device_internal_id: str):
        """Handle a push notification coming from the device.

        An unbind event removes the entity; online events trigger a full
        refresh when the device reports ONLINE; any other event just updates
        the state representation.
        """
        update_state = False
        full_update = False

        if namespace == Namespace.CONTROL_UNBIND:
            _LOGGER.warning("Received unbind event. Removing device %s from HA", self.name)
            await self.platform.async_remove_entity(self.entity_id)
        elif namespace == Namespace.SYSTEM_ONLINE:
            _LOGGER.info("Device %s reported online event.", self.name)
            online = OnlineStatus(int(data.get('online').get('status')))
            update_state = True
            full_update = online == OnlineStatus.ONLINE
        elif namespace == Namespace.HUB_ONLINE:
            _LOGGER.info("Device %s reported (HUB) online event.", self.name)
            online = OnlineStatus(int(data.get('status')))
            update_state = True
            full_update = online == OnlineStatus.ONLINE
        else:
            update_state = True
            full_update = False

        # In all other cases, just tell HA to update the internal state representation
        if update_state:
            self.async_schedule_update_ha_state(force_refresh=full_update)

    async def async_added_to_hass(self) -> None:
        """Register the push-notification handler and the coordinator listener."""
        self._device.register_push_notification_handler_coroutine(self._async_push_notification_received)
        self._cb_async_remove_listener = self._coordinator.async_add_listener(self._http_data_changed)
        self.hass.data[DOMAIN]["ADDED_ENTITIES_IDS"].add(self.unique_id)

    async def async_will_remove_from_hass(self) -> None:
        """Undo the registrations performed by async_added_to_hass."""
        self._device.unregister_push_notification_handler_coroutine(self._async_push_notification_received)
        if self._cb_async_remove_listener is not None:
            self._cb_async_remove_listener()
        self.hass.data[DOMAIN]["ADDED_ENTITIES_IDS"].remove(self.unique_id)


async def get_or_test_creds(
        creds: Optional[MerossCloudCreds] = None,
        http_api_url: str = MEROSS_DEFAULT_CLOUD_API_URL,
        ua_header: str = DEFAULT_USER_AGENT
) -> Tuple[MerossHttpClient, List[HttpDeviceInfo], bool]:
    """Build an HTTP client from the given credentials and validate it by listing devices.

    :param creds: stored cloud credentials
    :param http_api_url: HTTP API url used to patch the credentials' domain when needed
    :param ua_header: user-agent header passed to the HTTP client
    :return: tuple (http client, listed devices, True when the credentials were patched)
    """
    renewed = False
    http_client = MerossHttpClient(
        cloud_credentials=creds,
        ua_header=ua_header
    )

    # The local addon api might not be able to provide the correct domain and mqtt values. In this case,
    # we patch them here.
    if http_api_url is not None and (http_client.cloud_credentials.domain is None
                                     or creds.domain.lower().strip() == MEROSS_DEFAULT_CLOUD_API_URL):
        _LOGGER.warning(
            "Returned/Stored DOMAIN within existing credentials is <%s>. "
            "Patching the stored credentials with the correct value right away.",
            http_client.cloud_credentials.domain)
        http_client.cloud_credentials.domain = http_api_url
        renewed = True

    # Test device listing: the call raises when the API rejects the credentials,
    # which lets the caller ask for a re-login.
    http_devices = await http_client.async_list_devices()
    return http_client, http_devices, renewed


def _http_info_changed(known: Collection[HttpDeviceInfo], discovered: Collection[HttpDeviceInfo]) -> bool:
    """Tells when a new device is discovered among the known ones"""
    known_ids = {dev.uuid for dev in known}
    return any(dev.uuid not in known_ids for dev in discovered)


async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry):
    """
    This function is called by the HomeAssistant framework when a configuration entry is provided.
    For us, the configuration entry is the username-password credentials that the user
    needs to access the Meross cloud.

    :raises ConfigEntryAuthFailed: when the entry is incomplete or the credentials are rejected
    :raises ConfigEntryNotReady: when the cloud could not be reached
    """
    # Retrieve the stored credentials from config-flow
    http_api_endpoint = config_entry.data.get(CONF_HTTP_ENDPOINT)
    _LOGGER.info("Loaded %s: %s", CONF_HTTP_ENDPOINT, http_api_endpoint)

    str_creds = config_entry.data.get(CONF_STORED_CREDS)
    _LOGGER.info("Loaded %s: %s", CONF_STORED_CREDS, "******")

    mqtt_skip_cert_validation = config_entry.data.get(CONF_MQTT_SKIP_CERT_VALIDATION, True)
    _LOGGER.info("Skip MQTT cert validation option set to: %s", mqtt_skip_cert_validation)

    mqtt_override_address = config_entry.data.get(CONF_OVERRIDE_MQTT_ENDPOINT)
    _LOGGER.info("Override MQTT address set to: %s",
                 "no" if mqtt_override_address is None else "yes -> %s" % mqtt_override_address)

    # Make sure we have all the needed requirements
    if http_api_endpoint is None or HTTP_API_RE.fullmatch(http_api_endpoint) is None:
        raise ConfigEntryAuthFailed("Missing or wrong HTTP_API_ENDPOINT")

    if str_creds is None:
        raise ConfigEntryAuthFailed("Missing credentials. Please re-authenticate.")

    if mqtt_override_address is not None:
        # The override is stored as "host:port"
        address_parts = mqtt_override_address.split(":")
        mqtt_host = address_parts[0]
        mqtt_port = int(address_parts[1])
        mqtt_override_address = (mqtt_host, mqtt_port)

    issued_on = datetime.fromisoformat(str_creds.get("issued_on"))
    creds = MerossCloudCreds(
        domain=str_creds.get("domain", MEROSS_DEFAULT_CLOUD_API_URL),
        mqtt_domain=str_creds.get("mqtt_domain"),
        token=str_creds.get("token"),
        key=str_creds.get("key"),
        user_id=str_creds.get("user_id"),
        user_email=str_creds.get("user_email"),
        issued_on=issued_on
    )

    # Initialize the HASS structure
    hass.data[DOMAIN] = {}
    hass.data[DOMAIN]["ADDED_ENTITIES_IDS"] = set()

    # Retrieve options we need
    ua_header = config_entry.options.get(CONF_OPT_CUSTOM_USER_AGENT, DEFAULT_USER_AGENT)
    if ua_header == "" or not isinstance(ua_header, str):
        _LOGGER.warning("Invalid user-agent option specified in config <%s>; defaulting to <%s>",
                        str(ua_header), str(DEFAULT_USER_AGENT))
        ua_header = DEFAULT_USER_AGENT

    try:
        # Setup the coordinator
        meross_coordinator = MerossCoordinator(
            hass=hass,
            config_entry=config_entry,
            http_api_endpoint=http_api_endpoint,
            creds=creds,
            mqtt_skip_cert_validation=mqtt_skip_cert_validation,
            mqtt_override_address=mqtt_override_address,
            update_interval=timedelta(seconds=HTTP_UPDATE_INTERVAL),
            ua_header=ua_header
        )

        # Initiate the coordinator. This method will also make sure to login to the API,
        # instantiates the manager, starts it and issues a first discovery.
        await meross_coordinator.initial_setup()
        manager = meross_coordinator.manager
        hass.data[DOMAIN][MANAGER] = manager
        hass.data[DOMAIN][DEVICE_LIST_COORDINATOR] = meross_coordinator

        # Once the manager is ok and the first discovery was issued, we can proceed with platforms setup.
        await hass.config_entries.async_forward_entry_setups(config_entry, MEROSS_PLATFORMS)

        def _http_api_polled(*args, **kwargs):
            # Whenever a new HTTP device is seen, we issue a discovery
            discovered_devices = meross_coordinator.data
            known_devices = manager.find_devices(device_uuids=discovered_devices.keys())
            if _http_info_changed(known_devices, discovered_devices.values()):
                _LOGGER.info("The HTTP API has found new devices that were unknown to us. Triggering discovery.")
                hass.create_task(manager.async_device_discovery(update_subdevice_status=True,
                                                                cached_http_device_list=discovered_devices.values()))

        # Register a handler for HTTP events so that we can check for new devices and trigger
        # a discovery when needed
        config_entry.async_on_unload(meross_coordinator.async_add_listener(_http_api_polled))
        config_entry.async_on_unload(config_entry.add_update_listener(update_listener))

        return True

    except TooManyTokensException as ex:
        msg = (
            "Too many tokens have been issued to this account. "
            "The Remote API refused to issue a new one."
        )
        notify_error(hass, "http_connection", "Meross Cloud", msg)
        log_exception(msg, logger=_LOGGER)
        raise ConfigEntryAuthFailed("Too many tokens have been issued") from ex

    except (UnauthorizedException, HttpApiError) as ex:
        # Do not retry setup: user must update its credentials.
        # isinstance() is required here: `ex` is an exception instance, so an identity
        # comparison against the class would never match. The error code is read
        # defensively because not every caught exception is guaranteed to expose it.
        if isinstance(ex, UnauthorizedException) or getattr(ex, "error_code", None) in (
                ErrorCodes.CODE_TOKEN_INVALID,
                ErrorCodes.CODE_TOKEN_EXPIRED,
                ErrorCodes.CODE_TOKEN_ERROR,
        ):
            raise ConfigEntryAuthFailed("Invalid token or credentials") from ex
        else:
            msg = "Your Meross login credentials are invalid or the network could not be reached at the moment."
            notify_error(
                hass,
                "http_connection",
                "Meross Cloud",
                "Could not connect to the Meross cloud. Please check"
                " your internet connection and your Meross credentials",
            )
            log_exception(msg, logger=_LOGGER)
            raise ConfigEntryNotReady() from ex


async def update_listener(hass, entry):
    """Handle options update."""
    # Update options
    custom_ua = entry.options.get(CONF_OPT_CUSTOM_USER_AGENT, DEFAULT_USER_AGENT)
    transport_mode = entry.options.get(CONF_OPT_LAN, CONF_OPT_LAN_MQTT_ONLY)
    manager_transport_mode = TRANSPORT_MODES_TO_ENUM[transport_mode]
    manager: MerossManager = hass.data[DOMAIN][MANAGER]
    manager.default_transport_mode = manager_transport_mode

    # So far, the underlying Meross Library requires some "monkey patching" to set the
    # http user agent to be used. It's not nice, but until a public setter gets exposed, we need
    # to do so.
    manager._http_client._ua_header = custom_ua


async def async_unload_entry(hass, entry):
    """Unload a config entry."""
    # Unload entities first
    _LOGGER.info("Removing Meross Cloud integration.")
    _LOGGER.info("Cleaning up resources...")

    for platform in MEROSS_PLATFORMS:
        _LOGGER.info("Cleaning up platform %s", platform)
        await hass.config_entries.async_forward_entry_unload(entry, platform)

    _LOGGER.info("Stopping manager...")
    manager = hass.data[DOMAIN][MANAGER]
    # TODO: Invalidate the token?
    manager.close()

    _LOGGER.info("Cleaning up memory...")
    for plat in MEROSS_PLATFORMS:
        if plat in hass.data[DOMAIN]:
            hass.data[DOMAIN][plat].clear()
            del hass.data[DOMAIN][plat]
    del hass.data[DOMAIN][MANAGER]
    hass.data[DOMAIN].clear()
    del hass.data[DOMAIN]

    _LOGGER.info("Meross cloud component removal done.")
    return True


async def async_remove_entry(hass, entry) -> None:
    """Called when a config entry is removed; nothing to do at the moment."""
    # TODO
    pass


async def async_setup(hass, config):
    """
    This method gets called if HomeAssistant has a valid meross_cloud: configuration entry within
    configurations.yaml.

    Thus, in this method we simply trigger the creation of a config entry.

    :return: True
    """
    conf = config.get(DOMAIN)
    hass.data[DOMAIN] = {}
    hass.data[DOMAIN][ATTR_CONFIG] = conf

    if conf is not None:
        hass.async_create_task(
            hass.config_entries.flow.async_init(
                DOMAIN, context={"source": config_entries.SOURCE_IMPORT}, data=conf
            )
        )

    return True