Coverage for custom_components/supernotify/notify.py: 86%
211 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-18 09:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-18 09:29 +0000
1"""Supernotify service, extending BaseNotificationService"""
3import datetime as dt
4import json
5import logging
6from dataclasses import asdict
7from traceback import format_exception
8from typing import Any
10from cachetools import TTLCache
11from homeassistant.components.notify.legacy import BaseNotificationService
12from homeassistant.const import CONF_CONDITION, EVENT_HOMEASSISTANT_STOP, STATE_OFF, STATE_ON, STATE_UNKNOWN
13from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, ServiceCall, SupportsResponse, callback
14from homeassistant.helpers.condition import async_validate_condition_config
15from homeassistant.helpers.event import async_track_time_change
16from homeassistant.helpers.json import ExtendedJSONEncoder
17from homeassistant.helpers.reload import async_setup_reload_service
18from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
20from custom_components.supernotify.archive import ARCHIVE_PURGE_MIN_INTERVAL
22from . import (
23 ATTR_ACTION,
24 ATTR_DATA,
25 ATTR_DUPE_POLICY_MTSLP,
26 ATTR_DUPE_POLICY_NONE,
27 CONF_ACTION_GROUPS,
28 CONF_ACTIONS,
29 CONF_ARCHIVE,
30 CONF_CAMERAS,
31 CONF_DELIVERY,
32 CONF_DUPE_CHECK,
33 CONF_DUPE_POLICY,
34 CONF_HOUSEKEEPING,
35 CONF_HOUSEKEEPING_TIME,
36 CONF_LINKS,
37 CONF_MEDIA_PATH,
38 CONF_METHODS,
39 CONF_RECIPIENTS,
40 CONF_SCENARIOS,
41 CONF_SIZE,
42 CONF_TEMPLATE_PATH,
43 CONF_TTL,
44 DOMAIN,
45 PLATFORMS,
46 PRIORITY_MEDIUM,
47 PRIORITY_VALUES,
48 ConditionVariables,
49)
50from . import SUPERNOTIFY_SCHEMA as PLATFORM_SCHEMA
51from .configuration import Context
52from .methods.alexa import AlexaDeliveryMethod
53from .methods.alexa_media_player import AlexaMediaPlayerDeliveryMethod
54from .methods.chime import ChimeDeliveryMethod
55from .methods.email import EmailDeliveryMethod
56from .methods.generic import GenericDeliveryMethod
57from .methods.media_player_image import MediaPlayerImageDeliveryMethod
58from .methods.mobile_push import MobilePushDeliveryMethod
59from .methods.persistent import PersistentDeliveryMethod
60from .methods.sms import SMSDeliveryMethod
61from .notification import Notification
# Module-level logger, named after this module
_LOGGER = logging.getLogger(__name__)

# NOTE(review): presumably seconds (i.e. one hour); not referenced by any code visible in this module
SNOOZE_TIME = 60 * 60  # TODO: move to configuration

# Delivery method classes passed to Context.register_delivery_methods() when the service initializes
METHODS: list[type] = [
    EmailDeliveryMethod,
    SMSDeliveryMethod,
    AlexaDeliveryMethod,
    AlexaMediaPlayerDeliveryMethod,
    MobilePushDeliveryMethod,
    MediaPlayerImageDeliveryMethod,
    ChimeDeliveryMethod,
    PersistentDeliveryMethod,
    GenericDeliveryMethod,
]  # No auto-discovery of method plugins so manual class registration required here
async def async_get_service(
    hass: HomeAssistant,
    config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> "SuperNotificationAction":
    """Notify specific component setup - see async_setup_legacy in BaseNotificationService

    Steps, in order:
      1. Validate the condition of every delivery that declares one.
      2. Publish the configuration as attributes of the ``<DOMAIN>.configured`` state and
         reset the ``<DOMAIN>.failures`` / ``<DOMAIN>.sent`` counters to "0".
      3. Set up the reload service, then build and initialize the notification service.
      4. Register the supplemental response-only actions (enquiries, snooze clearing, archive purge).

    Args:
        hass: Home Assistant instance.
        config: Platform configuration for this notify service.
        discovery_info: Unused, accepted for platform signature compatibility.

    Returns:
        The initialized SuperNotificationAction.

    Raises:
        Exception: Anything raised by ``async_validate_condition_config`` for a delivery
            condition; it is logged and re-raised unchanged.
    """
    _ = PLATFORM_SCHEMA  # schema must be imported even if not used for HA platform detection
    _ = discovery_info
    for delivery in (config.get(CONF_DELIVERY) or {}).values():
        if delivery and CONF_CONDITION in delivery:
            try:
                await async_validate_condition_config(hass, delivery[CONF_CONDITION])
            except Exception as e:
                _LOGGER.error("SUPERNOTIFY delivery %s fails condition: %s", delivery[CONF_CONDITION], e)
                raise

    hass.states.async_set(
        f"{DOMAIN}.configured",
        "True",
        {
            CONF_DELIVERY: config.get(CONF_DELIVERY, {}),
            CONF_LINKS: config.get(CONF_LINKS, ()),
            CONF_TEMPLATE_PATH: config.get(CONF_TEMPLATE_PATH, None),
            CONF_MEDIA_PATH: config.get(CONF_MEDIA_PATH, None),
            CONF_ARCHIVE: config.get(CONF_ARCHIVE, {}),
            CONF_RECIPIENTS: config.get(CONF_RECIPIENTS, ()),
            CONF_ACTIONS: config.get(CONF_ACTIONS, {}),
            CONF_HOUSEKEEPING: config.get(CONF_HOUSEKEEPING, {}),
            CONF_ACTION_GROUPS: config.get(CONF_ACTION_GROUPS, {}),
            # Only the scenario names are exposed, not their definitions
            CONF_SCENARIOS: list(config.get(CONF_SCENARIOS, {}).keys()),
            CONF_METHODS: config.get(CONF_METHODS, {}),
            CONF_CAMERAS: config.get(CONF_CAMERAS, {}),
            CONF_DUPE_CHECK: config.get(CONF_DUPE_CHECK, {}),
        },
    )
    hass.states.async_set(f"{DOMAIN}.failures", "0")
    hass.states.async_set(f"{DOMAIN}.sent", "0")

    await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
    # Every constructor argument is optional (defaults to None), so read the config
    # with .get() - consistent with the lookups above - rather than raising KeyError
    # when an optional section is absent.
    service = SuperNotificationAction(
        hass,
        deliveries=config.get(CONF_DELIVERY),
        template_path=config.get(CONF_TEMPLATE_PATH),
        media_path=config.get(CONF_MEDIA_PATH),
        archive=config.get(CONF_ARCHIVE),
        housekeeping=config.get(CONF_HOUSEKEEPING),
        recipients=config.get(CONF_RECIPIENTS),
        mobile_actions=config.get(CONF_ACTION_GROUPS),
        scenarios=config.get(CONF_SCENARIOS),
        links=config.get(CONF_LINKS),
        method_defaults=config.get(CONF_METHODS),
        cameras=config.get(CONF_CAMERAS),
        dupe_check=config.get(CONF_DUPE_CHECK),
    )
    await service.initialize()

    def supplemental_action_enquire_deliveries_by_scenario(_call: ServiceCall) -> dict[str, Any]:
        return service.enquire_deliveries_by_scenario()

    def supplemental_action_enquire_last_notification(_call: ServiceCall) -> dict[str, Any]:
        return service.last_notification.contents() if service.last_notification else {}

    async def supplemental_action_enquire_active_scenarios(call: ServiceCall) -> dict[str, Any]:
        trace = call.data.get("trace", False)
        result: dict[str, Any] = {"scenarios": await service.enquire_active_scenarios()}
        if trace:
            result["trace"] = await service.trace_active_scenarios()
        return result

    def supplemental_action_enquire_scenarios(_call: ServiceCall) -> dict[str, Any]:
        return {"scenarios": service.enquire_scenarios()}

    async def supplemental_action_enquire_occupancy(_call: ServiceCall) -> dict[str, Any]:
        # NOTE(review): the response key is "scenarios" although the payload is occupancy;
        # looks like a copy-paste slip, left unchanged because callers may rely on the key.
        return {"scenarios": await service.enquire_occupancy()}

    def supplemental_action_enquire_snoozes(_call: ServiceCall) -> dict[str, Any]:
        return {"snoozes": service.enquire_snoozes()}

    def supplemental_action_clear_snoozes(_call: ServiceCall) -> dict[str, Any]:
        return {"cleared": service.clear_snoozes()}

    def supplemental_action_enquire_people(_call: ServiceCall) -> dict[str, Any]:
        return {"people": service.enquire_people()}

    async def supplemental_action_purge_archive(call: ServiceCall) -> dict[str, Any]:
        days = call.data.get("days")
        if not service.context.archive.enabled:
            return {"error": "No archive configured"}
        purged = await service.context.archive.cleanup(days=days, force=True)
        arch_size = await service.context.archive.size()
        return {
            "purged": purged,
            "remaining": arch_size,
            "interval": ARCHIVE_PURGE_MIN_INTERVAL,
            # Report the archive's own retention period when the caller gave no override
            "days": service.context.archive.archive_days if days is None else days,
        }

    # Action name -> handler; registered in this order, all response-only
    supplemental_actions = {
        "enquire_deliveries_by_scenario": supplemental_action_enquire_deliveries_by_scenario,
        "enquire_last_notification": supplemental_action_enquire_last_notification,
        "enquire_active_scenarios": supplemental_action_enquire_active_scenarios,
        "enquire_scenarios": supplemental_action_enquire_scenarios,
        "enquire_occupancy": supplemental_action_enquire_occupancy,
        "enquire_people": supplemental_action_enquire_people,
        "enquire_snoozes": supplemental_action_enquire_snoozes,
        "clear_snoozes": supplemental_action_clear_snoozes,
        "purge_archive": supplemental_action_purge_archive,
    }
    for action_name, handler in supplemental_actions.items():
        hass.services.async_register(
            DOMAIN,
            action_name,
            handler,
            supports_response=SupportsResponse.ONLY,
        )

    return service
class SuperNotificationAction(BaseNotificationService):
    """Implement SuperNotification action.

    Wraps a shared ``Context`` built from the platform configuration, delivers
    notifications through it, suppresses duplicates using a TTL cache, keeps
    ``sent`` / ``failures`` counters mirrored to Home Assistant states, and
    exposes enquiry helpers used by the supplemental actions.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        deliveries: dict[str, dict[str, Any]] | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive: dict[str, Any] | None = None,
        housekeeping: dict[str, Any] | None = None,
        recipients: list[dict[str, Any]] | None = None,
        mobile_actions: dict[str, Any] | None = None,
        scenarios: dict[str, dict[str, Any]] | None = None,
        links: list[str] | None = None,
        method_defaults: dict[str, Any] | None = None,
        cameras: list[dict[str, Any]] | None = None,
        dupe_check: dict[str, Any] | None = None,
    ) -> None:
        """Initialize the service.

        All configuration sections are optional. ``links``, ``recipients``,
        ``method_defaults``, ``housekeeping`` and ``dupe_check`` fall back to
        empty containers; the rest are handed to ``Context`` as given.
        """
        self.hass: HomeAssistant = hass
        self.last_notification: Notification | None = None
        self.failures: int = 0
        self.housekeeping: dict[str, Any] = housekeeping or {}
        self.sent: int = 0
        self.context = Context(
            hass,
            deliveries,
            links or [],
            recipients or [],
            mobile_actions,
            template_path,
            media_path,
            archive,
            scenarios,
            method_defaults or {},
            cameras,
        )
        # Callables that undo each listener/timer registration made in initialize()
        self.unsubscribes: list[CALLBACK_TYPE] = []
        self.dupe_check_config: dict[str, Any] = dupe_check or {}
        self.last_purge: dt.datetime | None = None
        # Keyed by (notification hash, priority), value is the notification id
        self.notification_cache: TTLCache[tuple[int, str], str] = TTLCache(
            maxsize=self.dupe_check_config.get(CONF_SIZE, 100), ttl=self.dupe_check_config.get(CONF_TTL, 120)
        )

    async def initialize(self) -> None:
        """Initialize the context, register delivery methods, expose entities and subscribe to events."""
        await self.context.initialize()
        await self.context.register_delivery_methods(delivery_method_classes=METHODS)

        self.expose_entities()
        self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))
        housekeeping_schedule = self.housekeeping.get(CONF_HOUSEKEEPING_TIME)
        if housekeeping_schedule:
            _LOGGER.info("SUPERNOTIFY setting up housekeeping schedule at: %s", housekeeping_schedule)
            self.unsubscribes.append(
                async_track_time_change(
                    self.hass,
                    self.async_nightly_tasks,
                    hour=housekeeping_schedule.hour,
                    minute=housekeeping_schedule.minute,
                    second=housekeeping_schedule.second,
                )
            )

        self.unsubscribes.append(self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self.async_shutdown))

    async def async_shutdown(self, event: Event) -> None:
        """Handle the Home Assistant stop event by releasing all subscriptions."""
        _LOGGER.info("SUPERNOTIFY shutting down, %s", event)
        self.shutdown()

    async def async_unregister_services(self) -> None:
        """Release subscriptions, then defer to the base class to unregister services."""
        _LOGGER.info("SUPERNOTIFY unregistering")
        self.shutdown()
        return await super().async_unregister_services()

    def shutdown(self) -> None:
        """Call every stored unsubscribe callable once, logging (not raising) failures."""
        # Detach the list before iterating so that a repeat call (e.g. the stop event
        # arriving after services were unregistered) does not unsubscribe twice.
        pending, self.unsubscribes = self.unsubscribes, []
        for unsub in pending:
            try:
                _LOGGER.debug("SUPERNOTIFY unsubscribing: %s", unsub)
                unsub()
            except Exception as e:
                _LOGGER.error("SUPERNOTIFY failed to unsubscribe: %s", e)
        _LOGGER.info("SUPERNOTIFY shut down")

    def expose_entities(self) -> None:
        """Publish scenarios, delivery methods and deliveries as Home Assistant states."""
        for scenario in self.context.scenarios.values():
            self.hass.states.async_set(
                f"{DOMAIN}.scenario_{scenario.name}", STATE_UNKNOWN, scenario.attributes(include_condition=False)
            )
        for method in self.context.methods.values():
            self.hass.states.async_set(
                f"{DOMAIN}.method_{method.method}",
                STATE_ON if len(method.valid_deliveries) > 0 else STATE_OFF,
                method.attributes(),
            )
        for delivery_name, delivery in self.context._deliveries.items():
            self.hass.states.async_set(
                f"{DOMAIN}.delivery_{delivery_name}",
                # Test membership directly: wrapping it in str() gave "True"/"False",
                # both truthy, so every delivery was reported as on.
                STATE_ON if delivery_name in self.context.deliveries else STATE_OFF,
                delivery,
            )

    def dupe_check(self, notification: Notification) -> bool:
        """Return True if an equivalent notification is already in the cache.

        With the ``ATTR_DUPE_POLICY_NONE`` policy nothing is checked or cached.
        Otherwise the notification's hash is looked up at its own priority and
        at every priority after it in ``PRIORITY_VALUES`` (or only its own
        priority when that is not a known value), and the notification is then
        recorded in the cache whether or not it was a duplicate.
        """
        policy = self.dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP)
        if policy == ATTR_DUPE_POLICY_NONE:
            return False
        notification_hash = notification.hash()
        if notification.priority in PRIORITY_VALUES:
            same_or_higher_priority = PRIORITY_VALUES[PRIORITY_VALUES.index(notification.priority) :]
        else:
            same_or_higher_priority = [notification.priority]
        dupe = False
        if any((notification_hash, p) in self.notification_cache for p in same_or_higher_priority):
            _LOGGER.debug("SUPERNOTIFY Detected dupe notification")
            dupe = True
        self.notification_cache[notification_hash, notification.priority] = notification.id
        return dupe

    async def async_send_message(
        self, message: str = "", title: str | None = None, target: list[str] | str | None = None, **kwargs: Any
    ) -> None:
        """Send a message via chosen method.

        Builds a Notification, suppresses it if it is a duplicate, otherwise
        delivers it. Any exception is caught, logged and counted as a failure.
        If a notification object was created it is always recorded as the last
        notification, archived, and published to the archive topic when one is
        configured.
        """
        # The key may be present with a None value, which a .get() default would not replace
        data = kwargs.get(ATTR_DATA) or {}
        notification = None
        _LOGGER.debug("Message: %s, target: %s, data: %s", message, target, data)

        try:
            notification = Notification(self.context, message, title, target, data)
            await notification.initialize()
            if self.dupe_check(notification):
                notification.suppress()
            else:
                if await notification.deliver():
                    self.sent += 1
                    self.hass.states.async_set(f"{DOMAIN}.sent", str(self.sent))
                elif notification.errored:
                    _LOGGER.warning("SUPERNOTIFY Failed to deliver %s, error count %s", notification.id, notification.errored)
                else:
                    _LOGGER.info("SUPERNOTIFY No delivery selected for %s", notification.id)

        except Exception as err:
            # fault barrier of last resort, integration failures should be caught within envelope delivery
            _LOGGER.error("SUPERNOTIFY Failed to send message %s: %s", message, err, exc_info=True)
            self.failures += 1
            if notification is not None:
                notification.delivery_error = format_exception(err)
            self.hass.states.async_set(f"{DOMAIN}.failures", str(self.failures))

        if notification is not None:
            self.last_notification = notification
            self.context.archive.archive(notification)
            if self.context.archive_topic:
                await self.context.archive_topic.publish(notification)

            _LOGGER.debug(
                "SUPERNOTIFY %s deliveries, %s errors, %s skipped",
                notification.delivered,
                notification.errored,
                notification.skipped,
            )

    def enquire_deliveries_by_scenario(self) -> dict[str, list[str]]:
        """Return the context's mapping of deliveries by scenario."""
        return self.context.delivery_by_scenario

    async def enquire_occupancy(self) -> dict[str, list[dict[str, Any]]]:
        """Return the occupancy as determined by the context."""
        return self.context.determine_occupancy()

    async def enquire_active_scenarios(self) -> list[str]:
        """Return the names of scenarios that evaluate true for current occupancy at medium priority."""
        occupiers: dict[str, list[dict[str, Any]]] = self.context.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)
        return [s.name for s in self.context.scenarios.values() if await s.evaluate(cvars)]

    async def trace_active_scenarios(self) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
        """Trace every scenario against current occupancy at medium priority.

        Returns:
            A tuple of (enabled scenario attributes, disabled scenario attributes,
            the condition variables as a dict). Scenario attributes include the
            trace and are round-tripped through JSON.
        """
        occupiers: dict[str, list[dict[str, Any]]] = self.context.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)

        def safe_json(v: Any) -> Any:
            # Round-trip through the extended encoder so the result holds only plain JSON types
            return json.loads(json.dumps(v, cls=ExtendedJSONEncoder))

        enabled = []
        disabled = []
        dcvars = asdict(cvars)
        for s in self.context.scenarios.values():
            is_active = await s.trace(cvars)
            traced = safe_json(s.attributes(include_trace=True))
            if is_active:
                enabled.append(traced)
            else:
                disabled.append(traced)
        return enabled, disabled, dcvars

    def enquire_scenarios(self) -> dict[str, dict[str, Any]]:
        """Return scenario attributes (without conditions) keyed by scenario name."""
        return {s.name: s.attributes(include_condition=False) for s in self.context.scenarios.values()}

    def enquire_snoozes(self) -> list[dict[str, Any]]:
        """Return the snoozer's exported snoozes."""
        return self.context.snoozer.export()

    def clear_snoozes(self) -> int:
        """Clear snoozes via the snoozer and return its result."""
        return self.context.snoozer.clear()

    def enquire_people(self) -> list[dict[str, Any]]:
        """Return the people known to the context as a list."""
        return list(self.context.people.values())

    @callback
    def on_mobile_action(self, event: Event) -> None:
        """Listen for mobile actions relevant to snooze and silence notifications

        Events whose action is missing or does not start with ``SUPERNOTIFY_``
        are ignored; the rest are passed to the snoozer.

        Example Action:
        event_type: mobile_app_notification_action
        data:
            foo: a
        origin: REMOTE
        time_fired: "2024-04-20T13:14:09.360708+00:00"
        context:
            id: 01HVXT93JGWEDW0KE57Z0X6Z1K
            parent_id: null
            user_id: e9dbae1a5abf44dbbad52ff85501bb17
        """
        event_name = event.data.get(ATTR_ACTION)
        if event_name is None or not event_name.startswith("SUPERNOTIFY_"):
            return  # event not intended for here
        self.context.snoozer.handle_command_event(event, self.context.people)

    # Deliberately not decorated with @callback: that marker is for synchronous
    # functions, and this is a coroutine function.
    async def async_nightly_tasks(self, now: dt.datetime) -> None:
        """Run scheduled housekeeping: archive cleanup, then purge of snoozes."""
        _LOGGER.info("SUPERNOTIFY Housekeeping starting as scheduled at %s", now)
        await self.context.archive.cleanup()
        self.context.snoozer.purge_snoozes()
        _LOGGER.info("SUPERNOTIFY Housekeeping completed")