Coverage for custom_components/supernotify/configuration.py: 94%
228 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-18 09:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-18 09:29 +0000
1from __future__ import annotations
3import logging
4import socket
5from pathlib import Path
6from typing import TYPE_CHECKING, Any
8from homeassistant.const import (
9 ATTR_STATE,
10 CONF_DEVICE_ID,
11 CONF_ENABLED,
12 CONF_METHOD,
13 CONF_NAME,
14 STATE_HOME,
15 STATE_NOT_HOME,
16)
17from homeassistant.helpers import device_registry, entity_registry
18from homeassistant.helpers.config_validation import boolean
19from homeassistant.helpers.network import get_url
20from homeassistant.util import slugify
22from custom_components.supernotify.archive import ArchiveTopic, NotificationArchive
23from custom_components.supernotify.common import ensure_list, safe_get
24from custom_components.supernotify.snoozer import Snoozer
26from . import (
27 ATTR_USER_ID,
28 CONF_ARCHIVE_DAYS,
29 CONF_ARCHIVE_MQTT_QOS,
30 CONF_ARCHIVE_MQTT_RETAIN,
31 CONF_ARCHIVE_MQTT_TOPIC,
32 CONF_ARCHIVE_PATH,
33 CONF_CAMERA,
34 CONF_DATA,
35 CONF_DEVICE_NAME,
36 CONF_DEVICE_TRACKER,
37 CONF_MANUFACTURER,
38 CONF_MOBILE_DEVICES,
39 CONF_MOBILE_DISCOVERY,
40 CONF_MODEL,
41 CONF_NOTIFY_ACTION,
42 CONF_PERSON,
43 CONF_SELECTION,
44 DELIVERY_SELECTION_IMPLICIT,
45 METHOD_DEFAULTS_SCHEMA,
46 SCENARIO_DEFAULT,
47 SCENARIO_TEMPLATE_ATTRS,
48 SELECTION_DEFAULT,
49 SELECTION_FALLBACK,
50 SELECTION_FALLBACK_ON_ERROR,
51)
52from .scenario import Scenario
54if TYPE_CHECKING:
55 from homeassistant.core import HomeAssistant, State
57 from custom_components.supernotify.delivery_method import DeliveryMethod
59_LOGGER = logging.getLogger(__name__)
class Context:
    """Hold the SuperNotify configuration and the runtime state derived from it.

    The constructor only stores and normalises the raw configuration. The
    derived state (people, scenarios, default deliveries, archive) is built by
    :meth:`initialize`, and delivery methods are attached afterwards through
    :meth:`register_delivery_methods`.
    """

    def __init__(
        self,
        hass: HomeAssistant | None = None,
        deliveries: dict[str, Any] | None = None,
        links: list[str] | None = None,
        recipients: list[dict[str, Any]] | None = None,
        mobile_actions: dict[str, Any] | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive_config: dict[str, Any] | None = None,
        scenarios: dict[str, dict[str, Any]] | None = None,
        method_defaults: dict[str, Any] | None = None,
        cameras: list[dict[str, Any]] | None = None,
    ) -> None:
        """Store the raw configuration and resolve the Home Assistant URLs.

        Args:
            hass: Home Assistant instance; when omitted the URLs are left empty
                and features that need it are skipped.
            deliveries: Raw delivery configuration keyed by delivery name.
                Anything that is not a dict is treated as "no deliveries".
            links: Link definitions, normalised with ``ensure_list``.
            recipients: Recipient (person) definitions, normalised with ``ensure_list``.
            mobile_actions: Mobile action definitions.
            template_path: Directory holding templates, if any.
            media_path: Directory for media files, if any.
            archive_config: Archive settings (enabled flag, path, retention days
                and optional MQTT topic / QoS / retain).
            scenarios: Raw scenario definitions keyed by scenario name.
            method_defaults: Per-method default values for delivery configuration.
            cameras: Camera definitions; indexed by their ``CONF_CAMERA`` value.

        """
        self.hass: HomeAssistant | None = None
        self.hass_internal_url: str
        self.hass_external_url: str
        if hass:
            self.hass = hass
            self.hass_name = hass.config.location_name
            try:
                self.hass_internal_url = get_url(hass, prefer_external=False)
            except Exception as e:
                # Best effort: fall back to a URL built from the local host name
                _LOGGER.warning("SUPERNOTIFY could not get internal hass url: %s", e)
                self.hass_internal_url = f"http://{socket.gethostname()}"
            try:
                self.hass_external_url = get_url(hass, prefer_external=True)
            except Exception as e:
                # Best effort: reuse the internal URL when no external one is available
                _LOGGER.warning("SUPERNOTIFY could not get external hass url: %s", e)
                self.hass_external_url = self.hass_internal_url
        else:
            self.hass_internal_url = ""
            self.hass_external_url = ""
            self.hass_name = "!UNDEFINED!"
            _LOGGER.warning("SUPERNOTIFY Configured without HomeAssistant instance")

        _LOGGER.debug(
            "SUPERNOTIFY Configured for HomeAssistant instance %s at %s , %s",
            self.hass_name,
            self.hass_internal_url,
            self.hass_external_url,
        )

        if not self.hass_internal_url or not self.hass_internal_url.startswith("http"):
            _LOGGER.warning("SUPERNOTIFY invalid internal hass url %s", self.hass_internal_url)

        self.links: list[dict[str, Any]] = ensure_list(links)
        # raw configured deliveries
        self._deliveries: dict[str, Any] = deliveries if isinstance(deliveries, dict) else {}
        # validated deliveries, filled in by register_delivery_methods()
        self.deliveries: dict[str, Any] = {}
        self._recipients: list[dict[str, Any]] = ensure_list(recipients)
        self.mobile_actions: dict[str, Any] = mobile_actions or {}
        self.template_path: Path | None = Path(template_path) if template_path else None
        self.media_path: Path | None = Path(media_path) if media_path else None

        archive_config = archive_config or {}
        self.archive: NotificationArchive = NotificationArchive(
            bool(archive_config.get(CONF_ENABLED, False)),
            archive_config.get(CONF_ARCHIVE_PATH),
            archive_config.get(CONF_ARCHIVE_DAYS),
        )
        # The MQTT archive topic needs both a configured topic and a Home Assistant instance
        self.archive_topic: ArchiveTopic | None = None
        archive_topic = archive_config.get(CONF_ARCHIVE_MQTT_TOPIC)
        if archive_topic is not None and self.hass:
            self.archive_topic = ArchiveTopic(
                self.hass,
                archive_topic,
                int(archive_config.get(CONF_ARCHIVE_MQTT_QOS, 0)),
                boolean(archive_config.get(CONF_ARCHIVE_MQTT_RETAIN, True)),
            )

        self.cameras: dict[str, Any] = {c[CONF_CAMERA]: c for c in cameras} if cameras else {}
        self.methods: dict[str, DeliveryMethod] = {}
        self.method_defaults: dict[str, Any] = method_defaults or {}
        self.scenarios: dict[str, Scenario] = {}
        self.people: dict[str, dict[str, Any]] = {}
        self._config_scenarios: dict[str, Any] = scenarios or {}
        # template field -> delivery name -> names of scenarios supplying that template
        self.content_scenario_templates: dict[str, Any] = {}
        self.delivery_by_scenario: dict[str, list[str]] = {SCENARIO_DEFAULT: []}
        self.fallback_on_error: dict[str, dict[str, Any]] = {}
        self.fallback_by_default: dict[str, dict[str, Any]] = {}
        # registries are looked up lazily and cached, see entity_registry() / device_registry()
        self._entity_registry: entity_registry.EntityRegistry | None = None
        self._device_registry: device_registry.DeviceRegistry | None = None
        self.snoozer = Snoozer()

    async def initialize(self) -> None:
        """Build the derived state from the raw configuration.

        Sets up people, validates scenarios (only when a Home Assistant instance
        is available), checks the template and media paths, initialises the
        archive and finally works out the default and per-scenario deliveries.
        """
        self.people = self.setup_people(self._recipients)

        if self._config_scenarios and self.hass:
            for scenario_name, scenario_definition in self._config_scenarios.items():
                scenario = Scenario(scenario_name, scenario_definition, self.hass)
                # scenarios that fail validation are dropped
                if await scenario.validate():
                    self.scenarios[scenario_name] = scenario

        if self.template_path and not self.template_path.exists():
            _LOGGER.warning("SUPERNOTIFY template path not found at %s", self.template_path)
            self.template_path = None

        if self.media_path and not self.media_path.exists():
            _LOGGER.info("SUPERNOTIFY media path not found at %s", self.media_path)
            try:
                self.media_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                # a media directory that cannot be created disables media storage
                _LOGGER.warning("SUPERNOTIFY media path %s cannot be created: %s", self.media_path, e)
                self.media_path = None
        if self.media_path is not None:
            _LOGGER.info("SUPERNOTIFY abs media path: %s", self.media_path.absolute())

        if self.archive:
            self.archive.initialize()
        default_deliveries: dict[str, Any] = self.initialize_deliveries()
        self.initialize_scenarios(default_deliveries)

    def initialize_deliveries(self) -> dict[str, Any]:
        """Classify the raw deliveries by selection and apply method defaults.

        Enabled deliveries are recorded in ``fallback_on_error``,
        ``fallback_by_default`` and/or the returned default mapping according to
        their ``CONF_SELECTION`` (which defaults to ``[SELECTION_DEFAULT]``).
        Every delivery, enabled or not, gets a name and its method defaults.

        Returns:
            The enabled deliveries selected by default, keyed by delivery name.

        """
        default_deliveries: dict[str, Any] = {}
        if not self._deliveries:
            return default_deliveries

        for name, config in self._deliveries.items():
            if config.get(CONF_ENABLED, True):
                selection = config.get(CONF_SELECTION, [SELECTION_DEFAULT])
                if SELECTION_FALLBACK_ON_ERROR in selection:
                    self.fallback_on_error[name] = config
                if SELECTION_FALLBACK in selection:
                    self.fallback_by_default[name] = config
                if SELECTION_DEFAULT in selection:
                    default_deliveries[name] = config

            if not config.get(CONF_NAME):
                config[CONF_NAME] = name  # for minimal tests
            # each key of the defaults schema names an attribute that may be defaulted
            for conf_key in METHOD_DEFAULTS_SCHEMA.schema:
                self.set_method_default(config, conf_key.schema)
        return default_deliveries

    def initialize_scenarios(self, default_deliveries: dict[str, Any]) -> None:
        """Work out which deliveries each scenario uses and which templates it overrides.

        Args:
            default_deliveries: Deliveries selected by default, as returned by
                :meth:`initialize_deliveries`.

        """
        for scenario_name, scenario in self.scenarios.items():
            selected = self.delivery_by_scenario.setdefault(scenario_name, [])
            # implicit selection starts from the default deliveries, otherwise only
            # the deliveries named in the scenario itself are considered
            if scenario.delivery_selection == DELIVERY_SELECTION_IMPLICIT:
                scenario_deliveries: list[str] = list(default_deliveries.keys())
            else:
                scenario_deliveries = []
            scenario_definition_delivery = scenario.delivery
            scenario_deliveries.extend([s for s in scenario_definition_delivery if s not in scenario_deliveries])

            for scenario_delivery in scenario_deliveries:
                delivery_definition = scenario_definition_delivery.get(scenario_delivery)
                if safe_get(delivery_definition, CONF_ENABLED, True):
                    selected.append(scenario_delivery)

                # tolerate an explicit null for the data section
                scenario_delivery_config = safe_get(delivery_definition, CONF_DATA, {}) or {}

                # extract message and title templates per scenario per delivery
                for template_field in SCENARIO_TEMPLATE_ATTRS:
                    template_format = scenario_delivery_config.get(template_field)
                    if template_format is not None:
                        by_delivery = self.content_scenario_templates.setdefault(template_field, {})
                        by_delivery.setdefault(scenario_delivery, []).append(scenario_name)

        self.delivery_by_scenario[SCENARIO_DEFAULT] = list(default_deliveries.keys())

    async def register_delivery_methods(
        self,
        delivery_methods: list[DeliveryMethod] | None = None,
        delivery_method_classes: list[type[DeliveryMethod]] | None = None,
        set_as_default: bool = False,
    ) -> None:
        """Register and initialise delivery methods, collecting their valid deliveries.

        Available directly for test fixtures supplying class or instance.

        Args:
            delivery_methods: Ready-made method instances to register.
            delivery_method_classes: Method classes to instantiate with
                ``(hass, context, raw deliveries)``; ignored without a Home
                Assistant instance.
            set_as_default: When true, every delivery with a known method is also
                added to the default scenario.

        """
        if delivery_methods:
            for delivery_method in delivery_methods:
                self.methods[delivery_method.method] = delivery_method
                await delivery_method.initialize()
                self.deliveries.update(delivery_method.valid_deliveries)
        if delivery_method_classes and self.hass:
            for delivery_method_class in delivery_method_classes:
                instance = delivery_method_class(self.hass, self, self._deliveries)
                self.methods[delivery_method_class.method] = instance
                await instance.initialize()
                self.deliveries.update(instance.valid_deliveries)

        for d, dc in self.deliveries.items():
            if dc.get(CONF_METHOD) not in self.methods:
                _LOGGER.warning("SUPERNOTIFY Ignoring delivery %s without known method %s", d, dc.get(CONF_METHOD))
            elif set_as_default and d not in self.delivery_by_scenario[SCENARIO_DEFAULT]:
                self.delivery_by_scenario[SCENARIO_DEFAULT].append(d)

        _LOGGER.info("SUPERNOTIFY configured deliveries %s", "; ".join(self.deliveries.keys()))

    def set_method_default(self, delivery_config: dict[str, Any], attr: str) -> None:
        """Copy a method-level default into a delivery that does not set ``attr`` itself.

        Nothing happens when the delivery already has ``attr``, has no method, or
        the method default for ``attr`` is missing or falsy.

        Args:
            delivery_config: Delivery configuration, updated in place.
            attr: Name of the configuration attribute to default.

        """
        if attr in delivery_config or CONF_METHOD not in delivery_config:
            return
        # "or {}" tolerates a method whose defaults are configured as null
        method_default: dict[str, Any] = self.method_defaults.get(delivery_config[CONF_METHOD], {}) or {}
        if method_default.get(attr):
            delivery_config[attr] = method_default[attr]
            # .get() so that a delivery without a name cannot make logging raise
            _LOGGER.debug(
                "SUPERNOTIFY Defaulting delivery %s to %s %s",
                delivery_config.get(CONF_NAME),
                attr,
                delivery_config[attr],
            )

    def delivery_method(self, delivery: str) -> DeliveryMethod:
        """Return the registered delivery method used by a validated delivery.

        Args:
            delivery: Name of the delivery.

        Raises:
            ValueError: If the delivery is unknown or its method is not registered.

        """
        method_name = self.deliveries.get(delivery, {}).get(CONF_METHOD)
        method: DeliveryMethod | None = self.methods.get(method_name)
        if not method:
            raise ValueError(f"SUPERNOTIFY No method for delivery {delivery}")
        return method

    def setup_people(self, recipients: list[dict[str, Any]] | tuple[dict[str, Any], ...]) -> dict[str, dict[str, Any]]:
        """Index recipients by person entity id, enriching each one in place.

        Recipients with mobile discovery enabled get the discovered mobile
        devices appended to their mobile device list. When a Home Assistant
        instance is available and the person has a state, the user id from that
        state is stored on the recipient.

        Args:
            recipients: Recipient definitions; each must contain ``CONF_PERSON``.

        Returns:
            The (mutated) recipient definitions keyed by person entity id.

        """
        people: dict[str, dict[str, Any]] = {}
        for r in recipients:
            person = r[CONF_PERSON]
            if r.get(CONF_MOBILE_DISCOVERY):
                # setdefault: the device list may be absent when no devices were configured by hand
                r.setdefault(CONF_MOBILE_DEVICES, []).extend(self.mobile_devices_for_person(person))
                if r.get(CONF_MOBILE_DEVICES):
                    _LOGGER.info("SUPERNOTIFY Auto configured %s for mobile devices %s", person, r[CONF_MOBILE_DEVICES])
                else:
                    _LOGGER.warning("SUPERNOTIFY Unable to find mobile devices for %s", person)
            if self.hass:
                state: State | None = self.hass.states.get(person)
                if state is not None:
                    r[ATTR_USER_ID] = state.attributes.get(ATTR_USER_ID)
            people[person] = r
        return people

    def people_state(self) -> list[dict[str, Any]]:
        """Refresh and return the person configurations with their current state.

        Each person configuration gets ``ATTR_STATE`` set to the state of the
        person entity, or ``None`` when the entity has no state. Without a Home
        Assistant instance an empty list is returned.
        """
        results: list[dict[str, Any]] = []
        if not self.hass:
            return results
        for person, person_config in self.people.items():
            # TODO: possibly rate limit this
            try:
                tracker = self.hass.states.get(person)
                person_config[ATTR_STATE] = None if tracker is None else tracker.state
            except Exception as e:
                # best effort: the person is still reported, with whatever state it had before
                _LOGGER.warning("Unable to determine occupied status for %s: %s", person, e)
            results.append(person_config)
        return results

    def determine_occupancy(self) -> dict[str, list[dict[str, Any]]]:
        """Split people into those at home and those away.

        Returns:
            A dict with the keys ``STATE_HOME`` and ``STATE_NOT_HOME``, each
            holding the matching person configurations.

        """
        results: dict[str, list[dict[str, Any]]] = {STATE_HOME: [], STATE_NOT_HOME: []}
        for person_config in self.people_state():
            # default to at home if unknown tracker
            at_home = person_config.get(ATTR_STATE) in (None, STATE_HOME)
            results[STATE_HOME if at_home else STATE_NOT_HOME].append(person_config)
        return results

    def entity_registry(self) -> entity_registry.EntityRegistry | None:
        """Return the Home Assistant entity registry, looking it up once and caching it.

        Hass entity registry is weird, every component ends up creating its own,
        with a store, subscribing to all entities, so do it once here.

        Returns:
            The registry, or ``None`` without a Home Assistant instance or when
            the lookup fails.

        """
        if self._entity_registry is not None:
            return self._entity_registry
        if self.hass:
            try:
                self._entity_registry = entity_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get entity registry: %s", e)
        return self._entity_registry

    def device_registry(self) -> device_registry.DeviceRegistry | None:
        """Return the Home Assistant device registry, looking it up once and caching it.

        Hass device registry is weird, every component ends up creating its own,
        with a store, subscribing to all devices, so do it once here.

        Returns:
            The registry, or ``None`` without a Home Assistant instance or when
            the lookup fails.

        """
        if self._device_registry is not None:
            return self._device_registry
        if self.hass:
            try:
                self._device_registry = device_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get device registry: %s", e)
        return self._device_registry

    def mobile_devices_for_person(self, person_entity_id: str, validate_targets: bool = False) -> list[dict[str, Any]]:
        """Auto detect mobile_app targets for a person.

        Walks the ``device_trackers`` attribute of the person's state and keeps
        the trackers that belong to the ``mobile_app`` platform and have a device.

        Targets are not validated by default, as async registration of the
        notify actions may not be complete at this stage.

        Args:
            person_entity_id: Entity id of the person.
            validate_targets: When true, skip devices whose
                ``notify.mobile_app_<device name>`` action is not registered.

        Returns:
            One dict per usable mobile device, holding manufacturer, model,
            notify action, device tracker, device id and device name.

        """
        mobile_devices: list[dict[str, Any]] = []
        person_state = self.hass.states.get(person_entity_id) if self.hass else None
        if not person_state:
            _LOGGER.warning("SUPERNOTIFY Unable to resolve %s", person_entity_id)
            return mobile_devices

        ent_reg = self.entity_registry()
        dev_reg = self.device_registry()
        if not ent_reg or not dev_reg:
            _LOGGER.warning("SUPERNOTIFY Unable to access entity or device registries for %s", person_entity_id)
            return mobile_devices

        for d_t in person_state.attributes.get("device_trackers", ()):
            entity = ent_reg.async_get(d_t)
            if not (entity and entity.platform == "mobile_app" and entity.device_id):
                _LOGGER.debug("SUPERNOTIFY Ignoring device tracker %s", d_t)
                continue

            device = dev_reg.async_get(entity.device_id)
            if not device:
                _LOGGER.warning("SUPERNOTIFY Unable to find device %s", entity.device_id)
                continue

            notify_action = f"mobile_app_{slugify(device.name)}"
            if (
                validate_targets
                and self.hass
                and self.hass.services
                and not self.hass.services.has_service("notify", notify_action)
            ):
                _LOGGER.warning("SUPERNOTIFY Unable to find notify action <%s>", notify_action)
                continue

            mobile_devices.append({
                CONF_MANUFACTURER: device.manufacturer,
                CONF_MODEL: device.model,
                CONF_NOTIFY_ACTION: notify_action,
                CONF_DEVICE_TRACKER: d_t,
                CONF_DEVICE_ID: device.id,
                CONF_DEVICE_NAME: device.name,
                # CONF_DEVICE_LABELS: device.labels,
            })

        return mobile_devices