Coverage for custom_components/supernotify/configuration.py: 93%
260 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-26 08:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-26 08:54 +0000
1from __future__ import annotations
3import logging
4import socket
5from pathlib import Path
6from typing import TYPE_CHECKING, Any
8from homeassistant.const import (
9 ATTR_STATE,
10 CONF_DEFAULT,
11 CONF_DEVICE_ID,
12 CONF_ENABLED,
13 CONF_METHOD,
14 CONF_NAME,
15 STATE_HOME,
16 STATE_NOT_HOME,
17)
18from homeassistant.helpers import device_registry, entity_registry
19from homeassistant.helpers import issue_registry as ir
20from homeassistant.helpers.config_validation import boolean
21from homeassistant.helpers.network import get_url
22from homeassistant.util import slugify
24from custom_components.supernotify.archive import ArchiveTopic, NotificationArchive
25from custom_components.supernotify.common import ensure_list, safe_get
26from custom_components.supernotify.snoozer import Snoozer
28from . import (
29 ATTR_USER_ID,
30 CONF_ARCHIVE_DAYS,
31 CONF_ARCHIVE_MQTT_QOS,
32 CONF_ARCHIVE_MQTT_RETAIN,
33 CONF_ARCHIVE_MQTT_TOPIC,
34 CONF_ARCHIVE_PATH,
35 CONF_CAMERA,
36 CONF_DATA,
37 CONF_DEVICE_DISCOVERY,
38 CONF_DEVICE_DOMAIN,
39 CONF_DEVICE_NAME,
40 CONF_DEVICE_TRACKER,
41 CONF_MANUFACTURER,
42 CONF_MOBILE_DEVICES,
43 CONF_MOBILE_DISCOVERY,
44 CONF_MODEL,
45 CONF_NOTIFY_ACTION,
46 CONF_PERSON,
47 CONF_SELECTION,
48 CONF_TARGETS_REQUIRED,
49 DELIVERY_SELECTION_IMPLICIT,
50 DOMAIN,
51 SCENARIO_DEFAULT,
52 SCENARIO_TEMPLATE_ATTRS,
53 SELECTION_DEFAULT,
54 SELECTION_FALLBACK,
55 SELECTION_FALLBACK_ON_ERROR,
56)
57from .scenario import Scenario
59if TYPE_CHECKING:
60 from homeassistant.core import HomeAssistant, State
61 from homeassistant.helpers.device_registry import DeviceEntry, DeviceRegistry
63 from custom_components.supernotify.delivery_method import DeliveryMethod
65_LOGGER = logging.getLogger(__name__)
68class Context:
69 def __init__(
70 self,
71 hass: HomeAssistant | None = None,
72 deliveries: dict[str, Any] | None = None,
73 links: list[str] | None = None,
74 recipients: list[dict[str, Any]] | None = None,
75 mobile_actions: dict[str, Any] | None = None,
76 template_path: str | None = None,
77 media_path: str | None = None,
78 archive_config: dict[str, str] | None = None,
79 scenarios: dict[str, dict[str, Any]] | None = None,
80 method_configs: dict[str, Any] | None = None,
81 cameras: list[dict[str, Any]] | None = None,
82 method_types: list[type[DeliveryMethod]] | None = None,
83 ) -> None:
84 self.hass: HomeAssistant | None = None
85 self.hass_internal_url: str
86 self.hass_external_url: str
87 if hass:
88 self.hass = hass
89 self.hass_name = hass.config.location_name
90 try:
91 self.hass_internal_url = get_url(hass, prefer_external=False)
92 except Exception as e:
93 self.hass_internal_url = f"http://{socket.gethostname()}"
94 _LOGGER.warning("SUPERNOTIFY could not get internal hass url, defaulting to %s: %s", self.hass_internal_url, e)
95 try:
96 self.hass_external_url = get_url(hass, prefer_external=True)
97 except Exception as e:
98 _LOGGER.warning("SUPERNOTIFY could not get external hass url, defaulting to internal url: %s", e)
99 self.hass_external_url = self.hass_internal_url
100 else:
101 self.hass_internal_url = ""
102 self.hass_external_url = ""
103 self.hass_name = "!UNDEFINED!"
104 _LOGGER.warning("SUPERNOTIFY Configured without HomeAssistant instance")
106 _LOGGER.debug(
107 "SUPERNOTIFY Configured for HomeAssistant instance %s at %s , %s",
108 self.hass_name,
109 self.hass_internal_url,
110 self.hass_external_url,
111 )
113 if not self.hass_internal_url or not self.hass_internal_url.startswith("http"):
114 _LOGGER.warning("SUPERNOTIFY invalid internal hass url %s", self.hass_internal_url)
116 self.links: list[dict[str, Any]] = ensure_list(links)
117 # raw configured deliveries
118 self._deliveries: dict[str, Any] = deliveries if isinstance(deliveries, dict) else {}
119 # validated deliveries
120 self.deliveries: dict[str, Any] = {}
121 self._recipients: list[dict[str, Any]] = ensure_list(recipients)
122 self.mobile_actions: dict[str, Any] = mobile_actions or {}
123 self.template_path: Path | None = Path(template_path) if template_path else None
124 self.media_path: Path | None = Path(media_path) if media_path else None
125 archive_config = archive_config or {}
126 self.archive: NotificationArchive = NotificationArchive(
127 bool(archive_config.get(CONF_ENABLED, False)),
128 archive_config.get(CONF_ARCHIVE_PATH),
129 archive_config.get(CONF_ARCHIVE_DAYS),
130 )
131 archive_topic = archive_config.get(CONF_ARCHIVE_MQTT_TOPIC)
132 self.archive_topic: ArchiveTopic | None = None
133 if archive_topic is not None and self.hass:
134 self.archive_topic = ArchiveTopic(
135 self.hass,
136 archive_topic,
137 int(archive_config.get(CONF_ARCHIVE_MQTT_QOS, 0)),
138 boolean(archive_config.get(CONF_ARCHIVE_MQTT_RETAIN, True)),
139 )
140 else:
141 self.archive_topic = None
142 self.cameras: dict[str, Any] = {c[CONF_CAMERA]: c for c in cameras} if cameras else {}
143 self.methods: dict[str, DeliveryMethod] = {}
144 self._method_configs: dict[str, Any] = method_configs or {}
145 self.scenarios: dict[str, Scenario] = {}
146 self.people: dict[str, dict[str, Any]] = {}
147 self._config_scenarios: dict[str, Any] = scenarios or {}
148 self.content_scenario_templates: dict[str, Any] = {}
149 self.delivery_by_scenario: dict[str, list[str]] = {SCENARIO_DEFAULT: []}
150 self.fallback_on_error: dict[str, dict[str, Any]] = {}
151 self.fallback_by_default: dict[str, dict[str, Any]] = {}
152 self._entity_registry: entity_registry.EntityRegistry | None = None
153 self._device_registry: device_registry.DeviceRegistry | None = None
154 self._method_types: list[type[DeliveryMethod]] = method_types or []
155 self.snoozer = Snoozer()
156 # test harness support
157 self._create_default_scenario: bool = False
158 self._method_instances: list[DeliveryMethod] | None = None
160 async def initialize(self) -> None:
161 await self._register_delivery_methods(
162 delivery_methods=self._method_instances, delivery_method_classes=self._method_types
163 )
165 self.people = self.setup_people(self._recipients)
167 if self._config_scenarios and self.hass:
168 for scenario_name, scenario_definition in self._config_scenarios.items():
169 scenario = Scenario(scenario_name, scenario_definition, self.hass)
170 if await scenario.validate(
171 valid_deliveries=list(self.deliveries), valid_action_groups=list(self.mobile_actions)
172 ):
173 self.scenarios[scenario_name] = scenario
175 if self.template_path and not self.template_path.exists():
176 _LOGGER.warning("SUPERNOTIFY template path not found at %s", self.template_path)
177 self.template_path = None
179 if self.media_path and not self.media_path.exists():
180 _LOGGER.info("SUPERNOTIFY media path not found at %s", self.media_path)
181 try:
182 self.media_path.mkdir(parents=True, exist_ok=True)
183 except Exception as e:
184 _LOGGER.warning("SUPERNOTIFY media path %s cannot be created: %s", self.media_path, e)
185 self.raise_issue("media_path", "media_path", {"path": str(self.media_path), "error": str(e)})
186 self.media_path = None
187 if self.media_path is not None:
188 _LOGGER.info("SUPERNOTIFY abs media path: %s", self.media_path.absolute())
189 if self.archive:
190 self.archive.initialize()
191 default_deliveries: dict[str, Any] = self.initialize_deliveries()
192 self.initialize_scenarios(default_deliveries, default_scenario=self._create_default_scenario)
194 def configure_for_tests(
195 self, method_instances: list[DeliveryMethod] | None = None, create_default_scenario: bool = False
196 ) -> None:
197 self._create_default_scenario = create_default_scenario
198 self._method_instances = method_instances
200 def raise_issue(
201 self,
202 issue_id: str,
203 issue_key: str,
204 issue_map: dict[str, str],
205 severity: ir.IssueSeverity = ir.IssueSeverity.WARNING,
206 learn_more_url: str = "https://jeyrb.github.io/hass_supernotify",
207 ) -> None:
208 if not self.hass:
209 return
210 ir.async_create_issue(
211 self.hass,
212 DOMAIN,
213 issue_id,
214 is_fixable=False,
215 translation_key=issue_key,
216 translation_placeholders=issue_map,
217 severity=severity,
218 learn_more_url=learn_more_url,
219 )
221 def initialize_deliveries(self) -> dict[str, Any]:
222 default_deliveries = {}
223 if self._deliveries:
224 for d, dc in self._deliveries.items():
225 method = self.methods.get(dc[CONF_METHOD])
226 if method:
227 for k, v in method.default.items():
228 dc.setdefault(k, v)
229 else:
230 _LOGGER.warning(f"SUPERNOTIFY Unknown method {dc[CONF_METHOD]} for delivery {d}")
231 self.raise_issue(
232 f"delivery_{d}_unknown_method{dc[CONF_METHOD]}",
233 issue_key="delivery_unknown_method",
234 issue_map={"delivery": d, "method": dc[CONF_METHOD]},
235 )
236 dc[CONF_ENABLED] = False
237 if dc.get(CONF_ENABLED, True):
238 if SELECTION_FALLBACK_ON_ERROR in dc.get(CONF_SELECTION, [SELECTION_DEFAULT]):
239 self.fallback_on_error[d] = dc
240 if SELECTION_FALLBACK in dc.get(CONF_SELECTION, [SELECTION_DEFAULT]):
241 self.fallback_by_default[d] = dc
242 if SELECTION_DEFAULT in dc.get(CONF_SELECTION, [SELECTION_DEFAULT]):
243 default_deliveries[d] = dc
245 if not dc.get(CONF_NAME):
246 dc[CONF_NAME] = d # for minimal tests
248 return default_deliveries
250 def initialize_scenarios(self, default_deliveries: dict[str, Any], default_scenario: bool = False) -> None:
251 for scenario_name, scenario in self.scenarios.items():
252 self.delivery_by_scenario.setdefault(scenario_name, [])
253 if scenario.delivery_selection == DELIVERY_SELECTION_IMPLICIT:
254 scenario_deliveries: list[str] = list(default_deliveries.keys())
255 else:
256 scenario_deliveries = []
257 scenario_definition_delivery = scenario.delivery
258 scenario_deliveries.extend(s for s in scenario_definition_delivery if s not in scenario_deliveries)
260 for scenario_delivery in scenario_deliveries:
261 if safe_get(scenario_definition_delivery.get(scenario_delivery), CONF_ENABLED, True):
262 self.delivery_by_scenario[scenario_name].append(scenario_delivery)
264 scenario_delivery_config = safe_get(scenario_definition_delivery.get(scenario_delivery), CONF_DATA, {})
266 # extract message and title templates per scenario per delivery
267 for template_field in SCENARIO_TEMPLATE_ATTRS:
268 template_format = scenario_delivery_config.get(template_field)
269 if template_format is not None:
270 self.content_scenario_templates.setdefault(template_field, {})
271 self.content_scenario_templates[template_field].setdefault(scenario_delivery, [])
272 self.content_scenario_templates[template_field][scenario_delivery].append(scenario_name)
274 self.delivery_by_scenario[SCENARIO_DEFAULT] = list(default_deliveries.keys())
275 if default_scenario:
276 for d, dc in self.deliveries.items():
277 if dc.get(CONF_ENABLED, True) and d not in self.delivery_by_scenario[SCENARIO_DEFAULT]:
278 self.delivery_by_scenario[SCENARIO_DEFAULT].append(d)
280 async def _register_delivery_methods(
281 self,
282 delivery_methods: list[DeliveryMethod] | None = None,
283 delivery_method_classes: list[type[DeliveryMethod]] | None = None,
284 ) -> None:
285 """Use configure_for_tests() to set delivery_methods to mocks or manually created fixtures"""
286 if delivery_methods:
287 for delivery_method in delivery_methods:
288 self.methods[delivery_method.method] = delivery_method
289 await self.methods[delivery_method.method].initialize()
290 self.deliveries.update(self.methods[delivery_method.method].valid_deliveries)
291 if delivery_method_classes and self.hass:
292 for delivery_method_class in delivery_method_classes:
293 method_config = self._method_configs.get(delivery_method_class.method, {})
294 self.methods[delivery_method_class.method] = delivery_method_class(
295 self.hass,
296 self,
297 self._deliveries,
298 default=method_config.get(CONF_DEFAULT, {}),
299 device_domain=method_config.get(CONF_DEVICE_DOMAIN, []),
300 device_discovery=method_config.get(CONF_DEVICE_DISCOVERY, False),
301 targets_required=method_config.get(CONF_TARGETS_REQUIRED, False),
302 )
303 await self.methods[delivery_method_class.method].initialize()
304 self.deliveries.update(self.methods[delivery_method_class.method].valid_deliveries)
306 _LOGGER.info("SUPERNOTIFY configured deliveries %s", "; ".join(self.deliveries.keys()))
308 def delivery_method(self, delivery: str) -> DeliveryMethod:
309 method_name = self.deliveries.get(delivery, {}).get(CONF_METHOD)
310 method: DeliveryMethod | None = self.methods.get(method_name)
311 if not method:
312 raise ValueError(f"SUPERNOTIFY No method {method_name} for delivery {delivery}")
313 return method
315 def discover_devices(self, discover_domain: str) -> list[DeviceEntry]:
316 devices: list[DeviceEntry] = []
317 dev_reg: DeviceRegistry | None = self.device_registry()
318 if dev_reg is None:
319 _LOGGER.warning(f"SUPERNOTIFY Unable to discover devices for {discover_domain} - no device registry found")
320 return []
322 all_devs = enabled_devs = found_devs = 0
323 for dev in dev_reg.devices.values():
324 all_devs += 1
325 if not dev.disabled:
326 enabled_devs += 1
327 for identifier in dev.identifiers:
328 if identifier and len(identifier) > 1 and identifier[0] == discover_domain:
329 _LOGGER.debug("SUPERNOTIFY discovered device %s for identifier %s", dev.name, identifier)
330 devices.append(dev)
331 found_devs += 1
332 elif identifier and len(identifier) != 2:
333 # HomeKit has triples for identifiers, other domains may behave similarly
334 _LOGGER.debug("SUPERNOTIFY Unexpected device %s identifier: %s", dev.name, identifier) # type: ignore
335 _LOGGER.info(
336 f"SUPERNOTIFY {discover_domain} device discovery, all={all_devs}, enabled={enabled_devs}, found={found_devs}"
337 )
338 return devices
340 def setup_people(self, recipients: list[dict[str, Any]] | tuple[dict[str, Any]]) -> dict[str, dict[str, Any]]:
341 people: dict[str, dict[str, Any]] = {}
342 for r in recipients:
343 if r.get(CONF_MOBILE_DISCOVERY):
344 r[CONF_MOBILE_DEVICES].extend(self.mobile_devices_for_person(r[CONF_PERSON]))
345 if r.get(CONF_MOBILE_DEVICES):
346 _LOGGER.info("SUPERNOTIFY Auto configured %s for mobile devices %s", r[CONF_PERSON], r[CONF_MOBILE_DEVICES])
347 else:
348 _LOGGER.warning("SUPERNOTIFY Unable to find mobile devices for %s", r[CONF_PERSON])
349 if self.hass:
350 state: State | None = self.hass.states.get(r[CONF_PERSON])
351 if state is not None:
352 r[ATTR_USER_ID] = state.attributes.get(ATTR_USER_ID)
353 people[r[CONF_PERSON]] = r
354 return people
356 def people_state(self) -> list[dict[str, Any]]:
357 results = []
358 if self.hass:
359 for person, person_config in self.people.items():
360 # TODO: possibly rate limit this
361 try:
362 tracker = self.hass.states.get(person)
363 if tracker is None:
364 person_config[ATTR_STATE] = None
365 else:
366 person_config[ATTR_STATE] = tracker.state
367 except Exception as e:
368 _LOGGER.warning("SUPERNOTIFY Unable to determine occupied status for %s: %s", person, e)
369 results.append(person_config)
370 return results
372 def determine_occupancy(self) -> dict[str, list[dict[str, Any]]]:
373 results: dict[str, list[dict[str, Any]]] = {STATE_HOME: [], STATE_NOT_HOME: []}
374 for person_config in self.people_state():
375 if person_config.get(ATTR_STATE) in (None, STATE_HOME):
376 # default to at home if unknown tracker
377 results[STATE_HOME].append(person_config)
378 else:
379 results[STATE_NOT_HOME].append(person_config)
380 return results
382 def entity_registry(self) -> entity_registry.EntityRegistry | None:
383 """Hass entity registry is weird, every component ends up creating its own, with a store, subscribing
384 to all entities, so do it once here
385 """ # noqa: D205
386 if self._entity_registry is not None:
387 return self._entity_registry
388 if self.hass:
389 try:
390 self._entity_registry = entity_registry.async_get(self.hass)
391 except Exception as e:
392 _LOGGER.warning("SUPERNOTIFY Unable to get entity registry: %s", e)
393 return self._entity_registry
395 def device_registry(self) -> device_registry.DeviceRegistry | None:
396 """Hass device registry is weird, every component ends up creating its own, with a store, subscribing
397 to all devices, so do it once here
398 """ # noqa: D205
399 if self._device_registry is not None:
400 return self._device_registry
401 if self.hass:
402 try:
403 self._device_registry = device_registry.async_get(self.hass)
404 except Exception as e:
405 _LOGGER.warning("SUPERNOTIFY Unable to get device registry: %s", e)
406 return self._device_registry
408 def mobile_devices_for_person(self, person_entity_id: str, validate_targets: bool = False) -> list[dict[str, Any]]:
409 """Auto detect mobile_app targets for a person.
411 Targets not currently validated as async registration may not be complete at this stage
413 Args:
414 ----
415 person_entity_id (str): _description_
416 validate_targets (bool, optional): _description_. Defaults to False.
418 Returns:
419 -------
420 list: mobile target actions for this person
422 """
423 mobile_devices = []
424 person_state = self.hass.states.get(person_entity_id) if self.hass else None
425 if not person_state:
426 _LOGGER.warning("SUPERNOTIFY Unable to resolve %s", person_entity_id)
427 else:
428 ent_reg = self.entity_registry()
429 dev_reg = self.device_registry()
430 if not ent_reg or not dev_reg:
431 _LOGGER.warning("SUPERNOTIFY Unable to access entity or device registries for %s", person_entity_id)
432 else:
433 for d_t in person_state.attributes.get("device_trackers", ()):
434 entity = ent_reg.async_get(d_t)
435 if entity and entity.platform == "mobile_app" and entity.device_id:
436 device = dev_reg.async_get(entity.device_id)
437 if not device:
438 _LOGGER.warning("SUPERNOTIFY Unable to find device %s", entity.device_id)
439 else:
440 notify_action = f"mobile_app_{slugify(device.name)}"
441 if (
442 validate_targets
443 and self.hass
444 and self.hass.services
445 and not self.hass.services.has_service("notify", notify_action)
446 ):
447 _LOGGER.warning("SUPERNOTIFY Unable to find notify action <%s>", notify_action)
448 else:
449 mobile_devices.append({
450 CONF_MANUFACTURER: device.manufacturer,
451 CONF_MODEL: device.model,
452 CONF_NOTIFY_ACTION: notify_action,
453 CONF_DEVICE_TRACKER: d_t,
454 CONF_DEVICE_ID: device.id,
455 CONF_DEVICE_NAME: device.name,
456 # CONF_DEVICE_LABELS: device.labels,
457 })
458 else:
459 _LOGGER.debug("SUPERNOTIFY Ignoring device tracker %s", d_t)
461 return mobile_devices