Coverage for custom_components/supernotify/notify.py: 86%
214 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-26 08:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-26 08:54 +0000
1"""Supernotify service, extending BaseNotificationService"""
3import datetime as dt
4import json
5import logging
6from dataclasses import asdict
7from traceback import format_exception
8from typing import Any
10from cachetools import TTLCache
11from homeassistant.components.notify.legacy import BaseNotificationService
12from homeassistant.const import CONF_CONDITION, EVENT_HOMEASSISTANT_STOP, STATE_OFF, STATE_ON, STATE_UNKNOWN
13from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, ServiceCall, SupportsResponse, callback
14from homeassistant.helpers.condition import async_validate_condition_config
15from homeassistant.helpers.event import async_track_time_change
16from homeassistant.helpers.json import ExtendedJSONEncoder
17from homeassistant.helpers.reload import async_setup_reload_service
18from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
20from custom_components.supernotify.archive import ARCHIVE_PURGE_MIN_INTERVAL
21from custom_components.supernotify.delivery_method import DeliveryMethod
23from . import (
24 ATTR_ACTION,
25 ATTR_DATA,
26 ATTR_DUPE_POLICY_MTSLP,
27 ATTR_DUPE_POLICY_NONE,
28 CONF_ACTION_GROUPS,
29 CONF_ACTIONS,
30 CONF_ARCHIVE,
31 CONF_CAMERAS,
32 CONF_DELIVERY,
33 CONF_DUPE_CHECK,
34 CONF_DUPE_POLICY,
35 CONF_HOUSEKEEPING,
36 CONF_HOUSEKEEPING_TIME,
37 CONF_LINKS,
38 CONF_MEDIA_PATH,
39 CONF_METHODS,
40 CONF_RECIPIENTS,
41 CONF_SCENARIOS,
42 CONF_SIZE,
43 CONF_TEMPLATE_PATH,
44 CONF_TTL,
45 DOMAIN,
46 PLATFORMS,
47 PRIORITY_MEDIUM,
48 PRIORITY_VALUES,
49 ConditionVariables,
50)
51from . import SUPERNOTIFY_SCHEMA as PLATFORM_SCHEMA
52from .configuration import Context
53from .methods.alexa_devices import AlexaDevicesDeliveryMethod
54from .methods.alexa_media_player import AlexaMediaPlayerDeliveryMethod
55from .methods.chime import ChimeDeliveryMethod
56from .methods.email import EmailDeliveryMethod
57from .methods.generic import GenericDeliveryMethod
58from .methods.media_player_image import MediaPlayerImageDeliveryMethod
59from .methods.mobile_push import MobilePushDeliveryMethod
60from .methods.persistent import PersistentDeliveryMethod
61from .methods.sms import SMSDeliveryMethod
62from .notification import Notification
64_LOGGER = logging.getLogger(__name__)
66SNOOZE_TIME = 60 * 60 # TODO: move to configuration
68METHODS: list[type[DeliveryMethod]] = [
69 EmailDeliveryMethod,
70 SMSDeliveryMethod,
71 AlexaDevicesDeliveryMethod,
72 AlexaMediaPlayerDeliveryMethod,
73 MobilePushDeliveryMethod,
74 MediaPlayerImageDeliveryMethod,
75 ChimeDeliveryMethod,
76 PersistentDeliveryMethod,
77 GenericDeliveryMethod,
78] # No auto-discovery of method plugins so manual class registration required here
81async def async_get_service(
82 hass: HomeAssistant,
83 config: ConfigType,
84 discovery_info: DiscoveryInfoType | None = None,
85) -> "SuperNotificationAction":
86 """Notify specific component setup - see async_setup_legacy in BaseNotificationService"""
87 _ = PLATFORM_SCHEMA # schema must be imported even if not used for HA platform detection
88 _ = discovery_info
89 for delivery in config.get(CONF_DELIVERY, {}).values():
90 if delivery and CONF_CONDITION in delivery:
91 try:
92 await async_validate_condition_config(hass, delivery[CONF_CONDITION])
93 except Exception as e:
94 _LOGGER.error("SUPERNOTIFY delivery %s fails condition: %s", delivery[CONF_CONDITION], e)
95 raise
97 hass.states.async_set(
98 f"{DOMAIN}.configured",
99 "True",
100 {
101 CONF_DELIVERY: config.get(CONF_DELIVERY, {}),
102 CONF_LINKS: config.get(CONF_LINKS, ()),
103 CONF_TEMPLATE_PATH: config.get(CONF_TEMPLATE_PATH, None),
104 CONF_MEDIA_PATH: config.get(CONF_MEDIA_PATH, None),
105 CONF_ARCHIVE: config.get(CONF_ARCHIVE, {}),
106 CONF_RECIPIENTS: config.get(CONF_RECIPIENTS, ()),
107 CONF_ACTIONS: config.get(CONF_ACTIONS, {}),
108 CONF_HOUSEKEEPING: config.get(CONF_HOUSEKEEPING, {}),
109 CONF_ACTION_GROUPS: config.get(CONF_ACTION_GROUPS, {}),
110 CONF_SCENARIOS: list(config.get(CONF_SCENARIOS, {}).keys()),
111 CONF_METHODS: config.get(CONF_METHODS, {}),
112 CONF_CAMERAS: config.get(CONF_CAMERAS, {}),
113 CONF_DUPE_CHECK: config.get(CONF_DUPE_CHECK, {}),
114 },
115 )
116 hass.states.async_set(f"{DOMAIN}.failures", "0")
117 hass.states.async_set(f"{DOMAIN}.sent", "0")
119 await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
120 service = SuperNotificationAction(
121 hass,
122 deliveries=config[CONF_DELIVERY],
123 template_path=config[CONF_TEMPLATE_PATH],
124 media_path=config[CONF_MEDIA_PATH],
125 archive=config[CONF_ARCHIVE],
126 housekeeping=config[CONF_HOUSEKEEPING],
127 recipients=config[CONF_RECIPIENTS],
128 mobile_actions=config[CONF_ACTION_GROUPS],
129 scenarios=config[CONF_SCENARIOS],
130 links=config[CONF_LINKS],
131 method_configs=config[CONF_METHODS],
132 cameras=config[CONF_CAMERAS],
133 dupe_check=config[CONF_DUPE_CHECK],
134 )
135 await service.initialize()
137 def supplemental_action_refresh_entities(_call: ServiceCall) -> None:
138 return service.expose_entities()
140 def supplemental_action_enquire_deliveries_by_scenario(_call: ServiceCall) -> dict[str, Any]:
141 return service.enquire_deliveries_by_scenario()
143 def supplemental_action_enquire_last_notification(_call: ServiceCall) -> dict[str, Any]:
144 return service.last_notification.contents() if service.last_notification else {}
146 async def supplemental_action_enquire_active_scenarios(call: ServiceCall) -> dict[str, Any]:
147 trace = call.data.get("trace", False)
148 result: dict[str, Any] = {"scenarios": await service.enquire_active_scenarios()}
149 if trace:
150 result["trace"] = await service.trace_active_scenarios()
151 return result
153 def supplemental_action_enquire_scenarios(_call: ServiceCall) -> dict[str, Any]:
154 return {"scenarios": service.enquire_scenarios()}
156 async def supplemental_action_enquire_occupancy(_call: ServiceCall) -> dict[str, Any]:
157 return {"scenarios": await service.enquire_occupancy()}
159 def supplemental_action_enquire_snoozes(_call: ServiceCall) -> dict[str, Any]:
160 return {"snoozes": service.enquire_snoozes()}
162 def supplemental_action_clear_snoozes(_call: ServiceCall) -> dict[str, Any]:
163 return {"cleared": service.clear_snoozes()}
165 def supplemental_action_enquire_people(_call: ServiceCall) -> dict[str, Any]:
166 return {"people": service.enquire_people()}
168 async def supplemental_action_purge_archive(call: ServiceCall) -> dict[str, Any]:
169 days = call.data.get("days")
170 if not service.context.archive.enabled:
171 return {"error": "No archive configured"}
172 purged = await service.context.archive.cleanup(days=days, force=True)
173 arch_size = await service.context.archive.size()
174 return {
175 "purged": purged,
176 "remaining": arch_size,
177 "interval": ARCHIVE_PURGE_MIN_INTERVAL,
178 "days": service.context.archive.archive_days if days is None else days,
179 }
181 hass.services.async_register(
182 DOMAIN,
183 "enquire_deliveries_by_scenario",
184 supplemental_action_enquire_deliveries_by_scenario,
185 supports_response=SupportsResponse.ONLY,
186 )
187 hass.services.async_register(
188 DOMAIN,
189 "enquire_last_notification",
190 supplemental_action_enquire_last_notification,
191 supports_response=SupportsResponse.ONLY,
192 )
193 hass.services.async_register(
194 DOMAIN,
195 "enquire_active_scenarios",
196 supplemental_action_enquire_active_scenarios,
197 supports_response=SupportsResponse.ONLY,
198 )
199 hass.services.async_register(
200 DOMAIN,
201 "enquire_scenarios",
202 supplemental_action_enquire_scenarios,
203 supports_response=SupportsResponse.ONLY,
204 )
205 hass.services.async_register(
206 DOMAIN,
207 "enquire_occupancy",
208 supplemental_action_enquire_occupancy,
209 supports_response=SupportsResponse.ONLY,
210 )
211 hass.services.async_register(
212 DOMAIN,
213 "enquire_people",
214 supplemental_action_enquire_people,
215 supports_response=SupportsResponse.ONLY,
216 )
217 hass.services.async_register(
218 DOMAIN,
219 "enquire_snoozes",
220 supplemental_action_enquire_snoozes,
221 supports_response=SupportsResponse.ONLY,
222 )
223 hass.services.async_register(
224 DOMAIN,
225 "clear_snoozes",
226 supplemental_action_clear_snoozes,
227 supports_response=SupportsResponse.ONLY,
228 )
229 hass.services.async_register(
230 DOMAIN,
231 "purge_archive",
232 supplemental_action_purge_archive,
233 supports_response=SupportsResponse.ONLY,
234 )
235 hass.services.async_register(
236 DOMAIN,
237 "refresh_entities",
238 supplemental_action_refresh_entities,
239 supports_response=SupportsResponse.NONE,
240 )
242 return service
245class SuperNotificationAction(BaseNotificationService):
246 """Implement SuperNotification action."""
248 def __init__(
249 self,
250 hass: HomeAssistant,
251 deliveries: dict[str, dict[str, Any]] | None = None,
252 template_path: str | None = None,
253 media_path: str | None = None,
254 archive: dict[str, Any] | None = None,
255 housekeeping: dict[str, Any] | None = None,
256 recipients: list[dict[str, Any]] | None = None,
257 mobile_actions: dict[str, Any] | None = None,
258 scenarios: dict[str, dict[str, Any]] | None = None,
259 links: list[str] | None = None,
260 method_configs: dict[str, Any] | None = None,
261 cameras: list[dict[str, Any]] | None = None,
262 dupe_check: dict[str, Any] | None = None,
263 ) -> None:
264 """Initialize the service."""
265 self.hass: HomeAssistant = hass
266 self.last_notification: Notification | None = None
267 self.failures: int = 0
268 self.housekeeping: dict[str, Any] = housekeeping or {}
269 self.sent: int = 0
270 self.context = Context(
271 hass,
272 deliveries,
273 links or [],
274 recipients or [],
275 mobile_actions,
276 template_path,
277 media_path,
278 archive,
279 scenarios,
280 method_configs or {},
281 cameras,
282 METHODS,
283 )
284 self.unsubscribes: list[CALLBACK_TYPE] = []
285 self.dupe_check_config: dict[str, Any] = dupe_check or {}
286 self.last_purge: dt.datetime | None = None
287 self.notification_cache: TTLCache[tuple[int, str], str] = TTLCache(
288 maxsize=self.dupe_check_config.get(CONF_SIZE, 100), ttl=self.dupe_check_config.get(CONF_TTL, 120)
289 )
291 async def initialize(self) -> None:
292 await self.context.initialize()
294 self.expose_entities()
295 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))
296 housekeeping_schedule = self.housekeeping.get(CONF_HOUSEKEEPING_TIME)
297 if housekeeping_schedule:
298 _LOGGER.info("SUPERNOTIFY setting up housekeeping schedule at: %s", housekeeping_schedule)
299 self.unsubscribes.append(
300 async_track_time_change(
301 self.hass,
302 self.async_nightly_tasks,
303 hour=housekeeping_schedule.hour,
304 minute=housekeeping_schedule.minute,
305 second=housekeeping_schedule.second,
306 )
307 )
309 self.unsubscribes.append(self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self.async_shutdown))
311 async def async_shutdown(self, event: Event) -> None:
312 _LOGGER.info("SUPERNOTIFY shutting down, %s", event)
313 self.shutdown()
315 async def async_unregister_services(self) -> None:
316 _LOGGER.info("SUPERNOTIFY unregistering")
317 self.shutdown()
318 return await super().async_unregister_services()
320 def shutdown(self) -> None:
321 for unsub in self.unsubscribes:
322 try:
323 _LOGGER.debug("SUPERNOTIFY unsubscribing: %s", unsub)
324 unsub()
325 except Exception as e:
326 _LOGGER.error("SUPERNOTIFY failed to unsubscribe: %s", e)
327 _LOGGER.info("SUPERNOTIFY shut down")
329 def expose_entities(self) -> None:
330 for scenario in self.context.scenarios.values():
331 self.hass.states.async_set(
332 f"{DOMAIN}.scenario_{scenario.name}", STATE_UNKNOWN, scenario.attributes(include_condition=False)
333 )
334 for method in self.context.methods.values():
335 self.hass.states.async_set(
336 f"{DOMAIN}.method_{method.method}",
337 STATE_ON if len(method.valid_deliveries) > 0 else STATE_OFF,
338 method.attributes(),
339 )
340 for delivery_name, delivery in self.context._deliveries.items():
341 self.hass.states.async_set(
342 f"{DOMAIN}.delivery_{delivery_name}",
343 STATE_ON if str(delivery_name in self.context.deliveries) else STATE_OFF,
344 delivery,
345 )
347 def dupe_check(self, notification: Notification) -> bool:
348 policy = self.dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP)
349 if policy == ATTR_DUPE_POLICY_NONE:
350 return False
351 notification_hash = notification.hash()
352 if notification.priority in PRIORITY_VALUES:
353 same_or_higher_priority = PRIORITY_VALUES[PRIORITY_VALUES.index(notification.priority) :]
354 else:
355 same_or_higher_priority = [notification.priority]
356 dupe = False
357 if any((notification_hash, p) in self.notification_cache for p in same_or_higher_priority):
358 _LOGGER.debug("SUPERNOTIFY Detected dupe notification")
359 dupe = True
360 self.notification_cache[notification_hash, notification.priority] = notification.id
361 return dupe
363 async def async_send_message(
364 self, message: str = "", title: str | None = None, target: list[str] | str | None = None, **kwargs: Any
365 ) -> None:
366 """Send a message via chosen method."""
367 data = kwargs.get(ATTR_DATA, {})
368 notification = None
369 _LOGGER.debug("Message: %s, target: %s, data: %s", message, target, data)
371 try:
372 notification = Notification(self.context, message, title, target, data)
373 await notification.initialize()
374 if self.dupe_check(notification):
375 notification.suppress()
376 else:
377 if await notification.deliver():
378 self.sent += 1
379 self.hass.states.async_set(f"{DOMAIN}.sent", str(self.sent))
380 elif notification.errored:
381 _LOGGER.error("SUPERNOTIFY Failed to deliver %s, error count %s", notification.id, notification.errored)
382 else:
383 _LOGGER.warning("SUPERNOTIFY No delivery selected for %s", notification.id)
385 except Exception as err:
386 # fault barrier of last resort, integration failures should be caught within envelope delivery
387 _LOGGER.error("SUPERNOTIFY Failed to send message %s: %s", message, err, exc_info=True)
388 self.failures += 1
389 if notification is not None:
390 notification.delivery_error = format_exception(err)
391 self.hass.states.async_set(f"{DOMAIN}.failures", str(self.failures))
393 if notification is not None:
394 self.last_notification = notification
395 self.context.archive.archive(notification)
396 if self.context.archive_topic:
397 await self.context.archive_topic.publish(notification)
399 _LOGGER.debug(
400 "SUPERNOTIFY %s deliveries, %s errors, %s skipped",
401 notification.delivered,
402 notification.errored,
403 notification.skipped,
404 )
406 def enquire_deliveries_by_scenario(self) -> dict[str, list[str]]:
407 return self.context.delivery_by_scenario
409 async def enquire_occupancy(self) -> dict[str, list[dict[str, Any]]]:
410 return self.context.determine_occupancy()
412 async def enquire_active_scenarios(self) -> list[str]:
413 occupiers: dict[str, list[dict[str, Any]]] = self.context.determine_occupancy()
414 cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)
415 return [s.name for s in self.context.scenarios.values() if await s.evaluate(cvars)]
417 async def trace_active_scenarios(self) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
418 occupiers: dict[str, list[dict[str, Any]]] = self.context.determine_occupancy()
419 cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)
421 def safe_json(v: Any) -> Any:
422 return json.loads(json.dumps(v, cls=ExtendedJSONEncoder))
424 enabled = []
425 disabled = []
426 dcvars = asdict(cvars)
427 for s in self.context.scenarios.values():
428 if await s.trace(cvars):
429 enabled.append(safe_json(s.attributes(include_trace=True)))
430 else:
431 disabled.append(safe_json(s.attributes(include_trace=True)))
432 return enabled, disabled, dcvars
434 def enquire_scenarios(self) -> dict[str, dict[str, Any]]:
435 return {s.name: s.attributes(include_condition=False) for s in self.context.scenarios.values()}
437 def enquire_snoozes(self) -> list[dict[str, Any]]:
438 return self.context.snoozer.export()
440 def clear_snoozes(self) -> int:
441 return self.context.snoozer.clear()
443 def enquire_people(self) -> list[dict[str, Any]]:
444 return list(self.context.people.values())
446 @callback
447 def on_mobile_action(self, event: Event) -> None:
448 """Listen for mobile actions relevant to snooze and silence notifications
450 Example Action:
451 event_type: mobile_app_notification_action
452 data:
453 foo: a
454 origin: REMOTE
455 time_fired: "2024-04-20T13:14:09.360708+00:00"
456 context:
457 id: 01HVXT93JGWEDW0KE57Z0X6Z1K
458 parent_id: null
459 user_id: e9dbae1a5abf44dbbad52ff85501bb17
460 """
461 event_name = event.data.get(ATTR_ACTION)
462 if event_name is None or not event_name.startswith("SUPERNOTIFY_"):
463 return # event not intended for here
464 self.context.snoozer.handle_command_event(event, self.context.people)
466 @callback
467 async def async_nightly_tasks(self, now: dt.datetime) -> None:
468 _LOGGER.info("SUPERNOTIFY Housekeeping starting as scheduled at %s", now)
469 await self.context.archive.cleanup()
470 self.context.snoozer.purge_snoozes()
471 _LOGGER.info("SUPERNOTIFY Housekeeping completed")