Files
2025-09-11 10:47:34 +03:00

425 lines
18 KiB
Python

"""The Home Connect New integration."""
from __future__ import annotations
import asyncio
import copy
import logging
import aiohttp
from datetime import datetime
import voluptuous as vol
from home_connect_async import Appliance, HomeConnect, Events, ConditionalLogger
from homeassistant.components.application_credentials import ClientCredential, async_import_client_credential
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_CLIENT_ID, CONF_CLIENT_SECRET, Platform
from homeassistant.core import Event, HomeAssistant, HomeAssistantError
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import aiohttp_client, config_entry_oauth2_flow
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers import storage
from homeassistant.helpers.typing import ConfigType
from . import api, config_flow
from .common import Configuration
from .const import *
from .services import Services
_LOGGER = logging.getLogger(__name__)
HC_CONFIG_SCHEMA = vol.Schema(
{
# vol.Optional(CONF_CLIENT_ID): cv.string,
# vol.Optional(CONF_CLIENT_SECRET): cv.string,
# vol.Optional(CONF_API_HOST, default=DEFAULT_API_HOST): str,
# vol.Optional(CONF_CACHE, default=False): vol.Coerce(bool),
# vol.Optional(CONF_LANG, default=CONF_LANG_DEFAULT): str,
# vol.Optional(CONF_TRANSLATION_MODE, default="local"): str,
# vol.Optional(CONF_SENSORS_TRANSLATION, default=None): vol.Any(str, None),
# vol.Optional(CONF_NAME_TEMPLATE, default=CONF_NAME_TEMPLATE_DEFAULT): str,
# vol.Optional(CONF_LOG_MODE, default=0): int,
# vol.Optional(CONF_SSE_TIMEOUT, default=CONF_SSE_TIMEOUT_DEFAULT): int,
vol.Optional(CONF_ENTITY_SETTINGS, default={}): vol.Any(dict, None),
vol.Optional(CONF_APPLIANCE_SETTINGS, default={}): vol.Any(dict, None)
}
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: HC_CONFIG_SCHEMA
},
extra=vol.ALLOW_EXTRA,
)
OPTIONS_SCHEMA = vol.Schema(
{
vol.Optional(CONF_LANG, default=CONF_LANG_DEFAULT): vol.Coerce(str),
vol.Optional(CONF_TRANSLATION_MODE, default="local"): vol.Coerce(str),
vol.Optional(CONF_DELAYED_OPS, default=CONF_DELAYED_OPS_DEFAULT): vol.Coerce(str),
vol.Optional(CONF_NAME_TEMPLATE, default=CONF_NAME_TEMPLATE_DEFAULT): vol.Coerce(str),
vol.Optional(CONF_LOG_MODE, default=0): vol.Coerce(int),
vol.Optional(CONF_SSE_TIMEOUT, default=CONF_SSE_TIMEOUT_DEFAULT): vol.Coerce(int),
vol.Optional(CONF_ENTITY_SETTINGS, default={}): vol.Any(dict, None),
vol.Optional(CONF_APPLIANCE_SETTINGS, default={}): vol.Any(dict, None)
},
extra=vol.REMOVE_EXTRA
)
PLATFORMS = [Platform.SENSOR, Platform.BINARY_SENSOR, Platform.SELECT, Platform.NUMBER, Platform.BUTTON, Platform.SWITCH, Platform.TIME]
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Home Connect New component."""
if DOMAIN not in config:
# hass.data[DOMAIN] = HC_CONFIG_SCHEMA({})
hass.data[DOMAIN] = { "global": {} }
return True
# The config now contains configuration coming from the configuration.yaml file
Configuration.set_global_config(config[DOMAIN])
hass.data[DOMAIN] = { "global": {} }
# migrate OAuth credentials from configuration.yaml to credentials manager
# if (CONF_CLIENT_ID in config[DOMAIN] and CONF_CLIENT_SECRET in config[DOMAIN]):
# await async_import_client_credential(
# hass,
# DOMAIN,
# ClientCredential(
# config[DOMAIN][CONF_CLIENT_ID],
# config[DOMAIN][CONF_CLIENT_SECRET],
# ),
# )
return True
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Set up Home Connect Alt from a config entry."""
# all_entries = hass.config_entries.async_entries(DOMAIN)
# save the options to compare later to see if they have changed
hass.data[DOMAIN][f"{config_entry.entry_id}_options"] = copy.deepcopy(dict(config_entry.options))
options = OPTIONS_SCHEMA(copy.deepcopy(dict(config_entry.options)))
conf = Configuration(options)
hass.data[DOMAIN][config_entry.entry_id] = conf
if CONF_API_HOST in config_entry.data:
api_host = config_entry.data[CONF_API_HOST]
hass.data[DOMAIN]["global"][CONF_API_HOST] = api_host
else:
api_host = hass.data[DOMAIN]["global"].get(CONF_API_HOST, DEFAULT_API_HOST)
conf["primary_config_entry"] = get_primary_config_entry(hass) == config_entry.entry_id
_LOGGER.debug(f"Config entry {config_entry.entry_id} is {'primary' if conf['primary_config_entry'] else 'secondary'}")
_LOGGER.debug(f"OAuth2={config_entry.data.get('auth_implementation','')} api_host={config_entry.data.get(CONF_API_HOST,'')}")
_LOGGER.debug(f"options: {config_entry.options}")
implementation = await config_entry_oauth2_flow.async_get_config_entry_implementation(hass, config_entry)
session = config_entry_oauth2_flow.OAuth2Session(hass, config_entry, implementation)
try:
await session.async_ensure_token_valid()
except aiohttp.ClientResponseError as ex:
_LOGGER.debug("API error: %s (%s)", ex.code, ex.message)
if ex.code in (400, 401, 403):
raise ConfigEntryAuthFailed("Token not valid, trigger renewal") from ex
raise ConfigEntryNotReady from ex
lang = conf[CONF_LANG] # if conf[CONF_LANG] != "" else None
#use_cache = conf[CONF_CACHE]
logmode = conf[CONF_LOG_MODE] if conf[CONF_LOG_MODE] else ConditionalLogger.LogMode.REQUESTS
# If using an aiohttp-based API lib
auth = api.AsyncConfigEntryAuth(
aiohttp_client.async_get_clientsession(hass), session, api_host
)
# homeconnect:HomeConnect = None
# if use_cache:
# homeconnect = await async_load_from_cache(hass, auth, lang)
# if not homeconnect:
# # Create normally if failed to create from cache
# try:
# homeconnect = await HomeConnect.async_create(auth, delayed_load=True, lang=lang)
# _LOGGER.debug("The HomeConnect object was created from scratch (without cache)")
# except HomeConnectError as ex:
# _LOGGER.warning("Failed to create the HomeConnect object", exc_info=ex)
# return False
ConditionalLogger.mode(logmode)
disabled_appliances = [haid for haid in conf[CONF_APPLIANCE_SETTINGS] if "disabled" in conf[CONF_APPLIANCE_SETTINGS][haid] and conf[CONF_APPLIANCE_SETTINGS][haid]["disabled"] ] if conf[CONF_APPLIANCE_SETTINGS] else []
homeconnect = await HomeConnect.async_create(auth, delayed_load=True, lang=lang, disabled_appliances=disabled_appliances, sse_timeout=conf[CONF_SSE_TIMEOUT])
services = register_services(hass, homeconnect)
conf.update({ "homeconnect": homeconnect, "services": services, "auth": auth })
#region internal event handlers
# async def async_delayed_update_cache(delay:float = 0):
# asyncio.sleep(delay)
# await async_save_to_cache(hass, homeconnect)
async def on_config_entry_update(hass:HomeAssistant, new_entry:ConfigEntry):
if dict(new_entry.options) != hass.data[DOMAIN][f"{config_entry.entry_id}_options"]:
_LOGGER.debug("Config entry updated, reloading the integration: %s", str(new_entry.options))
await hass.config_entries.async_reload(new_entry.entry_id)
async def on_data_loaded(homeconnect:HomeConnect):
# Save the state of the HomeConnect object to cache
# if use_cache:
# await async_save_to_cache(hass, homeconnect)
# else:
# _LOGGER.debug("Not saving to cache, it is disabled")
homeconnect.register_callback(on_device_removed, Events.DEPAIRED)
#homeconnect.register_callback(on_device_added, [Events.PAIRED, Events.DATA_CHANGED] )
homeconnect.subscribe_for_updates()
async def on_data_load_error(homeconnect:HomeConnect, ex:Exception):
_LOGGER.error("Failed to load data for the HomeConnect object", exc_info=ex)
# async def on_device_added(appliance:Appliance, event:str):
# if use_cache:
# await async_save_to_cache(hass, homeconnect)
# else:
# _LOGGER.debug("Not saving to cache, it is disabled")
async def on_device_removed(appliance:Appliance):
devreg = dr.async_get(hass)
device = devreg.async_get_device({(DOMAIN, appliance.haId.lower().replace('-','_'))})
devreg.async_remove_device(device.id)
# We need to wait for the appliance to be removed from the HomeConnect data
# this is not 100% fail-safe but good enough for a cache
# asyncio.create_task(async_delayed_update_cache(30))
#endregion
# Add event listener to reload the integration when the config entry options change (because the user edited them in the UI)
# config_entry.async_on_unload(config_entry.add_update_listener(lambda hass, entry: hass.config_entries.async_reload(entry.entry_id)))
config_entry.async_on_unload(config_entry.add_update_listener(on_config_entry_update))
# Setup all the callback listeners before starting to load the data
#hass.config_entries.async_setup_platforms(entry, PLATFORMS)
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
register_events_publisher(hass, homeconnect)
# Continue loading the HomeConnect data model and set the callback to be notified when done
homeconnect.start_load_data_task(on_complete=on_data_loaded, on_error= on_data_load_error)
return True
def get_primary_config_entry(hass:HomeAssistant) -> str:
""" Return the ID of the "first" config_entry for the integration """
all_entries = hass.config_entries.async_entries(DOMAIN)
for entry in all_entries:
if CONF_API_HOST in entry.data:
return entry.entry_id
return all_entries[0].entry_id
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Unload a config entry."""
conf = hass.data[DOMAIN]
homeconnect:HomeConnect = conf[config_entry.entry_id]['homeconnect']
homeconnect.close()
# await async_save_to_cache(hass, None)
unload_ok = await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(config_entry.entry_id)
return unload_ok
async def async_load_from_cache(hass:HomeAssistant, auth:api.AsyncConfigEntryAuth, lang:str|None) -> HomeConnect | None:
""" Helper function to load cached Home Connect data for storage """
cache = storage.Store(hass, version=1, key=f"{DOMAIN}_cache", private=True)
try:
refresh = HomeConnect.RefreshMode.ALL
json_data = None
cached_data = await cache.async_load()
if cached_data:
json_data = cached_data.get('json_data')
last_update = datetime.fromisoformat(cached_data['last_update'])
delta = (datetime.now()-last_update).total_seconds()
if delta < 60:
refresh = HomeConnect.RefreshMode.NOTHING
elif delta < 3600*24*30:
refresh = HomeConnect.RefreshMode.DYNAMIC_ONLY
homeconnect:HomeConnect = await HomeConnect.async_create(auth, json_data=json_data, refresh=refresh, delayed_load=True, lang=lang)
_LOGGER.debug("Loaded HomeConnect from cache")
return homeconnect
except Exception as ex:
# If there is any exception when creating the object from cache then clear the cache and continue
await cache.async_remove()
_LOGGER.debug("Exception while loading HomeConnect from cache, clearing cache and continuing", exc_info=ex)
return None
async def async_save_to_cache(hass:HomeAssistant, homeconnect:HomeConnect, cache:storage.Store=None) -> None:
""" Helper function to save the Home Connect data to Home Assistant storage """
try:
if not cache:
cache = storage.Store(hass, version=1, key=f"{DOMAIN}_cache", private=True)
if homeconnect:
cached_data = {
'last_update': datetime.now().isoformat(),
'json_data': homeconnect.to_json()
}
await cache.async_save(cached_data)
_LOGGER.debug("Saved HomeConnect to cache")
else:
await cache.async_remove()
_LOGGER.debug("Cleared HomeConnect from cache")
except Exception as ex:
_LOGGER.debug("Exception when saving HomeConnect to cache", exc_info=ex)
def register_services(hass:HomeAssistant, homeconnect:HomeConnect) -> Services:
""" Register the services offered by this integration """
services = Services(hass, homeconnect)
select_program_schema = vol.Schema(
{
vol.Required('device_id'): cv.string,
vol.Required('program_key'): cv.string,
vol.Optional('validate', default=True): cv.boolean,
vol.Optional('options'): vol.Schema(
[
{
vol.Required('key'): cv.string,
vol.Required('value'): vol.Any(str, int, float, bool)
}
]
)
}
)
hass.services.async_register(DOMAIN, "select_program", services.async_select_program, schema=select_program_schema)
start_program_schema = vol.Schema(
{
vol.Required('device_id'): cv.string,
vol.Optional('program_key'): cv.string,
vol.Optional('validate', default=True): cv.boolean,
vol.Optional('options'): vol.Schema(
[
{
vol.Required('key'): cv.string,
vol.Required('value'): vol.Any(str, int, float, bool)
}
]
)
}
)
hass.services.async_register(DOMAIN, "start_program", services.async_start_program, schema=start_program_schema)
stop_program_schema = vol.Schema(
{
vol.Required('device_id'): cv.string
}
)
hass.services.async_register(DOMAIN, "stop_program", services.async_stop_program, schema=stop_program_schema)
pause_program_schema = vol.Schema(
{
vol.Required('device_id'): cv.string
}
)
hass.services.async_register(DOMAIN, "pause_program", services.async_pause_program, schema=pause_program_schema)
resume_program_schema = vol.Schema(
{
vol.Required('device_id'): cv.string
}
)
hass.services.async_register(DOMAIN, "resume_program", services.async_resume_program, schema=resume_program_schema)
set_program_option_schema = vol.Schema(
{
vol.Required('device_id'): cv.string,
vol.Required('key'): cv.string,
vol.Required('value'): vol.Any(str, int, float, bool)
}
)
hass.services.async_register(DOMAIN, "set_program_option", services.async_set_program_option, schema=set_program_option_schema)
apply_setting_schema = vol.Schema(
{
vol.Required('device_id'): cv.string,
vol.Required('key'): cv.string,
vol.Required('value'): vol.Any(str, int, float, bool)
}
)
hass.services.async_register(DOMAIN, "apply_setting", services.async_apply_setting, schema=apply_setting_schema)
run_command_schema = vol.Schema(
{
vol.Required('device_id'): cv.string,
vol.Required('key'): cv.string,
vol.Required('value'): vol.Any(str, int, float, bool)
}
)
hass.services.async_register(DOMAIN, "run_command", services.async_run_command, schema=run_command_schema)
return services
def register_events_publisher(hass:HomeAssistant, homeconnect:HomeConnect):
""" Register for publishing events that are offered by this integration """
device_reg = dr.async_get(hass)
last_event = { 'key': None, 'value': None} # Used to filter out duplicate events
async def async_handle_event(appliance:Appliance, key:str, value:str):
if key != last_event['key'] or value != last_event['value']:
last_event['key'] = key
last_event['value'] = value
device = device_reg.async_get_device({(DOMAIN, appliance.haId.lower().replace('-','_'))})
event_data = {
"device_id": device.id,
"key": key,
"value": value
}
if key in ("BSH.Common.Status.OperationState", "BSH.Common.Event.ProgramFinished"):
# delay the firing of the event to deal with event handling race condition
_LOGGER.debug("Creating delayed_publish_event task")
asyncio.create_task(delayed_publish_event(event_data, 1))
else:
hass.bus.async_fire(f"{DOMAIN}_event", event_data)
_LOGGER.debug("Published event to Home Assistant event bus: %s = %s", key, str(value))
else:
_LOGGER.debug("Skipped publishing of duplicate event to Home Assistant event bus: %s = %s", key, str(value))
async def delayed_publish_event(event_data, timeout):
_LOGGER.debug(f"Sleeping for {timeout} seconds to delay event publishing of {event_data}")
await asyncio.sleep(timeout)
hass.bus.async_fire(f"{DOMAIN}_event", event_data)
_LOGGER.debug("Published event to Home Assistant event bus after delay: %s = %s", event_data["key"], str(event_data["value"]))
def register_appliance(appliance:Appliance):
for event in PUBLISHED_EVENTS:
appliance.register_callback(async_handle_event, event)
homeconnect.register_callback(register_appliance, [Events.PAIRED, Events.CONNECTED])
for appliance in homeconnect.appliances.values():
register_appliance(appliance)
class HomeConnectOauth2Impl(config_entry_oauth2_flow.LocalOAuth2Implementation):
"""" Implement that OAuth2 class """
@property
def name(self) -> str:
"""Name of the implementation."""
return "Home Connect Appliances"