Coverage for custom_components/supernotify/notify.py: 87%
209 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-07-19 20:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-07-19 20:50 +0000
1"""Supernotify service, extending BaseNotificationService"""
3import datetime as dt
4import json
5import logging
6from dataclasses import asdict
7from traceback import format_exception
8from typing import Any
10from cachetools import TTLCache
11from homeassistant.components.notify.legacy import BaseNotificationService
12from homeassistant.const import CONF_CONDITION, EVENT_HOMEASSISTANT_STOP, STATE_OFF, STATE_ON, STATE_UNKNOWN
13from homeassistant.core import Event, HomeAssistant, ServiceCall, SupportsResponse, callback
14from homeassistant.helpers.condition import async_validate_condition_config
15from homeassistant.helpers.event import async_track_time_change
16from homeassistant.helpers.json import ExtendedJSONEncoder
17from homeassistant.helpers.reload import async_setup_reload_service
18from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
20from custom_components.supernotify.archive import ARCHIVE_PURGE_MIN_INTERVAL
21from custom_components.supernotify.scenario import Scenario
23from . import (
24 ATTR_ACTION,
25 ATTR_DATA,
26 ATTR_DUPE_POLICY_MTSLP,
27 ATTR_DUPE_POLICY_NONE,
28 CONF_ACTION_GROUPS,
29 CONF_ACTIONS,
30 CONF_ARCHIVE,
31 CONF_CAMERAS,
32 CONF_DELIVERY,
33 CONF_DUPE_CHECK,
34 CONF_DUPE_POLICY,
35 CONF_HOUSEKEEPING,
36 CONF_HOUSEKEEPING_TIME,
37 CONF_LINKS,
38 CONF_MEDIA_PATH,
39 CONF_METHODS,
40 CONF_RECIPIENTS,
41 CONF_SCENARIOS,
42 CONF_SIZE,
43 CONF_TEMPLATE_PATH,
44 CONF_TTL,
45 DOMAIN,
46 PLATFORM_SCHEMA,
47 PLATFORMS,
48 PRIORITY_MEDIUM,
49 PRIORITY_VALUES,
50 ConditionVariables,
51)
52from .configuration import Context
53from .methods.alexa_media_player import AlexaMediaPlayerDeliveryMethod
54from .methods.chime import ChimeDeliveryMethod
55from .methods.email import EmailDeliveryMethod
56from .methods.generic import GenericDeliveryMethod
57from .methods.media_player_image import MediaPlayerImageDeliveryMethod
58from .methods.mobile_push import MobilePushDeliveryMethod
59from .methods.persistent import PersistentDeliveryMethod
60from .methods.sms import SMSDeliveryMethod
61from .notification import Notification
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# 60 * 60 = 3600. Presumably a snooze duration in seconds; it is not referenced
# anywhere in the visible part of this module - TODO confirm where it is consumed.
SNOOZE_TIME = 60 * 60  # TODO: move to configuration

# Delivery method classes passed to Context.register_delivery_methods() from
# SuperNotificationAction.initialize().
# NOTE(review): list order may be significant to registration - verify before reordering.
METHODS: list[type] = [
    EmailDeliveryMethod,
    SMSDeliveryMethod,
    AlexaMediaPlayerDeliveryMethod,
    MobilePushDeliveryMethod,
    MediaPlayerImageDeliveryMethod,
    ChimeDeliveryMethod,
    PersistentDeliveryMethod,
    GenericDeliveryMethod,
]
async def async_get_service(
    hass: HomeAssistant,
    config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> "SuperNotificationAction":
    """Notify specific component setup - see async_setup_legacy in BaseNotificationService

    Validates any delivery conditions, publishes the configuration and the
    sent/failure counters as states, builds and initializes the
    SuperNotificationAction, and registers the response-only supplemental
    actions under DOMAIN.

    Args:
        hass: Home Assistant instance.
        config: Platform configuration for this notify service.
        discovery_info: Unused; accepted for the platform setup signature.

    Returns:
        The initialized SuperNotificationAction.

    Raises:
        Exception: Whatever async_validate_condition_config raises for an
            invalid delivery condition; it is logged and re-raised.
    """
    _ = PLATFORM_SCHEMA  # schema must be imported even if not used for HA platform detection
    _ = discovery_info

    # Fail setup early if any delivery carries an invalid condition
    for delivery in config.get(CONF_DELIVERY, {}).values():
        if not delivery or CONF_CONDITION not in delivery:
            continue
        try:
            await async_validate_condition_config(hass, delivery[CONF_CONDITION])
        except Exception as e:
            _LOGGER.error("SUPERNOTIFY delivery %s fails condition: %s", delivery[CONF_CONDITION], e)
            raise

    # Snapshot of the configuration exposed as attributes of the "configured" state
    configured_attributes = {
        CONF_DELIVERY: config.get(CONF_DELIVERY, {}),
        CONF_LINKS: config.get(CONF_LINKS, ()),
        CONF_TEMPLATE_PATH: config.get(CONF_TEMPLATE_PATH),
        CONF_MEDIA_PATH: config.get(CONF_MEDIA_PATH),
        CONF_ARCHIVE: config.get(CONF_ARCHIVE, {}),
        CONF_RECIPIENTS: config.get(CONF_RECIPIENTS, ()),
        CONF_ACTIONS: config.get(CONF_ACTIONS, {}),
        CONF_SCENARIOS: list(config.get(CONF_SCENARIOS, {}).keys()),
        CONF_METHODS: config.get(CONF_METHODS, {}),
        CONF_CAMERAS: config.get(CONF_CAMERAS, {}),
        CONF_DUPE_CHECK: config.get(CONF_DUPE_CHECK, {}),
    }
    hass.states.async_set(f"{DOMAIN}.configured", "True", configured_attributes)
    hass.states.async_set(f"{DOMAIN}.failures", "0")
    hass.states.async_set(f"{DOMAIN}.sent", "0")

    await async_setup_reload_service(hass, DOMAIN, PLATFORMS)

    # These keys are indexed directly (not .get), so they must be present in config
    service_options = {
        "deliveries": config[CONF_DELIVERY],
        "template_path": config[CONF_TEMPLATE_PATH],
        "media_path": config[CONF_MEDIA_PATH],
        "archive": config[CONF_ARCHIVE],
        "housekeeping": config[CONF_HOUSEKEEPING],
        "recipients": config[CONF_RECIPIENTS],
        "mobile_actions": config[CONF_ACTION_GROUPS],
        "scenarios": config[CONF_SCENARIOS],
        "links": config[CONF_LINKS],
        "method_defaults": config[CONF_METHODS],
        "cameras": config[CONF_CAMERAS],
        "dupe_check": config[CONF_DUPE_CHECK],
    }
    service = SuperNotificationAction(hass, **service_options)
    await service.initialize()

    def supplemental_action_enquire_deliveries_by_scenario(_call: ServiceCall) -> dict:
        return service.enquire_deliveries_by_scenario()

    def supplemental_action_enquire_last_notification(_call: ServiceCall) -> dict:
        last = service.last_notification
        if last:
            return last.contents()
        return {}

    async def supplemental_action_enquire_active_scenarios(call: ServiceCall) -> dict:
        want_trace = call.data.get("trace", False)
        response: dict[str, Any] = {"scenarios": await service.enquire_active_scenarios()}
        if want_trace:
            response["trace"] = await service.trace_active_scenarios()
        return response

    def supplemental_action_enquire_scenarios(_call: ServiceCall) -> dict:
        return {"scenarios": service.enquire_scenarios()}

    async def supplemental_action_enquire_occupancy(_call: ServiceCall) -> dict:
        # NOTE(review): occupancy is returned under the "scenarios" key - confirm this is intended
        return {"scenarios": await service.enquire_occupancy()}

    def supplemental_action_enquire_snoozes(_call: ServiceCall) -> dict:
        return {"snoozes": service.enquire_snoozes()}

    def supplemental_action_clear_snoozes(_call: ServiceCall) -> dict:
        return {"cleared": service.clear_snoozes()}

    def supplemental_action_enquire_people(_call: ServiceCall) -> dict:
        return {"people": service.enquire_people()}

    async def supplemental_action_purge_archive(call: ServiceCall) -> dict[str, Any]:
        archive = service.context.archive
        if archive is None:
            return {"error": "No archive configured"}
        days = call.data.get("days")
        purged = await archive.cleanup(days=days, force=True)
        arch_size = await archive.size()
        effective_days = archive.archive_days if days is None else days
        return {
            "purged": purged,
            "remaining": arch_size,
            "interval": ARCHIVE_PURGE_MIN_INTERVAL,
            "days": effective_days,
        }

    # Action name -> handler, in registration order; every action only returns a response
    supplemental_actions = (
        ("enquire_deliveries_by_scenario", supplemental_action_enquire_deliveries_by_scenario),
        ("enquire_last_notification", supplemental_action_enquire_last_notification),
        ("enquire_active_scenarios", supplemental_action_enquire_active_scenarios),
        ("enquire_scenarios", supplemental_action_enquire_scenarios),
        ("enquire_occupancy", supplemental_action_enquire_occupancy),
        ("enquire_people", supplemental_action_enquire_people),
        ("enquire_snoozes", supplemental_action_enquire_snoozes),
        ("clear_snoozes", supplemental_action_clear_snoozes),
        ("purge_archive", supplemental_action_purge_archive),
    )
    for action_name, handler in supplemental_actions:
        hass.services.async_register(
            DOMAIN,
            action_name,
            handler,
            supports_response=SupportsResponse.ONLY,
        )

    return service
class SuperNotificationAction(BaseNotificationService):
    """Implement SuperNotification action.

    Owns the shared Context, the sent/failure counters, the duplicate
    detection cache and the event/time subscriptions that are released
    on shutdown.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        deliveries: dict[str, dict] | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive: dict | None = None,
        housekeeping: dict | None = None,
        recipients: list | None = None,
        mobile_actions: dict | None = None,
        scenarios: dict[str, dict] | None = None,
        links: list | None = None,
        method_defaults: dict | None = None,
        cameras: list[dict] | None = None,
        dupe_check: dict | None = None,
    ) -> None:
        """Initialize the service.

        All configuration arguments are optional; most are passed straight
        through to Context. `housekeeping` and `dupe_check` are kept on the
        service itself and default to empty dicts.
        """
        self.hass: HomeAssistant = hass
        self.last_notification: Notification | None = None
        self.failures: int = 0
        self.housekeeping: dict = housekeeping or {}
        self.sent: int = 0
        self.context = Context(
            hass,
            deliveries,
            links or [],
            recipients or [],
            mobile_actions,
            template_path,
            media_path,
            archive,
            scenarios,
            method_defaults or {},
            cameras,
        )
        # Unsubscribe handles collected in initialize() and released in shutdown()
        self.unsubscribes: list = []
        self.dupe_check_config: dict[str, Any] = dupe_check or {}
        self.last_purge: dt.datetime | None = None
        # Recently seen (hash, priority) keys; cache size and TTL fall back to 100 and 120
        self.notification_cache: TTLCache = TTLCache(
            maxsize=self.dupe_check_config.get(CONF_SIZE, 100), ttl=self.dupe_check_config.get(CONF_TTL, 120)
        )

    async def initialize(self) -> None:
        """Initialize the context, expose state entities and subscribe to events."""
        await self.context.initialize()
        await self.context.register_delivery_methods(delivery_method_classes=METHODS)

        self.expose_entities()
        self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))
        housekeeping_schedule = self.housekeeping.get(CONF_HOUSEKEEPING_TIME)
        if housekeeping_schedule:
            _LOGGER.info("SUPERNOTIFY setting up housekeeping schedule at: %s", housekeeping_schedule)
            self.unsubscribes.append(
                async_track_time_change(
                    self.hass,
                    self.async_nightly_tasks,
                    hour=housekeeping_schedule.hour,
                    minute=housekeeping_schedule.minute,
                    second=housekeeping_schedule.second,
                )
            )

        self.unsubscribes.append(self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self.async_shutdown))

    async def async_shutdown(self, event: Event) -> None:
        """Release subscriptions when the Home Assistant stop event fires."""
        _LOGGER.info("SUPERNOTIFY shutting down, %s", event)
        self.shutdown()

    async def async_unregister_services(self) -> None:
        """Release subscriptions, then defer to the base class unregistration."""
        _LOGGER.info("SUPERNOTIFY unregistering")
        self.shutdown()
        return await super().async_unregister_services()

    def shutdown(self) -> None:
        """Call every collected unsubscribe handle once.

        A failure in one handle is logged and does not stop the others.
        Safe to call repeatedly: the handles are detached before being
        invoked, so a second call (e.g. stop event followed by service
        unregistration) finds nothing left to release.
        """
        pending, self.unsubscribes = self.unsubscribes, []
        for unsub in pending:
            if unsub:
                try:
                    _LOGGER.debug("SUPERNOTIFY unsubscribing: %s", unsub)
                    unsub()
                except Exception as e:
                    _LOGGER.error("SUPERNOTIFY failed to unsubscribe: %s", e)
        _LOGGER.info("SUPERNOTIFY shut down")

    def expose_entities(self) -> None:
        """Publish one state per scenario, delivery method and configured delivery."""
        for scenario in self.context.scenarios.values():
            self.hass.states.async_set(
                f"{DOMAIN}.scenario_{scenario.name}", STATE_UNKNOWN, scenario.attributes(include_condition=False)
            )
        for method in self.context.methods.values():
            self.hass.states.async_set(
                f"{DOMAIN}.method_{method.method}",
                STATE_ON if len(method.valid_deliveries) > 0 else STATE_OFF,
                method.attributes(),
            )
        for delivery_name, delivery in self.context._deliveries.items():
            # Test the membership result itself: wrapping it in str() yields
            # "True"/"False", both truthy, which reported every delivery as on
            self.hass.states.async_set(
                f"{DOMAIN}.delivery_{delivery_name}",
                STATE_ON if delivery_name in self.context.deliveries else STATE_OFF,
                delivery,
            )

    def dupe_check(self, notification: Notification) -> bool:
        """Report whether the notification duplicates a recently cached one.

        A match is the same hash cached under the notification's own priority
        or any priority at or after it in PRIORITY_VALUES (presumably ordered
        low to high - TODO confirm). The notification is recorded in the cache
        whether or not it was a duplicate.

        Returns:
            True for a duplicate; always False when the policy is
            ATTR_DUPE_POLICY_NONE (in which case nothing is cached).
        """
        policy = self.dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP)
        if policy == ATTR_DUPE_POLICY_NONE:
            return False
        notification_hash = notification.hash()
        if notification.priority in PRIORITY_VALUES:
            same_or_higher_priority = PRIORITY_VALUES[PRIORITY_VALUES.index(notification.priority) :]
        else:
            # Priority outside the known list can only match itself
            same_or_higher_priority = [notification.priority]
        dupe = False
        if any((notification_hash, p) in self.notification_cache for p in same_or_higher_priority):
            _LOGGER.debug("SUPERNOTIFY Detected dupe notification")
            dupe = True
        self.notification_cache[notification_hash, notification.priority] = notification.id
        return dupe

    async def async_send_message(
        self, message: str = "", title: str | None = None, target: list | str | None = None, **kwargs: Any
    ) -> None:
        """Send a message via chosen method.

        Builds a Notification, suppresses it if it is a duplicate, otherwise
        delivers it and updates the sent counter state. Any exception is
        caught, counted in the failures state and recorded on the
        notification. The notification, if one was created, becomes
        `last_notification` and is archived when an archive is available.
        """
        data = kwargs.get(ATTR_DATA, {})
        notification = None
        _LOGGER.debug("Message: %s, target: %s, data: %s", message, target, data)

        try:
            notification = Notification(self.context, message, title, target, data)
            await notification.initialize()
            if self.dupe_check(notification):
                notification.suppress()
            else:
                if await notification.deliver():
                    self.sent += 1
                    self.hass.states.async_set(f"{DOMAIN}.sent", str(self.sent))
                elif notification.errored:
                    _LOGGER.warning("SUPERNOTIFY Failed to deliver %s, error count %s", notification.id, notification.errored)
                else:
                    _LOGGER.info("SUPERNOTIFY No delivery selected for %s", notification.id)

        except Exception as err:
            # fault barrier of last resort, integration failures should be caught within envelope delivery
            _LOGGER.error("SUPERNOTIFY Failed to send message %s: %s", message, err, exc_info=True)
            self.failures += 1
            if notification is not None:
                notification.delivery_error = format_exception(err)
            self.hass.states.async_set(f"{DOMAIN}.failures", str(self.failures))

        if notification is not None:
            self.last_notification = notification
            # The archive is treated as optional elsewhere in this module, and this
            # call sits outside the fault barrier above, so guard it explicitly
            if self.context.archive is not None:
                self.context.archive.archive(notification)

            _LOGGER.debug(
                "SUPERNOTIFY %s deliveries, %s errors, %s skipped",
                notification.delivered,
                notification.errored,
                notification.skipped,
            )

    def enquire_deliveries_by_scenario(self) -> dict[str, list[Scenario]]:
        """Return the context's delivery-by-scenario mapping."""
        return self.context.delivery_by_scenario

    async def enquire_occupancy(self) -> dict[str, list]:
        """Return the occupancy determined by the context."""
        return self.context.determine_occupancy()

    async def enquire_active_scenarios(self) -> list[str]:
        """Return names of scenarios that evaluate true for current occupancy at medium priority."""
        occupiers = self.context.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)
        return [s.name for s in self.context.scenarios.values() if await s.evaluate(cvars)]

    async def trace_active_scenarios(self) -> tuple:
        """Trace every scenario against current occupancy at medium priority.

        Returns:
            A tuple of (enabled, disabled, condition variables): two lists of
            JSON-safe scenario attributes including trace data, and the
            condition variables as a dict.
        """
        occupiers = self.context.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)

        def safe_json(v: Any) -> Any:
            # Round-trip through the extended encoder so the result holds only JSON types
            return json.loads(json.dumps(v, cls=ExtendedJSONEncoder))

        enabled: list = []
        disabled: list = []
        dcvars = asdict(cvars)
        for s in self.context.scenarios.values():
            # Trace first: the attributes are read after the trace has run
            bucket = enabled if await s.trace(cvars) else disabled
            bucket.append(safe_json(s.attributes(include_trace=True)))
        return enabled, disabled, dcvars

    def enquire_scenarios(self) -> dict[str, dict]:
        """Return scenario attributes, without conditions, keyed by scenario name."""
        return {s.name: s.attributes(include_condition=False) for s in self.context.scenarios.values()}

    def enquire_snoozes(self) -> list[dict[str, Any]]:
        """Return the snoozer's exported snoozes."""
        return self.context.snoozer.export()

    def clear_snoozes(self) -> int:
        """Clear snoozes and return the snoozer's result."""
        return self.context.snoozer.clear()

    def enquire_people(self) -> list[dict]:
        """Return the people known to the context as a list."""
        return list(self.context.people.values())

    @callback
    def on_mobile_action(self, event: Event) -> None:
        """Listen for mobile actions relevant to snooze and silence notifications

        Example Action:
        event_type: mobile_app_notification_action
        data:
            foo: a
        origin: REMOTE
        time_fired: "2024-04-20T13:14:09.360708+00:00"
        context:
            id: 01HVXT93JGWEDW0KE57Z0X6Z1K
            parent_id: null
            user_id: e9dbae1a5abf44dbbad52ff85501bb17
        """
        event_name = event.data.get(ATTR_ACTION)
        if event_name is None or not event_name.startswith("SUPERNOTIFY_"):
            return  # event not intended for here
        self.context.snoozer.handle_command_event(event, self.context.people)

    # Deliberately not decorated with @callback: that marker is for synchronous
    # functions safe to run in the event loop, and this is a coroutine function
    async def async_nightly_tasks(self, now: dt.datetime) -> None:
        """Run scheduled housekeeping: archive cleanup (if an archive exists) and snooze purge."""
        _LOGGER.info("SUPERNOTIFY Housekeeping starting as scheduled at %s", now)
        if self.context.archive is not None:
            await self.context.archive.cleanup()
        self.context.snoozer.purge_snoozes()
        _LOGGER.info("SUPERNOTIFY Housekeeping completed")