Coverage for custom_components/supernotify/notify.py: 87%
197 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-28 14:21 +0000
1"""Supernotify service, extending BaseNotificationService"""
3import datetime as dt
4import json
5import logging
6from dataclasses import asdict
7from traceback import format_exception
8from typing import Any
10from cachetools import TTLCache
11from homeassistant.components.notify.legacy import BaseNotificationService
12from homeassistant.const import CONF_CONDITION, EVENT_HOMEASSISTANT_STOP
13from homeassistant.core import Event, HomeAssistant, ServiceCall, SupportsResponse, callback
14from homeassistant.helpers.condition import async_validate_condition_config
15from homeassistant.helpers.event import async_track_time_change
16from homeassistant.helpers.json import ExtendedJSONEncoder
17from homeassistant.helpers.reload import async_setup_reload_service
18from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
20from custom_components.supernotify.archive import ARCHIVE_PURGE_MIN_INTERVAL
21from custom_components.supernotify.scenario import Scenario
23from . import (
24 ATTR_ACTION,
25 ATTR_DATA,
26 ATTR_DUPE_POLICY_MTSLP,
27 ATTR_DUPE_POLICY_NONE,
28 CONF_ACTION_GROUPS,
29 CONF_ACTIONS,
30 CONF_ARCHIVE,
31 CONF_CAMERAS,
32 CONF_DELIVERY,
33 CONF_DUPE_CHECK,
34 CONF_DUPE_POLICY,
35 CONF_HOUSEKEEPING,
36 CONF_HOUSEKEEPING_TIME,
37 CONF_LINKS,
38 CONF_MEDIA_PATH,
39 CONF_METHODS,
40 CONF_RECIPIENTS,
41 CONF_SCENARIOS,
42 CONF_SIZE,
43 CONF_TEMPLATE_PATH,
44 CONF_TTL,
45 DOMAIN,
46 PLATFORM_SCHEMA,
47 PLATFORMS,
48 PRIORITY_MEDIUM,
49 PRIORITY_VALUES,
50 ConditionVariables,
51)
52from .configuration import SupernotificationConfiguration
53from .methods.alexa_media_player import AlexaMediaPlayerDeliveryMethod
54from .methods.chime import ChimeDeliveryMethod
55from .methods.email import EmailDeliveryMethod
56from .methods.generic import GenericDeliveryMethod
57from .methods.media_player_image import MediaPlayerImageDeliveryMethod
58from .methods.mobile_push import MobilePushDeliveryMethod
59from .methods.persistent import PersistentDeliveryMethod
60from .methods.sms import SMSDeliveryMethod
61from .notification import Notification
# Logger shared by everything in this module.
_LOGGER = logging.getLogger(__name__)

# One hour if the unit is seconds (presumably it is - TODO confirm against the snoozer).
SNOOZE_TIME = 60 * 60  # TODO: move to configuration

# Delivery method classes passed to the configuration's
# ``register_delivery_methods`` when the service is initialized.
# The order below is the order in which they are handed over.
METHODS: list[type] = [
    EmailDeliveryMethod,
    SMSDeliveryMethod,
    AlexaMediaPlayerDeliveryMethod,
    MobilePushDeliveryMethod,
    MediaPlayerImageDeliveryMethod,
    ChimeDeliveryMethod,
    PersistentDeliveryMethod,
    GenericDeliveryMethod,
]
async def async_get_service(
    hass: HomeAssistant,
    config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> "SuperNotificationAction":
    """Notify specific component setup - see async_setup_legacy in BaseNotificationService.

    In order, this:

    1. validates the ``condition`` of every configured delivery that has one,
    2. publishes a snapshot of the configuration as the ``<DOMAIN>.configured``
       state and resets the ``<DOMAIN>.failures`` / ``<DOMAIN>.sent`` counters,
    3. sets up the reload service,
    4. builds and initializes the ``SuperNotificationAction``,
    5. registers the supplemental, response-only actions in ``DOMAIN``.

    Args:
        hass: The Home Assistant instance.
        config: Platform configuration for this notify service.
        discovery_info: Unused; accepted for the platform setup signature.

    Returns:
        The initialized notification service.

    Raises:
        Exception: Whatever ``async_validate_condition_config`` raises for an
            invalid delivery condition; it is logged and re-raised unchanged.
        KeyError: If ``config`` lacks one of the keys read when constructing
            the service.
    """
    _ = PLATFORM_SCHEMA  # schema must be imported even if not used for HA platform detection
    _ = discovery_info

    for delivery_name, delivery in config.get(CONF_DELIVERY, {}).items():
        if not delivery or CONF_CONDITION not in delivery:
            continue
        try:
            await async_validate_condition_config(hass, delivery[CONF_CONDITION])
        except Exception as e:
            # Name the delivery as well as the offending condition so the
            # log entry points at the right piece of configuration.
            _LOGGER.error(
                "SUPERNOTIFY delivery %s fails condition %s: %s", delivery_name, delivery[CONF_CONDITION], e
            )
            raise

    # Expose the effective configuration as state attributes for inspection.
    hass.states.async_set(
        f"{DOMAIN}.configured",
        "True",
        {
            CONF_DELIVERY: config.get(CONF_DELIVERY, {}),
            CONF_LINKS: config.get(CONF_LINKS, ()),
            CONF_TEMPLATE_PATH: config.get(CONF_TEMPLATE_PATH),
            CONF_MEDIA_PATH: config.get(CONF_MEDIA_PATH),
            CONF_ARCHIVE: config.get(CONF_ARCHIVE, {}),
            CONF_RECIPIENTS: config.get(CONF_RECIPIENTS, ()),
            CONF_ACTIONS: config.get(CONF_ACTIONS, {}),
            # only the scenario names are published, not their definitions
            CONF_SCENARIOS: list(config.get(CONF_SCENARIOS, {}).keys()),
            CONF_METHODS: config.get(CONF_METHODS, {}),
            CONF_CAMERAS: config.get(CONF_CAMERAS, {}),
            CONF_DUPE_CHECK: config.get(CONF_DUPE_CHECK, {}),
        },
    )
    hass.states.async_set(f"{DOMAIN}.failures", "0")
    hass.states.async_set(f"{DOMAIN}.sent", "0")

    await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
    service = SuperNotificationAction(
        hass,
        deliveries=config[CONF_DELIVERY],
        template_path=config[CONF_TEMPLATE_PATH],
        media_path=config[CONF_MEDIA_PATH],
        archive=config[CONF_ARCHIVE],
        housekeeping=config[CONF_HOUSEKEEPING],
        recipients=config[CONF_RECIPIENTS],
        mobile_actions=config[CONF_ACTION_GROUPS],
        scenarios=config[CONF_SCENARIOS],
        links=config[CONF_LINKS],
        method_defaults=config[CONF_METHODS],
        cameras=config[CONF_CAMERAS],
        dupe_check=config[CONF_DUPE_CHECK],
    )
    await service.initialize()

    # The two synchronous handlers are marked @callback so that, like the
    # coroutine handlers below, they run on the event loop instead of being
    # dispatched to an executor thread.
    @callback
    def supplemental_action_enquire_deliveries_by_scenario(_call: ServiceCall) -> dict:
        return service.enquire_deliveries_by_scenario()

    @callback
    def supplemental_action_enquire_last_notification(_call: ServiceCall) -> dict:
        return service.last_notification.contents() if service.last_notification else {}

    async def supplemental_action_enquire_active_scenarios(call: ServiceCall) -> dict:
        trace = call.data.get("trace", False)
        return {"scenarios": await service.enquire_active_scenarios(trace)}

    async def supplemental_action_enquire_occupancy(_call: ServiceCall) -> dict:
        # NOTE(review): the response key is "scenarios" although the payload is
        # occupancy - looks like a copy/paste slip; kept as-is because callers
        # may already depend on the key.
        return {"scenarios": await service.enquire_occupancy()}

    async def supplemental_action_enquire_snoozes(_call: ServiceCall) -> dict:
        return {"snoozes": service.enquire_snoozes()}

    async def supplemental_action_clear_snoozes(_call: ServiceCall) -> dict:
        return {"cleared": service.clear_snoozes()}

    async def supplemental_action_enquire_people(_call: ServiceCall) -> dict:
        return {"people": service.enquire_people()}

    async def supplemental_action_purge_archive(call: ServiceCall) -> dict[str, Any]:
        days = call.data.get("days")
        if service.context.archive is None:
            return {"error": "No archive configured"}
        purged = await service.context.archive.cleanup(days=days, force=True)
        arch_size = await service.context.archive.size()
        return {
            "purged": purged,
            "remaining": arch_size,
            "interval": ARCHIVE_PURGE_MIN_INTERVAL,
            # report the archive's own retention when the caller gave no override
            "days": service.context.archive.archive_days if days is None else days,
        }

    # Action name -> handler, in registration order. All are response-only.
    supplemental_actions = {
        "enquire_deliveries_by_scenario": supplemental_action_enquire_deliveries_by_scenario,
        "enquire_last_notification": supplemental_action_enquire_last_notification,
        "enquire_active_scenarios": supplemental_action_enquire_active_scenarios,
        "enquire_occupancy": supplemental_action_enquire_occupancy,
        "enquire_people": supplemental_action_enquire_people,
        "enquire_snoozes": supplemental_action_enquire_snoozes,
        "clear_snoozes": supplemental_action_clear_snoozes,
        "purge_archive": supplemental_action_purge_archive,
    }
    for action_name, handler in supplemental_actions.items():
        hass.services.async_register(
            DOMAIN,
            action_name,
            handler,
            supports_response=SupportsResponse.ONLY,
        )

    return service
class SuperNotificationAction(BaseNotificationService):
    """Implement SuperNotification action.

    Wraps a ``SupernotificationConfiguration`` (``self.context``), delivers
    incoming notify calls through it, suppresses duplicates using a TTL cache,
    keeps running ``sent`` / ``failures`` counters mirrored into Home Assistant
    state, and answers the supplemental enquiry actions.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        deliveries: dict[str, dict] | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive: dict | None = None,
        housekeeping: dict | None = None,
        recipients: list | None = None,
        mobile_actions: dict | None = None,
        scenarios: dict[str, dict] | None = None,
        links: list | None = None,
        method_defaults: dict | None = None,
        cameras: list[dict] | None = None,
        dupe_check: dict | None = None,
    ) -> None:
        """Initialize the service.

        Only builds in-memory state; listeners and delivery methods are set up
        later by ``initialize``. ``links``, ``recipients``, ``method_defaults``,
        ``housekeeping`` and ``dupe_check`` fall back to empty containers when
        not given; the remaining arguments are passed to the configuration
        object unchanged.
        """
        self.hass: HomeAssistant = hass
        self.last_notification: Notification | None = None
        self.failures: int = 0
        self.housekeeping: dict = housekeeping or {}
        self.sent: int = 0
        self.context = SupernotificationConfiguration(
            hass,
            deliveries,
            links or [],
            recipients or [],
            mobile_actions,
            template_path,
            media_path,
            archive,
            scenarios,
            method_defaults or {},
            cameras,
        )
        # Cancel handles for every listener/timer registered in initialize()
        self.unsubscribes: list = []
        self.dupe_check_config: dict[str, Any] = dupe_check or {}
        self.last_purge: dt.datetime | None = None
        # Keyed by (notification hash, priority); entries expire after the TTL
        self.notification_cache: TTLCache = TTLCache(
            maxsize=self.dupe_check_config.get(CONF_SIZE, 100), ttl=self.dupe_check_config.get(CONF_TTL, 120)
        )

    async def initialize(self) -> None:
        """Initialize the configuration, publish entities and hook up listeners.

        Every listener or timer registered here has its cancel handle stored in
        ``self.unsubscribes`` so that ``shutdown`` can remove it.
        """
        await self.context.initialize()
        await self.context.register_delivery_methods(delivery_method_classes=METHODS)

        self.expose_entities()
        self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))

        housekeeping_schedule = self.housekeeping.get(CONF_HOUSEKEEPING_TIME)
        if housekeeping_schedule:
            _LOGGER.info("SUPERNOTIFY setting up housekeeping schedule at: %s", housekeeping_schedule)
            self.unsubscribes.append(
                async_track_time_change(
                    self.hass,
                    self.async_nightly_tasks,
                    hour=housekeeping_schedule.hour,
                    minute=housekeeping_schedule.minute,
                    second=housekeeping_schedule.second,
                )
            )

        self.unsubscribes.append(self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self.async_shutdown))

    async def async_shutdown(self, event: Event) -> None:
        """Handle the Home Assistant stop event by releasing all listeners."""
        _LOGGER.info("SUPERNOTIFY shutting down, %s", event)
        self.shutdown()

    async def async_unregister_services(self) -> None:
        """Release listeners, then let the base class unregister its services."""
        _LOGGER.info("SUPERNOTIFY unregistering")
        self.shutdown()
        return await super().async_unregister_services()

    def shutdown(self) -> None:
        """Cancel every registered listener and timer.

        Safe to call more than once: the handle list is emptied before the
        handles are invoked, so a second call (for example unregistering
        followed by the stop event) does not cancel the same handle twice.
        A failing handle is logged and does not stop the remaining ones.
        """
        pending, self.unsubscribes = self.unsubscribes, []
        for unsub in pending:
            if not unsub:
                continue
            try:
                _LOGGER.debug("SUPERNOTIFY unsubscribing: %s", unsub)
                unsub()
            except Exception as e:
                _LOGGER.error("SUPERNOTIFY failed to unsubscribe: %s", e)
        _LOGGER.info("SUPERNOTIFY shut down")

    def expose_entities(self) -> None:
        """Publish scenarios, delivery methods and deliveries as states in ``DOMAIN``."""
        for scenario in self.context.scenarios.values():
            self.hass.states.async_set(f"{DOMAIN}.scenario_{scenario.name}", "", scenario.attributes(include_condition=False))
        for method in self.context.methods.values():
            # state is "True" when the method has at least one valid delivery
            self.hass.states.async_set(
                f"{DOMAIN}.method_{method.method}", str(len(method.valid_deliveries) > 0), method.attributes()
            )
        for delivery_name, delivery in self.context._deliveries.items():
            # state is "True" when the delivery also appears in context.deliveries
            self.hass.states.async_set(
                f"{DOMAIN}.delivery_{delivery_name}", str(delivery_name in self.context.deliveries), delivery
            )

    def dupe_check(self, notification: Notification) -> bool:
        """Report whether ``notification`` duplicates a recently seen one.

        With the ``ATTR_DUPE_POLICY_NONE`` policy nothing is ever a duplicate
        and the cache is left untouched. Otherwise the notification is a
        duplicate when its hash is cached under its own priority or under any
        priority listed after it in ``PRIORITY_VALUES``. The notification is
        recorded in the cache whether or not it was a duplicate.
        """
        policy = self.dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP)
        if policy == ATTR_DUPE_POLICY_NONE:
            return False

        notification_hash = notification.hash()
        if notification.priority in PRIORITY_VALUES:
            first = PRIORITY_VALUES.index(notification.priority)
            same_or_higher_priority = PRIORITY_VALUES[first:]
        else:
            # unknown priority: only compare against itself
            same_or_higher_priority = [notification.priority]

        dupe = any((notification_hash, p) in self.notification_cache for p in same_or_higher_priority)
        if dupe:
            _LOGGER.debug("SUPERNOTIFY Detected dupe notification")
        self.notification_cache[(notification_hash, notification.priority)] = notification.id
        return dupe

    async def async_send_message(
        self, message: str = "", title: str | None = None, target: list | str | None = None, **kwargs: Any
    ) -> None:
        """Send a message via chosen method.

        Builds a ``Notification``, suppresses it if it is a duplicate, and
        otherwise delivers it. Any exception is caught, logged and counted in
        ``failures`` rather than propagated. If a notification object was
        created it becomes ``last_notification`` and is archived when an
        archive is available.
        """
        data = kwargs.get(ATTR_DATA, {})
        notification = None
        _LOGGER.debug("Message: %s, target: %s, data: %s", message, target, data)

        try:
            notification = Notification(self.context, message, title, target, data)
            await notification.initialize()
            if self.dupe_check(notification):
                notification.suppress()
            elif await notification.deliver():
                self.sent += 1
                self.hass.states.async_set(f"{DOMAIN}.sent", str(self.sent))
            elif notification.errored:
                _LOGGER.warning("SUPERNOTIFY Failed to deliver %s, error count %s", notification.id, notification.errored)
            else:
                _LOGGER.info("SUPERNOTIFY No delivery selected for %s", notification.id)

        except Exception as err:
            # fault barrier of last resort, integration failures should be caught within envelope delivery
            _LOGGER.exception("SUPERNOTIFY Failed to send message %s: %s", message, err)
            self.failures += 1
            if notification is not None:
                notification.delivery_error = format_exception(err)
            self.hass.states.async_set(f"{DOMAIN}.failures", str(self.failures))

        if notification is not None:
            self.last_notification = notification
            # The purge_archive action treats the archive as optional, so do the same here
            if self.context.archive is not None:
                self.context.archive.archive(notification)

            _LOGGER.debug(
                "SUPERNOTIFY %s deliveries, %s errors, %s skipped",
                notification.delivered,
                notification.errored,
                notification.skipped,
            )

    def enquire_deliveries_by_scenario(self) -> dict[str, list[Scenario]]:
        """Return the configuration's ``delivery_by_scenario`` mapping."""
        return self.context.delivery_by_scenario

    async def enquire_occupancy(self) -> dict[str, list]:
        """Return the occupancy as currently determined by the configuration."""
        return self.context.determine_occupancy()

    async def enquire_active_scenarios(self, trace: bool) -> list[str] | dict:
        """Evaluate every scenario against the current occupancy.

        Scenarios are evaluated with empty leading condition variables, medium
        priority and the current occupancy.

        Args:
            trace: When false, return only the names of scenarios that
                evaluate true. When true, return a dict with ``ENABLED`` and
                ``DISABLED`` lists of JSON-safe scenario attributes (including
                trace data) plus ``VARS``, the condition variables used.
        """
        occupiers = self.context.determine_occupancy()
        cvars = ConditionVariables([], [], PRIORITY_MEDIUM, occupiers, None, None)

        def safe_json(v: Any) -> Any:
            # round-trip through HA's encoder so the result holds only plain JSON types
            return json.loads(json.dumps(v, cls=ExtendedJSONEncoder))

        if not trace:
            return [s.name for s in self.context.scenarios.values() if await s.evaluate(cvars)]

        results: dict[str, Any] = {"ENABLED": [], "DISABLED": [], "VARS": asdict(cvars)}
        for s in self.context.scenarios.values():
            bucket = "ENABLED" if await s.trace(cvars) else "DISABLED"
            results[bucket].append(safe_json(s.attributes(include_trace=True)))
        return results

    def enquire_snoozes(self) -> list[dict[str, Any]]:
        """Return the snoozer's exported snoozes."""
        return self.context.snoozer.export()

    def clear_snoozes(self) -> int:
        """Clear all snoozes and return the snoozer's result."""
        return self.context.snoozer.clear()

    def enquire_people(self) -> list[dict]:
        """Return the configured people as a list."""
        return list(self.context.people.values())

    @callback
    def on_mobile_action(self, event: Event) -> None:
        """Listen for mobile actions relevant to snooze and silence notifications.

        Events whose action is missing or does not start with ``SUPERNOTIFY_``
        are ignored; the rest are handed to the snoozer.

        Example Action:
            event_type: mobile_app_notification_action
            data:
                foo: a
            origin: REMOTE
            time_fired: "2024-04-20T13:14:09.360708+00:00"
            context:
                id: 01HVXT93JGWEDW0KE57Z0X6Z1K
                parent_id: null
                user_id: e9dbae1a5abf44dbbad52ff85501bb17
        """
        event_name = event.data.get(ATTR_ACTION)
        if event_name is None or not event_name.startswith("SUPERNOTIFY_"):
            return  # event not intended for here
        self.context.snoozer.handle_command_event(event, self.context.people)

    # Deliberately not decorated with @callback: that marker is for synchronous
    # functions that are safe to run in the event loop, and this is a coroutine.
    async def async_nightly_tasks(self, now: dt.datetime) -> None:
        """Run scheduled housekeeping: archive cleanup, then snooze purge.

        Args:
            now: The time at which the scheduled trigger fired.
        """
        _LOGGER.info("SUPERNOTIFY Housekeeping starting as scheduled at %s", now)
        # The purge_archive action treats the archive as optional, so do the same here
        if self.context.archive is not None:
            await self.context.archive.cleanup()
        self.context.snoozer.purge_snoozes()
        _LOGGER.info("SUPERNOTIFY Housekeeping completed")