Initial Home Assistant Configuration

This commit is contained in:
root
2025-09-11 10:47:34 +03:00
commit ac8b542e1b
2360 changed files with 41412 additions and 0 deletions
@@ -0,0 +1,880 @@
"""Main package for planner."""
from __future__ import annotations
import datetime as dt
import logging
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
ATTR_UNIT_OF_MEASUREMENT,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
Platform,
)
from homeassistant.core import HomeAssistant, HomeAssistantError
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import (
async_track_state_change_event,
async_track_time_change,
)
from homeassistant.util import dt as dt_util
from .config_flow import NordpoolPlannerConfigFlow
from .const import (
CONF_ACCEPT_COST_ENTITY,
CONF_ACCEPT_RATE_ENTITY,
CONF_DURATION_ENTITY,
CONF_END_TIME_ENTITY,
CONF_HEALTH_ENTITY,
CONF_PRICES_ENTITY,
CONF_SEARCH_LENGTH_ENTITY,
CONF_START_TIME_ENTITY,
CONF_TYPE,
CONF_TYPE_MOVING,
CONF_TYPE_STATIC,
CONF_USED_HOURS_LOW_ENTITY,
DOMAIN,
NAME_FILE_READER,
PATH_FILE_READER,
PlannerStates,
)
from .helpers import get_np_from_file
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.BINARY_SENSOR, Platform.BUTTON, Platform.NUMBER, Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Set up this integration using UI."""
config_entry.async_on_unload(config_entry.add_update_listener(async_reload_entry))
if DOMAIN not in hass.data:
hass.data[DOMAIN] = {}
if config_entry.entry_id not in hass.data[DOMAIN]:
planner = NordpoolPlanner(hass, config_entry)
await planner.async_setup()
hass.data[DOMAIN][config_entry.entry_id] = planner
if config_entry is not None:
if config_entry.source == SOURCE_IMPORT:
hass.async_create_task(
hass.config_entries.async_remove(config_entry.entry_id)
)
return False
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unloading a config_flow entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
planner = hass.data[DOMAIN].pop(entry.entry_id)
planner.cleanup()
return unload_ok
async def async_reload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Reload the config entry."""
await async_unload_entry(hass, config_entry)
await async_setup_entry(hass, config_entry)
async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Migrate old entry."""
_LOGGER.debug(
"Attempting migrating configuration from version %s.%s",
config_entry.version,
config_entry.minor_version,
)
class MigrateError(HomeAssistantError):
"""Error to indicate there is was an error in version migration."""
installed_version = NordpoolPlannerConfigFlow.VERSION
installed_minor_version = NordpoolPlannerConfigFlow.MINOR_VERSION
new_data = {**config_entry.data}
new_options = {**config_entry.options}
if config_entry.version > installed_version:
_LOGGER.warning(
"Downgrading major version from %s to %s is not allowed",
config_entry.version,
installed_version,
)
return False
if (
config_entry.version == installed_version
and config_entry.minor_version > installed_minor_version
):
_LOGGER.warning(
"Downgrading minor version from %s.%s to %s.%s is not allowed",
config_entry.version,
config_entry.minor_version,
installed_version,
installed_minor_version,
)
return False
def options_1x_to_20(options: dict, data: dict, hass: HomeAssistant):
try:
np_entity = hass.states.get(data[CONF_PRICES_ENTITY])
uom = np_entity.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
options.pop("currency")
options[ATTR_UNIT_OF_MEASUREMENT] = uom
except (IndexError, KeyError) as err:
_LOGGER.warning("Could not extract currency from Prices entity")
raise MigrateError from err
return options
def data_20_to_21(data: dict):
if entity_id := data.pop("np_entity"):
data[CONF_PRICES_ENTITY] = entity_id
return data
_LOGGER.warning('Could not find "np_entity" in config_entry')
raise MigrateError('Could not find "np_entity" in config_entry')
def data_21_to_22(data: dict):
if data[CONF_TYPE] == CONF_TYPE_STATIC:
data[CONF_USED_HOURS_LOW_ENTITY] = True
data[CONF_START_TIME_ENTITY] = True
if CONF_HEALTH_ENTITY not in data:
data[CONF_HEALTH_ENTITY] = True
return data
if config_entry.version == 1:
try:
# Version 1.x to 2.0
new_options = options_1x_to_20(new_options, new_data, hass)
# Version 2.0 to 2.1
new_data = data_20_to_21(new_data)
# Version 2.1 to 2.2
new_data = data_21_to_22(new_data)
except MigrateError:
_LOGGER.warning("Error while upgrading from version 1.x to 2.1")
return False
if config_entry.version == 2 and config_entry.minor_version == 0:
try:
# Version 2.0 to 2.1
new_data = data_20_to_21(new_data)
# Version 2.1 to 2.2
new_data = data_21_to_22(new_data)
except MigrateError:
_LOGGER.warning("Error while upgrading from version 2.0 to 2.1")
return False
if config_entry.version == 2 and config_entry.minor_version == 1:
try:
# Version 2.1 to 2.2
new_data = data_21_to_22(new_data)
except MigrateError:
_LOGGER.warning("Error while upgrading from version 2.1 to 2.2")
return False
hass.config_entries.async_update_entry(
config_entry,
data=new_data,
options=new_options,
version=installed_version,
minor_version=installed_minor_version,
)
_LOGGER.info(
"Migration configuration from version %s.%s to %s.%s successful",
config_entry.version,
config_entry.minor_version,
installed_version,
installed_minor_version,
)
return True
class NordpoolPlanner:
"""Planner base class."""
_hourly_update = None
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Initialize my coordinator."""
self._hass = hass
self._config = config_entry
self._state_change_listeners = []
# Input entities
self._prices_entity = PricesEntity(self._config.data[CONF_PRICES_ENTITY])
# TODO: Remove, likely not needed anymore as async_track_time_change in async_setup() will ensure update every hour
# self._state_change_listeners.append(
# async_track_state_change_event(
# self._hass,
# [self._prices_entity.unique_id],
# self._async_input_changed,
# )
# )
# Configuration entities
self._duration_number_entity = ""
self._accept_cost_number_entity = ""
self._accept_rate_number_entity = ""
self._search_length_number_entity = ""
self._start_time_number_entity = ""
self._end_time_number_entity = ""
# TODO: Make dictionary?
# Output entities
self._output_listeners: dict[str, NordpoolPlannerEntity] = {}
# Local state variables
self._last_update = None
self.low_hours = None
self._planner_status = NordpoolPlannerStatus()
# Output states
self.low_cost_state = NordpoolPlannerState()
self.high_cost_state = NordpoolPlannerState()
def as_dict(self):
"""For diagnostics serialization."""
res = self.__dict__.copy()
for k, i in res.copy().items():
if "_number_entity" in k:
res[k] = {"id": i, "value": self.get_number_entity_value(i)}
return res
async def async_setup(self):
"""Post initialization setup."""
# Ensure an update is done on every hour
self._hourly_update = async_track_time_change(
self._hass, self.scheduled_update, minute=0, second=0
)
@property
def name(self) -> str:
"""Name of planner."""
return self._config.data["name"]
@property
def price_sensor_id(self) -> str:
"""Entity id of source sensor."""
return self._prices_entity.unique_id
@property
def price_now(self) -> str:
"""Current price from source sensor."""
return self._prices_entity.current_price_attr
@property
def planner_status(self) -> NordpoolPlannerStatus:
"""Current planner status."""
return self._planner_status
@property
def _duration(self) -> int:
"""Get duration parameter."""
return self.get_number_entity_value(self._duration_number_entity, integer=True)
@property
def _is_moving(self) -> bool:
"""Get if planner is of type Moving."""
return self._config.data[CONF_TYPE] == CONF_TYPE_MOVING
@property
def _is_static(self) -> bool:
"""Get if planner is of type Static."""
return self._config.data[CONF_TYPE] == CONF_TYPE_STATIC
@property
def _search_length(self) -> int:
"""Get search length parameter."""
return self.get_number_entity_value(
self._search_length_number_entity, integer=True
)
@property
def _start_time(self) -> int:
"""Get start time parameter."""
return self.get_number_entity_value(
self._start_time_number_entity, integer=True
)
@property
def _end_time(self) -> int:
"""Get end time parameter."""
return self.get_number_entity_value(self._end_time_number_entity, integer=True)
@property
def _accept_cost(self) -> float:
"""Get accept cost parameter."""
return self.get_number_entity_value(self._accept_cost_number_entity)
@property
def _accept_rate(self) -> float:
"""Get accept rate parameter."""
return self.get_number_entity_value(self._accept_rate_number_entity)
def cleanup(self):
"""Cleanup by removing event listeners."""
for lister in self._state_change_listeners:
lister()
def get_number_entity_value(
self, entity_id: str, integer: bool = False
) -> float | int | None:
"""Get value of generic entity parameter."""
if entity_id:
try:
entity = self._hass.states.get(entity_id)
state = entity.state
value = float(state)
if integer:
return int(value)
return value # noqa: TRY300
except (TypeError, ValueError):
_LOGGER.warning(
'Could not convert value "%s" of entity %s to expected format',
state,
entity_id,
)
except Exception as e: # noqa: BLE001
_LOGGER.error(
'Unknown error wen reading and converting "%s": %s',
entity_id,
e,
)
else:
_LOGGER.debug("No entity defined")
return None
def register_input_entity_id(self, entity_id, conf_key) -> None:
"""Register input entity id."""
# Input numbers
if conf_key == CONF_DURATION_ENTITY:
self._duration_number_entity = entity_id
elif conf_key == CONF_ACCEPT_COST_ENTITY:
self._accept_cost_number_entity = entity_id
elif conf_key == CONF_ACCEPT_RATE_ENTITY:
self._accept_rate_number_entity = entity_id
elif conf_key == CONF_SEARCH_LENGTH_ENTITY:
self._search_length_number_entity = entity_id
elif conf_key == CONF_START_TIME_ENTITY:
self._start_time_number_entity = entity_id
elif conf_key == CONF_END_TIME_ENTITY:
self._end_time_number_entity = entity_id
else:
_LOGGER.warning(
'An entity "%s" was registered for callback but no match for key "%s"',
entity_id,
conf_key,
)
self._state_change_listeners.append(
async_track_state_change_event(
self._hass,
[entity_id],
self._async_input_changed,
)
)
def register_output_listener_entity(
self, entity: NordpoolPlannerEntity, conf_key=""
) -> None:
"""Register output entity."""
if conf_key in self._output_listeners:
_LOGGER.warning(
'An output listener with key "%s" and unique id "%s" is overriding previous entity "%s"',
conf_key,
self._output_listeners.get(conf_key).entity_id,
entity.entity_id,
)
self._output_listeners[conf_key] = entity
def get_device_info(self) -> DeviceInfo:
"""Get device info to group entities."""
return DeviceInfo(
identifiers={(DOMAIN, self._config.entry_id)},
name=self.name,
manufacturer="Nordpool",
entry_type=DeviceEntryType.SERVICE,
model="Forecast",
)
def scheduled_update(self, _):
"""Scheduled updates callback."""
_LOGGER.debug("Scheduled callback")
self.update()
def input_changed(self, value):
"""Input entity callback to initiate a planner update."""
_LOGGER.debug("Sensor change event from callback: %s", value)
self.update()
async def _async_input_changed(self, event):
"""Input entity change callback from state change event."""
new_state = event.data.get("new_state")
_LOGGER.debug("Sensor change event from HASS: %s", new_state)
self.update()
def update(self):
"""Planner update call function."""
_LOGGER.debug("Updating planner")
# Update inputs
if not self._prices_entity.update(self._hass) and not self._prices_entity.valid:
self.set_unavailable()
self._planner_status.status = PlannerStates.Error
self._planner_status.running_text = "No valid Price data"
return
if not self._duration:
_LOGGER.warning("Aborting update since no valid Duration")
self._planner_status.status = PlannerStates.Error
self._planner_status.running_text = "No valid Duration data"
return
if self._is_moving and not self._search_length:
_LOGGER.warning("Aborting update since no valid Search length")
self._planner_status.status = PlannerStates.Error
self._planner_status.running_text = "No valid Search-Length data"
return
if self._is_static and not (self._start_time and self._end_time):
_LOGGER.warning("Aborting update since no valid Start or end time")
self._planner_status.status = PlannerStates.Error
self._planner_status.running_text = "No valid Start-Time or End-Time"
return
# If come this far no running error texts relevant (for now...)
self._planner_status.status = PlannerStates.Ok
self._planner_status.running_text = "ok"
self._planner_status.config_text = "ok"
if self._is_moving and self._search_length < self._duration:
self._planner_status.status = PlannerStates.Warning
self._planner_status.config_text = "Duration is Lager than Search-Length"
# if self._is_static and (self._end_time - self._start_time) < self._duration:
# self._planner_status.status = PlannerStates.Warning
# self._planner_status.config_text = "Duration is Lager than Search-Window"
# initialize local variables
now = dt_util.now()
if self._is_static and self.low_hours is not None:
if self.low_hours >= self._duration:
_LOGGER.debug("No need to update, quota of hours fulfilled")
self.set_done_for_now()
self._planner_status.status = PlannerStates.Idle
self._planner_status.running_text = "Quota of hours fulfilled"
return
duration = dt.timedelta(hours=max(0, self._duration - self.low_hours) - 1)
# TODO: Need to fix this so that the duration amount of hours are found in range for static
# duration = dt.timedelta(hours=1)
else:
duration = dt.timedelta(hours=self._duration - 1)
# Initiate states and variables for Moving planner
if self._is_moving:
start_time = now
end_time = now + dt.timedelta(hours=self._search_length)
# Initiate states and variables for Static planner
elif self._is_static:
start_time = now.replace(
hour=self._start_time, minute=0, second=0, microsecond=0
)
end_time = now.replace(
hour=self._end_time, minute=0, second=0, microsecond=0
)
# First ensure end is after start (spans over midnight)
if end_time < start_time:
# Have not started range yet
if end_time < now:
end_time += dt.timedelta(days=1)
# Started range "yesterday"
else:
start_time -= dt.timedelta(days=1)
# In active range
if start_time < now and end_time > now:
# Bump up start to now so that prices in the past is not used
start_time = now
# Invalid planner type
else:
_LOGGER.warning("Aborting update since unknown planner type")
self._planner_status.status = PlannerStates.Error
self._planner_status.config_text = "Bad planner type"
return
prices_groups: list[NordpoolPricesGroup] = []
offset = 0
while True:
start_offset = dt.timedelta(hours=offset)
first_time = start_time + start_offset
last_time = first_time + duration
if offset != 0 and last_time > end_time:
break
offset += 1
prices_group = self._prices_entity.get_prices_group(first_time, last_time)
if not prices_group.valid:
continue
# TODO: Should not end up here, why?
prices_groups.append(prices_group)
if len(prices_groups) == 0:
_LOGGER.warning(
"Aborting update since no prices fetched in range %s to %s with duration %s",
start_time,
end_time,
duration,
)
self._planner_status.status = PlannerStates.Warning
self._planner_status.running_text = "No prices in active range"
return
_LOGGER.debug(
"Processing %s prices_groups found in range %s to %s",
len(prices_groups),
start_time,
end_time,
)
accept_cost = self._accept_cost
accept_rate = self._accept_rate
lowest_cost_group: NordpoolPricesGroup = prices_groups[0]
for p in prices_groups:
if accept_cost and p.average < accept_cost:
_LOGGER.debug("Accept cost fulfilled")
self.set_lowest_cost_state(p)
break
if accept_rate:
if self._prices_entity.average_attr <= 0:
if p.average <= 0:
_LOGGER.debug(
"Accept rate indirectly fulfilled (NP average & range average <= 0)"
)
self.set_lowest_cost_state(p)
break
elif (p.average / self._prices_entity.average_attr) <= accept_rate:
_LOGGER.debug("Accept rate fulfilled")
self.set_lowest_cost_state(p)
break
if p.average < lowest_cost_group.average:
lowest_cost_group = p
else:
self.set_lowest_cost_state(lowest_cost_group)
highest_cost_group: NordpoolPricesGroup = prices_groups[0]
for p in prices_groups:
if p.average > highest_cost_group.average:
highest_cost_group = p
self.set_highest_cost_state(highest_cost_group)
if not self._last_update:
pass
elif self._last_update.hour != now.hour:
_LOGGER.debug(
"Swapping hour on change from %s to %s", self._last_update, now
)
if self._is_static:
if self.low_cost_state.on_at(now):
if self.low_hours is None:
self.low_hours = 1
else:
self.low_hours += 1
if end_time.hour == now.hour:
self.low_hours = 0
self._last_update = now
for listener in self._output_listeners.values():
listener.update_callback()
def set_lowest_cost_state(self, prices_group: NordpoolPricesGroup) -> None:
"""Set the state to output variable."""
self.low_cost_state.starts_at = prices_group.start_time
self.low_cost_state.cost_at = prices_group.average
if prices_group.average != 0:
self.low_cost_state.now_cost_rate = (
self._prices_entity.current_price_attr / prices_group.average
)
else:
self.low_cost_state.now_cost_rate = STATE_UNAVAILABLE
_LOGGER.debug("Wrote lowest cost state: %s", self.low_cost_state)
def set_highest_cost_state(self, prices_group: NordpoolPricesGroup) -> None:
"""Set the state to output variable."""
self.high_cost_state.starts_at = prices_group.start_time
self.high_cost_state.cost_at = prices_group.average
if prices_group.average != 0:
self.high_cost_state.now_cost_rate = (
self._prices_entity.current_price_attr / prices_group.average
)
else:
self.high_cost_state.now_cost_rate = STATE_UNAVAILABLE
_LOGGER.debug("Wrote highest cost state: %s", self.high_cost_state)
def set_done_for_now(self) -> None:
"""Set output state to off."""
now_hour = dt_util.now().replace(minute=0, second=0, microsecond=0)
start_hour = now_hour.replace(hour=self._start_time)
if start_hour < now_hour:
start_hour += dt.timedelta(days=1)
self.low_cost_state.starts_at = start_hour
self.low_cost_state.cost_at = STATE_UNAVAILABLE
self.low_cost_state.now_cost_rate = STATE_UNAVAILABLE
self.high_cost_state.starts_at = start_hour
self.high_cost_state.cost_at = STATE_UNAVAILABLE
self.high_cost_state.now_cost_rate = STATE_UNAVAILABLE
_LOGGER.debug("Setting output states to unavailable")
for listener in self._output_listeners.values():
listener.update_callback()
def set_unavailable(self) -> None:
"""Set output state to unavailable."""
self.low_cost_state.starts_at = STATE_UNAVAILABLE
self.low_cost_state.cost_at = STATE_UNAVAILABLE
self.low_cost_state.now_cost_rate = STATE_UNAVAILABLE
self.high_cost_state.starts_at = STATE_UNAVAILABLE
self.high_cost_state.cost_at = STATE_UNAVAILABLE
self.high_cost_state.now_cost_rate = STATE_UNAVAILABLE
_LOGGER.debug("Setting output states to unavailable")
for listener in self._output_listeners.values():
listener.update_callback()
class PricesEntity:
"""Representation for Nordpool state."""
def __init__(self, unique_id: str) -> None:
"""Initialize state tracker."""
self._unique_id = unique_id
self._np = None
def as_dict(self):
"""For diagnostics serialization."""
return self.__dict__
@property
def unique_id(self) -> str:
"""Get the unique id."""
return self._unique_id
@property
def valid(self) -> bool:
"""Get if data is valid."""
# TODO: Add more checks, make function of those in update()
return self._np is not None
@property
def _all_prices(self):
if np_prices := self._np.attributes.get("raw_today"):
# For Nordpool format
if self._np.attributes["tomorrow_valid"]:
np_prices += self._np.attributes["raw_tomorrow"]
return np_prices
elif e_prices := self._np.attributes.get("prices"): # noqa: RET505
# For ENTSO-e format
e_prices = [
{"start": dt_util.parse_datetime(ep["time"]), "value": ep["price"]}
for ep in e_prices
]
return e_prices # noqa: RET504
return []
@property
def average_attr(self):
"""Get the average price attribute."""
if self._np is not None:
if "average_electricity_price" in self._np.entity_id:
# For ENTSO-e average
try:
return float(self._np.state)
except ValueError:
_LOGGER.warning(
'Could not convert "%s" to float for average sensor "%s"',
self._np.state,
self._np.entity_id,
)
else:
# For Nordpool format
return self._np.attributes["average"]
return None
@property
def current_price_attr(self):
"""Get the current price attribute."""
if self._np is not None:
if current := self._np.attributes.get("current_price"):
# For Nordpool format
return current
else: # noqa: RET505
# For general, find in list
now = dt_util.now()
for price in self._all_prices:
if (
price["start"] < now
and price["start"] + dt.timedelta(hours=1) > now
):
return price["value"]
return None
def update(self, hass: HomeAssistant) -> bool:
"""Update price in storage."""
if self._unique_id == NAME_FILE_READER:
np = get_np_from_file(PATH_FILE_READER)
else:
np = hass.states.get(self._unique_id)
if np is None:
_LOGGER.warning("Got empty data from Nordpool entity %s ", self._unique_id)
elif "today" not in np.attributes and "prices_today" not in np.attributes:
_LOGGER.warning(
"No values for today in Nordpool entity %s ", self._unique_id
)
else:
_LOGGER.debug(
"Nordpool sensor %s was updated successfully", self._unique_id
)
if self._np is None:
pass
self._np = np
if self._np is None:
return False
return True
def get_prices_group(
self, start: dt.datetime, end: dt.datetime
) -> NordpoolPricesGroup:
"""Get a range of prices from NP given the start and end datetimes.
Ex. If start is 7:05 and end 10:05, a list of 4 prices will be returned,
7, 8, 9 & 10.
"""
started = False
selected = []
for p in self._all_prices:
if p["start"] > start - dt.timedelta(hours=1):
started = True
if p["start"] > end:
break
if started:
selected.append(p)
return NordpoolPricesGroup(selected)
class NordpoolPricesGroup:
"""A slice if Nordpool prices with helper functions."""
def __init__(self, prices) -> None:
"""Initialize price group."""
self._prices = prices
def __str__(self) -> str:
"""Get string representation of class."""
return f"start_time={self.start_time.strftime("%Y-%m-%d %H:%M")} average={self.average} len(_prices)={len(self._prices)}"
def __repr__(self) -> str:
"""Get string representation for debugging."""
return type(self).__name__ + f" ({self.__str__()})"
@property
def valid(self) -> bool:
"""Is the price group valid."""
if len(self._prices) == 0:
# _LOGGER.debug("None-valid price range group, len=%s", len(self._prices))
return False
return True
@property
def average(self) -> float:
"""The average price of the price group."""
# if not self.valid:
# _LOGGER.warning(
# "Average set to 1 for invalid price group, should not happen"
# )
# return 1
return sum([p["value"] for p in self._prices]) / len(self._prices)
@property
def start_time(self) -> dt.datetime:
"""The start time of first price in group."""
# if not self.valid:
# _LOGGER.warning(
# "Start time set to None for invalid price group, should not happen"
# )
# return None
return self._prices[0]["start"]
class NordpoolPlannerState:
"""State attribute representation."""
def __init__(self) -> None:
"""Initiate states."""
self.starts_at = STATE_UNKNOWN
self.cost_at = STATE_UNKNOWN
self.now_cost_rate = STATE_UNKNOWN
def __str__(self) -> str:
"""Get string representation of class."""
return f"start_at={self.starts_at} cost_at={self.cost_at:.2} now_cost_rate={self.now_cost_rate:.2}"
def as_dict(self):
"""For diagnostics serialization."""
return self.__dict__
def on_at(self, time: dt.datetime) -> bool:
"""Get boolean state if start is before given timestamp."""
if self.starts_at not in [
STATE_UNKNOWN,
STATE_UNAVAILABLE,
]:
return self.starts_at < time
return False
class NordpoolPlannerStatus:
"""Status for the overall planner."""
def __init__(self) -> None:
"""Initiate status."""
self.status = PlannerStates.Unknown
self.running_text = ""
self.config_text = ""
class NordpoolPlannerEntity(Entity):
"""Base class for nordpool planner entities."""
def __init__(
self,
planner: NordpoolPlanner,
) -> None:
"""Initialize entity."""
# Input configs
self._planner = planner
self._attr_device_info = planner.get_device_info()
def as_dict(self):
"""For diagnostics serialization."""
return {
k: v
for k, v in self.__dict__.items()
if not (
k.startswith("_")
or k in ["hass", "platform", "registry_entry", "device_entry"]
)
}
@property
def should_poll(self):
"""No need to poll. Coordinator notifies entity of updates."""
return False
def update_callback(self) -> None:
"""Call from planner that new data available."""
self.schedule_update_ha_state()
@@ -0,0 +1,153 @@
"""Binary sensor definitions."""
from __future__ import annotations
import logging
from homeassistant.components.binary_sensor import (
BinarySensorEntity,
BinarySensorEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from . import NordpoolPlanner, NordpoolPlannerEntity
from .const import CONF_HIGH_COST_ENTITY, CONF_LOW_COST_ENTITY, DOMAIN
_LOGGER = logging.getLogger(__name__)
# LOW_COST_ENTITY_DESCRIPTION = BinarySensorEntityDescription(
# key=CONF_LOW_COST_ENTITY,
# # device_class=BinarySensorDeviceClass.???,
# )
async def async_setup_entry(
hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities
):
"""Create state binary sensor entities for platform."""
planner: NordpoolPlanner = hass.data[DOMAIN][config_entry.entry_id]
entities = []
if config_entry.data.get(CONF_LOW_COST_ENTITY):
entities.append(
NordpoolPlannerBinarySensor(
planner,
entity_description=BinarySensorEntityDescription(
key=CONF_LOW_COST_ENTITY,
# device_class=BinarySensorDeviceClass.???,
),
)
)
if config_entry.data.get(CONF_HIGH_COST_ENTITY):
entities.append(
NordpoolPlannerBinarySensor(
planner,
entity_description=BinarySensorEntityDescription(
key=CONF_HIGH_COST_ENTITY,
# device_class=BinarySensorDeviceClass.???,
),
)
)
async_add_entities(entities)
return True
class NordpoolPlannerBinarySensor(NordpoolPlannerEntity, BinarySensorEntity):
"""Binary state sensor."""
_attr_icon = "mdi:flash"
def __init__(
self,
planner,
entity_description: BinarySensorEntityDescription,
) -> None:
"""Initialize the entity."""
super().__init__(planner)
self.entity_description = entity_description
self._attr_name = (
self._planner.name
+ " "
+ entity_description.key.replace("_entity", "").replace("_", " ")
)
self._attr_unique_id = (
("nordpool_planner_" + self._attr_name)
.lower()
.replace(".", "")
.replace(" ", "_")
)
@property
def is_on(self):
"""Output state."""
state = None
# TODO: This can be made nicer to get value from states in dictionary in planner
if self.entity_description.key == CONF_LOW_COST_ENTITY:
state = self._planner.low_cost_state.on_at(dt_util.now())
# if self._planner.low_cost_state.starts_at not in [
# STATE_UNKNOWN,
# STATE_UNAVAILABLE,
# ]:
# state = self._planner.low_cost_state.starts_at < dt_util.now()
if self.entity_description.key == CONF_HIGH_COST_ENTITY:
state = self._planner.high_cost_state.on_at(dt_util.now())
# if self._planner.high_cost_state.starts_at not in [
# STATE_UNKNOWN,
# STATE_UNAVAILABLE,
# ]:
# state = self._planner.high_cost_state.starts_at < dt_util.now()
_LOGGER.debug(
'Returning state "%s" of binary sensor "%s"',
state,
self.unique_id,
)
return state
@property
def extra_state_attributes(self):
"""Extra state attributes."""
state_attributes = {
"starts_at": STATE_UNKNOWN,
"cost_at": STATE_UNKNOWN,
"current_cost": self._planner.price_now,
"current_cost_rate": STATE_UNKNOWN,
"price_sensor": self._planner.price_sensor_id,
}
# TODO: This can be made nicer to get value from states in dictionary in planner
if self.entity_description.key == CONF_LOW_COST_ENTITY:
state_attributes = {
"starts_at": self._planner.low_cost_state.starts_at,
"cost_at": self._planner.low_cost_state.cost_at,
"current_cost": self._planner.price_now,
"current_cost_rate": self._planner.low_cost_state.now_cost_rate,
"price_sensor": self._planner.price_sensor_id,
}
elif self.entity_description.key == CONF_HIGH_COST_ENTITY:
state_attributes = {
"starts_at": self._planner.high_cost_state.starts_at,
"cost_at": self._planner.high_cost_state.cost_at,
"current_cost": self._planner.price_now,
"current_cost_rate": self._planner.high_cost_state.now_cost_rate,
"price_sensor": self._planner.price_sensor_id,
}
_LOGGER.debug(
'Returning extra state attributes "%s" of binary sensor "%s"',
state_attributes,
self.unique_id,
)
return state_attributes
async def async_added_to_hass(self) -> None:
"""Load the last known state when added to hass."""
await super().async_added_to_hass()
self._planner.register_output_listener_entity(self, self.entity_description.key)
# async def async_update(self):
# """Called from Home Assistant to update entity value"""
# self._planner.update()
@@ -0,0 +1,86 @@
"""Button definitions."""
from __future__ import annotations
import logging
from homeassistant.components.button import (
ButtonDeviceClass,
ButtonEntityDescription,
ButtonEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
EntityCategory,
)
from homeassistant.core import HomeAssistant
from . import NordpoolPlanner, NordpoolPlannerEntity
from .const import (
CONF_END_TIME_ENTITY,
CONF_USED_TIME_RESET_ENTITY,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
CONF_USED_TIME_RESET_ENTITY_DESCRIPTION = ButtonEntityDescription(
key=CONF_USED_TIME_RESET_ENTITY,
device_class=ButtonDeviceClass.RESTART,
entity_category=EntityCategory.DIAGNOSTIC
)
async def async_setup_entry(
hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities
):
"""Create action button entities for platform."""
planner: NordpoolPlanner = hass.data[DOMAIN][config_entry.entry_id]
entities = []
if config_entry.data.get(CONF_END_TIME_ENTITY):
entities.append(
NordpoolPlannerButton(
planner,
entity_description=CONF_USED_TIME_RESET_ENTITY_DESCRIPTION,
)
)
async_add_entities(entities)
return True
class NordpoolPlannerButton(NordpoolPlannerEntity, ButtonEntity):
"""Button config entity."""
def __init__(
self,
planner,
entity_description: ButtonEntityDescription,
) -> None:
"""Initialize the entity."""
super().__init__(planner)
self.entity_description = entity_description
self._attr_name = (
self._planner.name
+ " "
+ entity_description.key.replace("_entity", "").replace("_", " ")
)
self._attr_unique_id = (
("nordpool_planner_" + self._attr_name)
.lower()
.replace(".", "")
.replace(" ", "_")
)
async def async_added_to_hass(self) -> None:
"""Load the last known state when added to hass."""
await super().async_added_to_hass()
self._planner.register_input_entity_id(
self.entity_id, self.entity_description.key
)
def press(self) -> None:
"""Press the button."""
self._planner.low_hours = 0
@@ -0,0 +1,154 @@
"""Config flow for PoolLab integration."""
from __future__ import annotations
import logging
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import ATTR_NAME, ATTR_UNIT_OF_MEASUREMENT
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import selector, template
from .const import (
CONF_ACCEPT_COST_ENTITY,
CONF_ACCEPT_RATE_ENTITY,
CONF_DURATION_ENTITY,
CONF_END_TIME_ENTITY,
CONF_HEALTH_ENTITY,
CONF_HIGH_COST_ENTITY,
CONF_LOW_COST_ENTITY,
CONF_PRICES_ENTITY,
CONF_SEARCH_LENGTH_ENTITY,
CONF_START_TIME_ENTITY,
CONF_STARTS_AT_ENTITY,
CONF_TYPE,
CONF_TYPE_LIST,
CONF_TYPE_MOVING,
CONF_TYPE_STATIC,
CONF_USED_HOURS_LOW_ENTITY,
DOMAIN,
NAME_FILE_READER,
PATH_FILE_READER,
)
from .helpers import get_np_from_file
_LOGGER = logging.getLogger(__name__)
ENTOSOE_DOMAIN = None
try:
from ..entsoe.const import DOMAIN as ENTOSOE_DOMAIN
except ImportError:
_LOGGER.warning("Could not import ENTSO-e integration")
NORDPOOL_DOMAIN = None
try:
from ..nordpool import DOMAIN as NORDPOOL_DOMAIN
except ImportError:
_LOGGER.warning("Could not import Nord Pool integration")
class NordpoolPlannerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Nordpool Planner config flow."""
VERSION = 2
MINOR_VERSION = 2
data = None
options = None
_reauth_entry: config_entries.ConfigEntry | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle initial user step."""
errors: dict[str, str] = {}
if user_input is not None:
self.data = user_input
# Add those that are not optional
self.data[CONF_LOW_COST_ENTITY] = True
self.data[CONF_DURATION_ENTITY] = True
if self.data[CONF_TYPE] == CONF_TYPE_MOVING:
self.data[CONF_SEARCH_LENGTH_ENTITY] = True
elif self.data[CONF_TYPE] == CONF_TYPE_STATIC:
self.data[CONF_START_TIME_ENTITY] = True
self.data[CONF_END_TIME_ENTITY] = True
self.data[CONF_USED_HOURS_LOW_ENTITY] = True
self.options = {}
if self.data[CONF_PRICES_ENTITY] == NAME_FILE_READER:
np_entity = get_np_from_file(PATH_FILE_READER)
else:
np_entity = self.hass.states.get(self.data[CONF_PRICES_ENTITY])
try:
self.options[ATTR_UNIT_OF_MEASUREMENT] = np_entity.attributes.get(
ATTR_UNIT_OF_MEASUREMENT
)
except (IndexError, KeyError):
_LOGGER.warning("Could not extract currency from Nordpool entity")
await self.async_set_unique_id(
self.data[ATTR_NAME]
+ "_"
+ self.data[CONF_PRICES_ENTITY]
+ "_"
+ self.data[CONF_TYPE]
)
self._abort_if_unique_id_configured()
_LOGGER.debug(
'Creating entry "%s" with data "%s"',
self.unique_id,
self.data,
)
return self.async_create_entry(
title=self.data[ATTR_NAME], data=self.data, options=self.options
)
selected_entities = []
if NORDPOOL_DOMAIN:
selected_entities.extend(
template.integration_entities(self.hass, NORDPOOL_DOMAIN)
)
if ENTOSOE_DOMAIN:
ent = template.integration_entities(self.hass, ENTOSOE_DOMAIN)
selected_entities.extend([s for s in ent if "average" in s])
selected_entities.append(NAME_FILE_READER)
schema = vol.Schema(
{
vol.Required(ATTR_NAME): str,
vol.Required(CONF_TYPE): selector.SelectSelector(
selector.SelectSelectorConfig(options=CONF_TYPE_LIST),
),
vol.Required(CONF_PRICES_ENTITY): selector.SelectSelector(
selector.SelectSelectorConfig(options=selected_entities),
),
vol.Required(CONF_ACCEPT_COST_ENTITY, default=False): bool,
vol.Required(CONF_ACCEPT_RATE_ENTITY, default=False): bool,
vol.Required(CONF_HIGH_COST_ENTITY, default=False): bool,
vol.Required(CONF_STARTS_AT_ENTITY, default=False): bool,
vol.Required(CONF_HEALTH_ENTITY, default=True): bool,
}
)
placeholders = {
CONF_TYPE: CONF_TYPE_LIST,
CONF_PRICES_ENTITY: selected_entities,
}
return self.async_show_form(
step_id="user",
data_schema=schema,
description_placeholders=placeholders,
errors=errors,
)
# async def async_step_import(
# self, user_input: Optional[Dict[str, Any]] | None = None
# ) -> FlowResult:
# """Import nordpool planner config from configuration.yaml."""
# return await self.async_step_user(import_data)
@@ -0,0 +1,38 @@
"""Common constants for integration."""
from enum import Enum
DOMAIN = "nordpool_planner"
class PlannerStates(Enum):
"""Standard numeric identifiers for planner states."""
Ok = 0
Idle = 1
Warning = 2
Error = 3
Unknown = 4
CONF_TYPE = "type"
CONF_TYPE_MOVING = "moving"
CONF_TYPE_STATIC = "static"
CONF_TYPE_LIST = [CONF_TYPE_MOVING, CONF_TYPE_STATIC]
CONF_PRICES_ENTITY = "prices_entity"
CONF_LOW_COST_ENTITY = "low_cost_entity"
CONF_HEALTH_ENTITY = "health_entity"
CONF_HIGH_COST_ENTITY = "high_cost_entity"
CONF_STARTS_AT_ENTITY = "starts_at_entity"
CONF_DURATION_ENTITY = "duration_entity"
CONF_ACCEPT_COST_ENTITY = "accept_cost_entity"
CONF_ACCEPT_RATE_ENTITY = "accept_rate_entity"
CONF_SEARCH_LENGTH_ENTITY = "search_length_entity"
CONF_END_TIME_ENTITY = "end_time_entity"
CONF_USED_TIME_RESET_ENTITY = "used_time_reset_entity"
CONF_START_TIME_ENTITY = "start_time_entity"
CONF_USED_HOURS_LOW_ENTITY = "used_hours_low_entity"
NAME_FILE_READER = "file_reader"
PATH_FILE_READER = "config/config_entry-nordpool_planner.json"
@@ -0,0 +1,30 @@
"""Diagnostics support for Nordpool Planner."""
from __future__ import annotations
# import json
import logging
from typing import Any
# from homeassistant.components.diagnostics import async_redact_data
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from . import NordpoolPlanner
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
# TO_REDACT = []
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, config_entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
diag_data = {
# "config_entry": config_entry, # Already included in the planner
"planner": hass.data[DOMAIN][config_entry.entry_id],
}
return diag_data
@@ -0,0 +1,82 @@
"""Helper functions package."""
import contextlib
import datetime as dt
import json
import pathlib
from homeassistant.core import State
from homeassistant.util import dt as dt_util
def get_np_from_file(data_file: str, set_today: bool = True) -> State | None:
"""Fake NP entity from file."""
diag_data = {}
file_path = pathlib.Path(data_file)
if file_path.is_file():
with contextlib.suppress(ValueError):
diag_data = json.loads(file_path.read_text(encoding="utf-8"))
if data := diag_data.get("data"):
if planner := data.get("planner"):
if prices_entity := planner.get("_prices_entity"):
if np := prices_entity.get("_np"):
attr = np.get("attributes")
now = dt_util.now()
if "raw_today" in attr:
for item in attr["raw_today"]:
for key, value in item.items():
if key in ["start", "end"] and isinstance(value, str):
item[key] = dt_util.parse_datetime(value)
if set_today:
item[key] = item[key].replace(
year=now.year, month=now.month, day=now.day
)
if "raw_tomorrow" in attr:
for item in attr["raw_tomorrow"]:
for key, value in item.items():
if key in ["start", "end"] and isinstance(value, str):
item[key] = dt_util.parse_datetime(value)
if set_today:
item[key] = item[key].replace(
year=now.year, month=now.month, day=now.day
)
item[key] += dt.timedelta(days=1)
if "prices" in attr and set_today:
first_time = None
original_tz = None
for item in attr["prices"]:
for key, value in item.items():
if key in ["time"] and isinstance(value, str):
fixed_time = dt_util.parse_datetime(value)
if not original_tz:
original_tz = fixed_time.tzinfo
fixed_time = fixed_time.astimezone(now.tzinfo)
if not first_time:
first_time = fixed_time
if fixed_time.day == first_time.day:
fixed_time = fixed_time.replace(
year=now.year, month=now.month, day=now.day
)
else:
fixed_time = fixed_time.replace(
year=now.year, month=now.month, day=now.day
)
fixed_time += dt.timedelta(days=1)
item[key] = fixed_time.astimezone(
original_tz
).strftime("%Y-%m-%d %H:%M:%S%z")
return State(
entity_id=np.get("entity_id"),
state=np.get("state"),
attributes=attr,
# last_changed: datetime.datetime | None = None,
# last_reported: datetime.datetime | None = None,
# last_updated: datetime.datetime | None = None,
# context: Context | None = None,
# validate_entity_id: bool | None = True,
# state_info: StateInfo | None = None,
# last_updated_timestamp: float | None = None,
)
return None
@@ -0,0 +1,18 @@
{
"domain": "nordpool_planner",
"name": "Nordpool Planner",
"after_dependencies": [
"nordpool",
"entsoe"
],
"codeowners": [
"@dala318"
],
"config_flow": true,
"dependencies": [],
"documentation": "https://github.com/dala318/nordpool_planner",
"iot_class": "calculated",
"issue_tracker": "https://github.com/dala318/nordpool_planner/issues",
"requirements": [],
"version": "v2.2.1"
}
@@ -0,0 +1,206 @@
"""Number definitions."""
from __future__ import annotations
import logging
from homeassistant.components.number import (
NumberDeviceClass,
NumberEntityDescription,
RestoreNumber,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_UNIT_OF_MEASUREMENT,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
UnitOfTime,
)
from homeassistant.core import HomeAssistant
from . import NordpoolPlanner, NordpoolPlannerEntity
from .const import (
CONF_ACCEPT_COST_ENTITY,
CONF_ACCEPT_RATE_ENTITY,
CONF_DURATION_ENTITY,
CONF_END_TIME_ENTITY,
CONF_SEARCH_LENGTH_ENTITY,
CONF_START_TIME_ENTITY,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
DURATION_ENTITY_DESCRIPTION = NumberEntityDescription(
key=CONF_DURATION_ENTITY,
device_class=NumberDeviceClass.DURATION,
native_min_value=1,
native_max_value=8,
native_step=1,
native_unit_of_measurement=UnitOfTime.HOURS,
)
ACCEPT_COST_ENTITY_DESCRIPTION = NumberEntityDescription(
key=CONF_ACCEPT_COST_ENTITY,
device_class=NumberDeviceClass.MONETARY,
native_min_value=-20.0,
native_max_value=20.0,
native_step=0.01,
)
ACCEPT_RATE_ENTITY_DESCRIPTION = NumberEntityDescription(
key=CONF_ACCEPT_RATE_ENTITY,
device_class=NumberDeviceClass.DATA_RATE,
native_min_value=0.1,
native_max_value=1.0,
native_step=0.1,
)
SEARCH_LENGTH_ENTITY_DESCRIPTION = NumberEntityDescription(
key=CONF_SEARCH_LENGTH_ENTITY,
device_class=NumberDeviceClass.DURATION,
native_min_value=3,
native_max_value=23, # Let's keep it below 24h to not risk wrapping a day.
native_step=1,
native_unit_of_measurement=UnitOfTime.HOURS,
)
END_TIME_ENTITY_DESCRIPTION = NumberEntityDescription(
key=CONF_END_TIME_ENTITY,
device_class=NumberDeviceClass.DURATION,
native_min_value=0,
native_max_value=23,
native_step=1,
native_unit_of_measurement=UnitOfTime.HOURS,
)
START_TIME_ENTITY_DESCRIPTION = NumberEntityDescription(
key=CONF_START_TIME_ENTITY,
device_class=NumberDeviceClass.DURATION,
native_min_value=0,
native_max_value=23,
native_step=1,
native_unit_of_measurement=UnitOfTime.HOURS,
)
async def async_setup_entry(
hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities
):
"""Create configuration number entities for platform."""
planner: NordpoolPlanner = hass.data[DOMAIN][config_entry.entry_id]
entities = []
if config_entry.data.get(CONF_DURATION_ENTITY):
entities.append(
NordpoolPlannerNumber(
planner,
start_val=3,
entity_description=DURATION_ENTITY_DESCRIPTION,
)
)
if config_entry.data.get(CONF_ACCEPT_COST_ENTITY):
entity_description = ACCEPT_COST_ENTITY_DESCRIPTION
# Override if currency option is set
if unit_of_measurement := config_entry.options.get(ATTR_UNIT_OF_MEASUREMENT):
entity_description = NumberEntityDescription(
key=ACCEPT_COST_ENTITY_DESCRIPTION.key,
device_class=ACCEPT_COST_ENTITY_DESCRIPTION.device_class,
native_min_value=ACCEPT_COST_ENTITY_DESCRIPTION.native_min_value,
native_max_value=ACCEPT_COST_ENTITY_DESCRIPTION.native_max_value,
native_step=ACCEPT_COST_ENTITY_DESCRIPTION.native_step,
native_unit_of_measurement=unit_of_measurement,
)
entities.append(
NordpoolPlannerNumber(
planner,
start_val=0.0,
entity_description=entity_description,
)
)
if config_entry.data.get(CONF_ACCEPT_RATE_ENTITY):
entities.append(
NordpoolPlannerNumber(
planner,
start_val=0.1,
entity_description=ACCEPT_RATE_ENTITY_DESCRIPTION,
)
)
if config_entry.data.get(CONF_SEARCH_LENGTH_ENTITY):
entities.append(
NordpoolPlannerNumber(
planner,
start_val=10,
entity_description=SEARCH_LENGTH_ENTITY_DESCRIPTION,
)
)
if config_entry.data.get(CONF_END_TIME_ENTITY):
entities.append(
NordpoolPlannerNumber(
planner,
start_val=7,
entity_description=END_TIME_ENTITY_DESCRIPTION,
)
)
if config_entry.data.get(CONF_START_TIME_ENTITY):
entities.append(
NordpoolPlannerNumber(
planner,
start_val=18,
entity_description=START_TIME_ENTITY_DESCRIPTION,
)
)
async_add_entities(entities)
return True
class NordpoolPlannerNumber(NordpoolPlannerEntity, RestoreNumber):
"""Number config entity."""
def __init__(
self,
planner,
start_val,
entity_description: NumberEntityDescription,
) -> None:
"""Initialize the entity."""
super().__init__(planner)
self.entity_description = entity_description
self._default_value = start_val
self._attr_name = (
self._planner.name
+ " "
+ entity_description.key.replace("_entity", "").replace("_", " ")
)
self._attr_unique_id = (
("nordpool_planner_" + self._attr_name)
.lower()
.replace(".", "")
.replace(" ", "_")
)
async def async_added_to_hass(self) -> None:
"""Load the last known state when added to hass."""
await super().async_added_to_hass()
if (last_state := await self.async_get_last_state()) and (
last_number_data := await self.async_get_last_number_data()
):
if last_state.state not in (STATE_UNKNOWN, STATE_UNAVAILABLE):
self._attr_native_value = last_number_data.native_value
else:
self._attr_native_value = self._default_value
self._planner.register_input_entity_id(
self.entity_id, self.entity_description.key
)
async def async_set_native_value(self, value: float) -> None:
"""Update the current value."""
self._attr_native_value = value
_LOGGER.debug(
"Got new async value %s for %s",
value,
self.name,
)
self.async_schedule_update_ha_state()
@@ -0,0 +1,221 @@
"""Binary sensor definitions."""
from __future__ import annotations
import logging
from homeassistant.components.sensor import (
RestoreSensor,
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN, EntityCategory
from homeassistant.core import HomeAssistant
from . import NordpoolPlanner, NordpoolPlannerEntity
from .const import (
CONF_HEALTH_ENTITY,
CONF_HIGH_COST_ENTITY,
CONF_LOW_COST_ENTITY,
CONF_STARTS_AT_ENTITY,
CONF_USED_HOURS_LOW_ENTITY,
DOMAIN,
PlannerStates,
)
_LOGGER = logging.getLogger(__name__)
CONF_LOW_COST_STARTS_AT_ENTITY = (
CONF_LOW_COST_ENTITY.replace("_entity", "") + "_" + CONF_STARTS_AT_ENTITY
)
CONF_HIGH_COST_STARTS_AT_ENTITY = (
CONF_HIGH_COST_ENTITY.replace("_entity", "") + "_" + CONF_STARTS_AT_ENTITY
)
LOW_COST_START_AT_ENTITY_DESCRIPTION = SensorEntityDescription(
key=CONF_LOW_COST_STARTS_AT_ENTITY,
device_class=SensorDeviceClass.TIMESTAMP,
)
HIGH_COST_START_AT_ENTITY_DESCRIPTION = SensorEntityDescription(
key=CONF_HIGH_COST_STARTS_AT_ENTITY,
device_class=SensorDeviceClass.TIMESTAMP,
)
USED_HOURS_LOW_ENTITY_DESCRIPTION = SensorEntityDescription(
key=CONF_USED_HOURS_LOW_ENTITY,
device_class=SensorDeviceClass.DURATION,
entity_category=EntityCategory.DIAGNOSTIC,
)
HEALTH_ENTITY_DESCRIPTION = SensorEntityDescription(
key=CONF_HEALTH_ENTITY,
device_class=SensorDeviceClass.ENUM,
entity_category=EntityCategory.DIAGNOSTIC,
options=[e.name for e in PlannerStates],
)
async def async_setup_entry(
hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities
):
"""Create state sensor entities for platform."""
planner: NordpoolPlanner = hass.data[DOMAIN][config_entry.entry_id]
entities = []
if config_entry.data.get(CONF_STARTS_AT_ENTITY):
if config_entry.data.get(CONF_LOW_COST_ENTITY):
entities.append(
NordpoolPlannerStartAtSensor(
planner,
entity_description=LOW_COST_START_AT_ENTITY_DESCRIPTION,
)
)
if config_entry.data.get(CONF_HIGH_COST_ENTITY):
entities.append(
NordpoolPlannerStartAtSensor(
planner,
entity_description=HIGH_COST_START_AT_ENTITY_DESCRIPTION,
)
)
if config_entry.data.get(CONF_USED_HOURS_LOW_ENTITY):
entities.append(
NordpoolPlannerUsedHoursSensor(
planner,
entity_description=USED_HOURS_LOW_ENTITY_DESCRIPTION,
)
)
if config_entry.data.get(CONF_HEALTH_ENTITY):
entities.append(
NordpoolPlannerHealthSensor(
planner,
entity_description=HEALTH_ENTITY_DESCRIPTION,
)
)
async_add_entities(entities)
return True
class NordpoolPlannerSensor(NordpoolPlannerEntity, SensorEntity):
"""Generic state sensor."""
_attr_icon = "mdi:flash"
def __init__(
self,
planner,
entity_description: SensorEntityDescription,
) -> None:
"""Initialize the entity."""
super().__init__(planner)
self.entity_description = entity_description
self._attr_name = (
self._planner.name
+ " "
+ entity_description.key.replace("_entity", "").replace("_", " ")
)
self._attr_unique_id = (
("nordpool_planner_" + self._attr_name)
.lower()
.replace(".", "")
.replace(" ", "_")
)
async def async_added_to_hass(self) -> None:
"""Load the last known state when added to hass."""
await super().async_added_to_hass()
self._planner.register_output_listener_entity(self, self.entity_description.key)
class NordpoolPlannerStartAtSensor(NordpoolPlannerSensor):
"""Start at specific sensor."""
@property
def native_value(self):
"""Output state."""
state = STATE_UNKNOWN
# TODO: This can be made nicer to get value from states in dictionary in planner
if self.entity_description.key == CONF_LOW_COST_STARTS_AT_ENTITY:
if self._planner.low_cost_state.starts_at not in [
STATE_UNKNOWN,
STATE_UNAVAILABLE,
]:
state = self._planner.low_cost_state.starts_at
if self.entity_description.key == CONF_HIGH_COST_STARTS_AT_ENTITY:
if self._planner.high_cost_state.starts_at not in [
STATE_UNKNOWN,
STATE_UNAVAILABLE,
]:
state = self._planner.high_cost_state.starts_at
_LOGGER.debug(
'Returning state "%s" of sensor "%s"',
state,
self.unique_id,
)
return state
# @property
# def extra_state_attributes(self):
# """Extra state attributes."""
# state_attributes = {
# "cost_at": STATE_UNKNOWN,
# "now_cost_rate": STATE_UNKNOWN,
# }
# # TODO: This can be made nicer to get value from states in dictionary in planner
# if self.entity_description.key == CONF_LOW_COST_STARTS_AT_ENTITY:
# state_attributes = {
# "cost_at": self._planner.low_cost_state.cost_at,
# "now_cost_rate": self._planner.low_cost_state.now_cost_rate,
# }
# _LOGGER.debug(
# 'Returning extra state attributes "%s" of sensor "%s"',
# state_attributes,
# self.unique_id,
# )
# return state_attributes
class NordpoolPlannerUsedHoursSensor(NordpoolPlannerSensor, RestoreSensor):
"""Start at specific sensor."""
async def async_added_to_hass(self) -> None:
"""Restore last state."""
await super().async_added_to_hass()
if (
(last_state := await self.async_get_last_state()) is not None
and last_state.state not in (STATE_UNKNOWN, STATE_UNAVAILABLE)
and last_state.state.isdigit()
# and (extra_data := await self.async_get_last_sensor_data()) is not None
):
self._planner.low_hours = int(last_state.state)
else:
self._planner.low_hours = 0
@property
def native_value(self):
"""Output state."""
return self._planner.low_hours
class NordpoolPlannerHealthSensor(NordpoolPlannerSensor, RestoreSensor):
"""Start at specific sensor."""
@property
def native_value(self):
"""Output state."""
return self._planner.planner_status.status.name
@property
def extra_state_attributes(self):
"""Extra state attributes."""
return {
"running_state": self._planner.planner_status.running_text,
"config_state": self._planner.planner_status.config_text,
}
@@ -0,0 +1,29 @@
{
"config": {
"step": {
"user": {
"description": "Setup a Nordpool Planner",
"data": {
"name": "Name of planner",
"type": "Planner type",
"prices_entity": "Nordpool or ENTSO-e entity",
"duration_entity": "Duration: Creates dynamic configuration parameter",
"search_length_entity": "Search length: Creates dynamic configuration parameter",
"end_time_entity": "End time: Creates dynamic configuration parameter",
"accept_cost_entity": "Accept cost: Creates a configuration parameter that turn on if cost below",
"accept_rate_entity": "Accept rate: Creates a configuration parameter that turn on if cost-rate to daily average below",
"high_cost_entity": "High cost: Creates a binary sensor that tell in it's the highest cost (inverse of normal)",
"starts_at_entity": "Starts at: Creates additional sensors telling when next lowest and highest cost starts",
"health_entity": "Adds a status entity to tell overall health of planner"
}
}
},
"error": {
"name_exists": "Name already exists",
"invalid_template": "The template is invalid"
},
"abort": {
"already_configured": "Already configured with the same settings or name"
}
}
}