Coverage for custom_components/supernotify/configuration.py: 93%
217 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-28 14:21 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-28 14:21 +0000
1from __future__ import annotations
3import logging
4import socket
5from pathlib import Path
6from typing import TYPE_CHECKING, Any
8from homeassistant.const import ATTR_STATE, CONF_DEVICE_ID, CONF_ENABLED, CONF_METHOD, CONF_NAME, STATE_HOME, STATE_NOT_HOME
9from homeassistant.helpers import device_registry, entity_registry
10from homeassistant.helpers.network import get_url
11from homeassistant.util import slugify
13from custom_components.supernotify.archive import NotificationArchive
14from custom_components.supernotify.common import ensure_list, safe_get
15from custom_components.supernotify.snoozer import Snoozer
17from . import (
18 ATTR_USER_ID,
19 CONF_ARCHIVE_DAYS,
20 CONF_ARCHIVE_PATH,
21 CONF_CAMERA,
22 CONF_DEVICE_NAME,
23 CONF_DEVICE_TRACKER,
24 CONF_MANUFACTURER,
25 CONF_MOBILE_DEVICES,
26 CONF_MOBILE_DISCOVERY,
27 CONF_MODEL,
28 CONF_NOTIFY_ACTION,
29 CONF_PERSON,
30 CONF_SELECTION,
31 DELIVERY_SELECTION_IMPLICIT,
32 METHOD_DEFAULTS_SCHEMA,
33 SCENARIO_DEFAULT,
34 SELECTION_DEFAULT,
35 SELECTION_FALLBACK,
36 SELECTION_FALLBACK_ON_ERROR,
37)
38from .scenario import Scenario
40if TYPE_CHECKING:
41 from homeassistant.core import HomeAssistant, State
43 from custom_components.supernotify.delivery_method import DeliveryMethod
45_LOGGER = logging.getLogger(__name__)
class SupernotificationConfiguration:
    """Shared configuration and runtime state for supernotify.

    Holds the raw and validated delivery configuration, the registered delivery
    methods, scenarios, recipients ("people"), camera definitions, the
    notification archive and the snoozer, plus cached handles to the Home
    Assistant entity and device registries.

    Construction only captures values; `initialize` and
    `register_delivery_methods` perform the validation work.
    """

    def __init__(
        self,
        hass: HomeAssistant | None = None,
        deliveries: dict | None = None,
        links: list | None = None,
        recipients: list | None = None,
        mobile_actions: dict | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive_config: dict[str, str] | None = None,
        scenarios: dict[str, dict] | None = None,
        method_defaults: dict | None = None,
        cameras: list[dict] | None = None,
    ) -> None:
        """Capture the supplied configuration and resolve the Home Assistant URLs.

        Args:
            hass: Home Assistant instance; when omitted the URLs are left empty
                and a warning is logged.
            deliveries: Raw delivery configuration keyed by delivery name;
                anything that is not a dict is treated as empty.
            links: Links, normalised with `ensure_list`.
            recipients: Recipient definitions, normalised with `ensure_list`.
            mobile_actions: Mobile action definitions.
            template_path: Template directory, converted to a `Path` when set.
            media_path: Media directory, converted to a `Path` when set.
            archive_config: Archive settings, read via `CONF_ARCHIVE_PATH` and
                `CONF_ARCHIVE_DAYS`.
            scenarios: Raw scenario definitions keyed by scenario name.
            method_defaults: Per-method default settings keyed by method name.
            cameras: Camera definitions, indexed here by their `CONF_CAMERA` value.

        """
        self.hass: HomeAssistant | None = None
        self.hass_internal_url: str
        self.hass_external_url: str
        if hass:
            self.hass = hass
            self.hass_name = hass.config.location_name
            try:
                self.hass_internal_url = get_url(hass, prefer_external=False)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY could not get internal hass url: %s", e)
                # fall back to a URL built from the local host name
                self.hass_internal_url = f"http://{socket.gethostname()}"
            try:
                self.hass_external_url = get_url(hass, prefer_external=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY could not get external hass url: %s", e)
                # no external URL available, so reuse the internal one
                self.hass_external_url = self.hass_internal_url
        else:
            self.hass_internal_url = ""
            self.hass_external_url = ""
            self.hass_name = "!UNDEFINED!"
            _LOGGER.warning("SUPERNOTIFY Configured without HomeAssistant instance")

        _LOGGER.debug(
            "SUPERNOTIFY Configured for HomeAssistant instance %s at %s , %s",
            self.hass_name,
            self.hass_internal_url,
            self.hass_external_url,
        )

        if not self.hass_internal_url or not self.hass_internal_url.startswith("http"):
            _LOGGER.warning("SUPERNOTIFY invalid internal hass url %s", self.hass_internal_url)

        self.links = ensure_list(links)
        # raw configured deliveries
        self._deliveries: dict[str, Any] = deliveries if isinstance(deliveries, dict) else {}
        # validated deliveries, filled in by register_delivery_methods
        self.deliveries: dict[str, Any] = {}
        self._recipients: list = ensure_list(recipients)
        self.mobile_actions: dict = mobile_actions or {}
        self.template_path: Path | None = Path(template_path) if template_path else None
        self.media_path: Path | None = Path(media_path) if media_path else None
        archive_config = archive_config or {}
        self.archive: NotificationArchive = NotificationArchive(
            archive_config.get(CONF_ARCHIVE_PATH), archive_config.get(CONF_ARCHIVE_DAYS)
        )
        self.cameras: dict[str, Any] = {c[CONF_CAMERA]: c for c in cameras} if cameras else {}
        # delivery method instances keyed by method name
        self.methods: dict[str, DeliveryMethod] = {}
        self.method_defaults: dict = method_defaults or {}
        # validated scenarios, filled in by initialize
        self.scenarios: dict[str, Scenario] = {}
        # recipients keyed by person entity id, filled in by initialize
        self.people: dict[str, dict] = {}
        # raw scenario definitions as supplied by the caller
        self.applied_scenarios: dict = scenarios or {}
        self.delivery_by_scenario: dict[str, list] = {SCENARIO_DEFAULT: []}
        self.fallback_on_error: dict = {}
        self.fallback_by_default: dict = {}
        # registries are looked up on first use, see entity_registry / device_registry
        self._entity_registry: entity_registry.EntityRegistry | None = None
        self._device_registry: device_registry.DeviceRegistry | None = None
        self.snoozer = Snoozer()

    async def initialize(self) -> None:
        """Validate the captured configuration and build the derived lookups.

        Sets up people, keeps only the scenarios that validate, checks the
        template and media paths (attempting to create the media path),
        initializes the archive, then classifies deliveries and maps them to
        scenarios.
        """
        self.people = self.setup_people(self._recipients)

        if self.applied_scenarios and self.hass:
            for scenario_name, scenario_definition in self.applied_scenarios.items():
                scenario = Scenario(scenario_name, scenario_definition, self.hass)
                if await scenario.validate():
                    self.scenarios[scenario_name] = scenario

        if self.template_path and not self.template_path.exists():
            _LOGGER.warning("SUPERNOTIFY template path not found at %s", self.template_path)
            self.template_path = None

        if self.media_path and not self.media_path.exists():
            _LOGGER.info("SUPERNOTIFY media path not found at %s", self.media_path)
            try:
                self.media_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY media path %s cannot be created: %s", self.media_path, e)
                self.media_path = None
        # NOTE(review): nesting of the next two checks was reconstructed from a flattened listing - confirm
        if self.media_path is not None:
            _LOGGER.info("SUPERNOTIFY abs media path: %s", self.media_path.absolute())
        if self.archive:
            self.archive.initialize()
        default_deliveries: dict = self.initialize_deliveries()
        self.initialize_scenarios(default_deliveries)

    def initialize_deliveries(self) -> dict:
        """Classify the raw deliveries by selection and apply method defaults.

        Enabled deliveries are recorded in `fallback_on_error` and
        `fallback_by_default` according to their `CONF_SELECTION`, which
        defaults to `[SELECTION_DEFAULT]` when absent. Every delivery gets its
        key as `CONF_NAME` if it has no name, and has method defaults applied.

        Returns:
            The enabled deliveries whose selection includes `SELECTION_DEFAULT`,
            keyed by delivery name.

        """
        default_deliveries = {}
        for d, dc in self._deliveries.items():
            if dc.get(CONF_ENABLED, True):
                # look the selection up once rather than once per membership test
                selection = dc.get(CONF_SELECTION, [SELECTION_DEFAULT])
                if SELECTION_FALLBACK_ON_ERROR in selection:
                    self.fallback_on_error[d] = dc
                if SELECTION_FALLBACK in selection:
                    self.fallback_by_default[d] = dc
                if SELECTION_DEFAULT in selection:
                    default_deliveries[d] = dc

            # NOTE(review): assumed to apply to disabled deliveries too; nesting was
            # reconstructed from a flattened listing - confirm
            if not dc.get(CONF_NAME):
                dc[CONF_NAME] = d  # for minimal tests
            for conf_key in METHOD_DEFAULTS_SCHEMA.schema:
                self.set_method_default(dc, conf_key.schema)
        return default_deliveries

    def initialize_scenarios(self, default_deliveries: dict) -> None:
        """Populate `delivery_by_scenario` for each validated scenario.

        A scenario with implicit delivery selection starts from the default
        deliveries; any other starts empty. The scenario's own deliveries are
        then added, and only those not explicitly disabled in the scenario
        definition are kept.

        Args:
            default_deliveries: Deliveries selected by default, keyed by name.

        """
        for scenario_name, scenario in self.scenarios.items():
            self.delivery_by_scenario.setdefault(scenario_name, [])
            if scenario.delivery_selection == DELIVERY_SELECTION_IMPLICIT:
                scenario_deliveries: list[str] = list(default_deliveries.keys())
            else:
                scenario_deliveries = []
            scenario_definition_delivery = scenario.delivery
            scenario_deliveries.extend(s for s in scenario_definition_delivery if s not in scenario_deliveries)

            for scenario_delivery in scenario_deliveries:
                # a delivery the scenario does not mention has no entry here; enabled defaults to True
                if safe_get(scenario_definition_delivery.get(scenario_delivery), CONF_ENABLED, True):
                    self.delivery_by_scenario[scenario_name].append(scenario_delivery)

        self.delivery_by_scenario[SCENARIO_DEFAULT] = list(default_deliveries.keys())

    async def register_delivery_methods(
        self,
        delivery_methods: list[DeliveryMethod] | None = None,
        delivery_method_classes: list[type[DeliveryMethod]] | None = None,
        set_as_default: bool = False,
    ) -> None:
        """Register and initialize delivery methods, collecting their valid deliveries.

        Available directly for test fixtures supplying class or instance.

        Args:
            delivery_methods: Ready-made delivery method instances.
            delivery_method_classes: Delivery method classes to instantiate;
                skipped when there is no Home Assistant instance.
            set_as_default: When true, add every delivery with a known method
                to the default scenario.

        """
        if delivery_methods:
            for delivery_method in delivery_methods:
                self.methods[delivery_method.method] = delivery_method
                await self.methods[delivery_method.method].initialize()
                self.deliveries.update(self.methods[delivery_method.method].valid_deliveries)
        if delivery_method_classes and self.hass:
            for delivery_method_class in delivery_method_classes:
                self.methods[delivery_method_class.method] = delivery_method_class(self.hass, self, self._deliveries)
                await self.methods[delivery_method_class.method].initialize()
                self.deliveries.update(self.methods[delivery_method_class.method].valid_deliveries)

        for d, dc in self.deliveries.items():
            if dc.get(CONF_METHOD) not in self.methods:
                # only logged: the delivery stays in self.deliveries
                _LOGGER.info("SUPERNOTIFY Ignoring delivery %s without known method %s", d, dc.get(CONF_METHOD))
            elif set_as_default and d not in self.delivery_by_scenario[SCENARIO_DEFAULT]:
                self.delivery_by_scenario[SCENARIO_DEFAULT].append(d)

        _LOGGER.info("SUPERNOTIFY configured deliveries %s", "; ".join(self.deliveries.keys()))

    def set_method_default(self, delivery_config: dict[str, Any], attr: str) -> None:
        """Copy a method-level default into a delivery that does not set `attr` itself.

        Nothing is copied when the delivery already has `attr`, or when the
        default configured for the delivery's method is missing or falsy.

        Args:
            delivery_config: Delivery configuration, updated in place.
            attr: Name of the setting to default.

        """
        if attr not in delivery_config:
            method_default: dict = self.method_defaults.get(delivery_config.get(CONF_METHOD), {})
            if method_default.get(attr):
                delivery_config[attr] = method_default[attr]
                # .get so that logging cannot raise for a delivery without a name
                _LOGGER.debug(
                    "SUPERNOTIFY Defaulting delivery %s to %s %s",
                    delivery_config.get(CONF_NAME),
                    attr,
                    delivery_config[attr],
                )

    def delivery_method(self, delivery: str) -> DeliveryMethod:
        """Return the delivery method registered for a validated delivery.

        Args:
            delivery: Delivery name.

        Returns:
            The `DeliveryMethod` registered under the delivery's `CONF_METHOD`.

        Raises:
            ValueError: If the delivery is unknown or its method is not registered.

        """
        method_name = self.deliveries.get(delivery, {}).get(CONF_METHOD)
        method: DeliveryMethod | None = self.methods.get(method_name)
        if not method:
            raise ValueError(f"SUPERNOTIFY No method for delivery {delivery}")
        return method

    def setup_people(self, recipients: list[dict] | tuple[dict]) -> dict[str, dict]:
        """Index recipients by person entity id, enriching each one in place.

        With mobile discovery enabled, discovered mobile devices are appended
        to the recipient's mobile device list. When Home Assistant has a state
        for the person, its user id attribute is copied onto the recipient.

        Args:
            recipients: Recipient definitions; each must contain `CONF_PERSON`.

        Returns:
            The same recipient dicts keyed by their `CONF_PERSON` value.

        """
        people: dict[str, dict] = {}
        for r in recipients:
            if r.get(CONF_MOBILE_DISCOVERY):
                # create the list on demand: a recipient may enable discovery
                # without listing any mobile devices
                configured_devices = r.get(CONF_MOBILE_DEVICES)
                if configured_devices is None:
                    configured_devices = r[CONF_MOBILE_DEVICES] = []
                configured_devices.extend(self.mobile_devices_for_person(r[CONF_PERSON]))
                if r.get(CONF_MOBILE_DEVICES):
                    _LOGGER.info("SUPERNOTIFY Auto configured %s for mobile devices %s", r[CONF_PERSON], r[CONF_MOBILE_DEVICES])
                else:
                    _LOGGER.warning("SUPERNOTIFY Unable to find mobile devices for %s", r[CONF_PERSON])
            if self.hass:
                state: State | None = self.hass.states.get(r[CONF_PERSON])
                if state is not None:
                    r[ATTR_USER_ID] = state.attributes.get(ATTR_USER_ID)
            people[r[CONF_PERSON]] = r
        return people

    def people_state(self) -> list[dict]:
        """Refresh and return the state of each known person.

        Each person's config dict is updated in place with the current state of
        the person entity under `ATTR_STATE`, or `None` when Home Assistant has
        no state for it.

        Returns:
            The person config dicts; empty when there is no Home Assistant instance.

        """
        results = []
        if self.hass:
            for person, person_config in self.people.items():
                # TODO: possibly rate limit this
                try:
                    tracker = self.hass.states.get(person)
                    if tracker is None:
                        person_config[ATTR_STATE] = None
                    else:
                        person_config[ATTR_STATE] = tracker.state
                except Exception as e:
                    _LOGGER.warning("Unable to determine occupied status for %s: %s", person, e)
                # NOTE(review): assumed to run even after a failed lookup; nesting was
                # reconstructed from a flattened listing - confirm
                results.append(person_config)
        return results

    def determine_occupancy(self) -> dict[str, list[dict]]:
        """Split people into those at home and those away.

        Returns:
            A dict with `STATE_HOME` and `STATE_NOT_HOME` keys, each holding a
            list of person config dicts. A person with no known state is
            counted as home.

        """
        results: dict[str, list[dict]] = {STATE_HOME: [], STATE_NOT_HOME: []}
        for person_config in self.people_state():
            if person_config.get(ATTR_STATE) in (None, STATE_HOME):
                # default to at home if unknown tracker
                results[STATE_HOME].append(person_config)
            else:
                results[STATE_NOT_HOME].append(person_config)
        return results

    def entity_registry(self) -> entity_registry.EntityRegistry | None:
        """Return the entity registry, looking it up once and caching it.

        Hass entity registry is weird, every component ends up creating its own, with a store,
        subscribing to all entities, so do it once here.

        Returns:
            The registry, or `None` when there is no Home Assistant instance or
            the lookup failed.

        """
        if self._entity_registry is not None:
            return self._entity_registry
        if self.hass:
            try:
                self._entity_registry = entity_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get entity registry: %s", e)
        return self._entity_registry

    def device_registry(self) -> device_registry.DeviceRegistry | None:
        """Return the device registry, looking it up once and caching it.

        Hass device registry is weird, every component ends up creating its own, with a store,
        subscribing to all devices, so do it once here.

        Returns:
            The registry, or `None` when there is no Home Assistant instance or
            the lookup failed.

        """
        if self._device_registry is not None:
            return self._device_registry
        if self.hass:
            try:
                self._device_registry = device_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get device registry: %s", e)
        return self._device_registry

    def mobile_devices_for_person(self, person_entity_id: str, validate_targets: bool = False) -> list:
        """Auto detect mobile_app targets for a person.

        Targets not currently validated as async registration may not be complete at this stage.

        Walks the `device_trackers` attribute of the person's state, keeps the
        trackers that the entity registry attributes to the `mobile_app`
        platform with a device id, and derives the notify action name from the
        slugified device name.

        Args:
            person_entity_id: Entity id of the person to inspect.
            validate_targets: When true, skip devices whose derived notify
                action is not a registered `notify` service. Defaults to False.

        Returns:
            One dict per detected mobile device, holding manufacturer, model,
            notify action, device tracker, device id and device name. Empty when
            the person or the registries cannot be resolved.

        """
        mobile_devices: list[dict[str, Any]] = []
        person_state = self.hass.states.get(person_entity_id) if self.hass else None
        if not person_state:
            _LOGGER.warning("SUPERNOTIFY Unable to resolve %s", person_entity_id)
            return mobile_devices

        ent_reg = self.entity_registry()
        dev_reg = self.device_registry()
        if not ent_reg or not dev_reg:
            _LOGGER.warning("SUPERNOTIFY Unable to access entity or device registries for %s", person_entity_id)
            return mobile_devices

        for d_t in person_state.attributes.get("device_trackers", ()):
            entity = ent_reg.async_get(d_t)
            if not (entity and entity.platform == "mobile_app" and entity.device_id):
                _LOGGER.debug("SUPERNOTIFY Ignoring device tracker %s", d_t)
                continue
            device = dev_reg.async_get(entity.device_id)
            if not device:
                _LOGGER.warning("SUPERNOTIFY Unable to find device %s", entity.device_id)
                continue
            notify_action = f"mobile_app_{slugify(device.name)}"
            if (
                validate_targets
                and self.hass
                and self.hass.services
                and not self.hass.services.has_service("notify", notify_action)
            ):
                _LOGGER.warning("SUPERNOTIFY Unable to find notify action <%s>", notify_action)
                continue
            mobile_devices.append({
                CONF_MANUFACTURER: device.manufacturer,
                CONF_MODEL: device.model,
                CONF_NOTIFY_ACTION: notify_action,
                CONF_DEVICE_TRACKER: d_t,
                CONF_DEVICE_ID: device.id,
                CONF_DEVICE_NAME: device.name,
                # CONF_DEVICE_LABELS: device.labels,
            })

        return mobile_devices