Coverage for custom_components/supernotify/notify.py: 87%

209 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-07-19 20:50 +0000

1"""Supernotify service, extending BaseNotificationService""" 

2 

3import datetime as dt 

4import json 

5import logging 

6from dataclasses import asdict 

7from traceback import format_exception 

8from typing import Any 

9 

10from cachetools import TTLCache 

11from homeassistant.components.notify.legacy import BaseNotificationService 

12from homeassistant.const import CONF_CONDITION, EVENT_HOMEASSISTANT_STOP, STATE_OFF, STATE_ON, STATE_UNKNOWN 

13from homeassistant.core import Event, HomeAssistant, ServiceCall, SupportsResponse, callback 

14from homeassistant.helpers.condition import async_validate_condition_config 

15from homeassistant.helpers.event import async_track_time_change 

16from homeassistant.helpers.json import ExtendedJSONEncoder 

17from homeassistant.helpers.reload import async_setup_reload_service 

18from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType 

19 

20from custom_components.supernotify.archive import ARCHIVE_PURGE_MIN_INTERVAL 

21from custom_components.supernotify.scenario import Scenario 

22 

23from . import ( 

24 ATTR_ACTION, 

25 ATTR_DATA, 

26 ATTR_DUPE_POLICY_MTSLP, 

27 ATTR_DUPE_POLICY_NONE, 

28 CONF_ACTION_GROUPS, 

29 CONF_ACTIONS, 

30 CONF_ARCHIVE, 

31 CONF_CAMERAS, 

32 CONF_DELIVERY, 

33 CONF_DUPE_CHECK, 

34 CONF_DUPE_POLICY, 

35 CONF_HOUSEKEEPING, 

36 CONF_HOUSEKEEPING_TIME, 

37 CONF_LINKS, 

38 CONF_MEDIA_PATH, 

39 CONF_METHODS, 

40 CONF_RECIPIENTS, 

41 CONF_SCENARIOS, 

42 CONF_SIZE, 

43 CONF_TEMPLATE_PATH, 

44 CONF_TTL, 

45 DOMAIN, 

46 PLATFORM_SCHEMA, 

47 PLATFORMS, 

48 PRIORITY_MEDIUM, 

49 PRIORITY_VALUES, 

50 ConditionVariables, 

51) 

52from .configuration import Context 

53from .methods.alexa_media_player import AlexaMediaPlayerDeliveryMethod 

54from .methods.chime import ChimeDeliveryMethod 

55from .methods.email import EmailDeliveryMethod 

56from .methods.generic import GenericDeliveryMethod 

57from .methods.media_player_image import MediaPlayerImageDeliveryMethod 

58from .methods.mobile_push import MobilePushDeliveryMethod 

59from .methods.persistent import PersistentDeliveryMethod 

60from .methods.sms import SMSDeliveryMethod 

61from .notification import Notification 

62 

63_LOGGER = logging.getLogger(__name__) 

64 

65SNOOZE_TIME = 60 * 60 # TODO: move to configuration 

66 

67METHODS: list[type] = [ 

68 EmailDeliveryMethod, 

69 SMSDeliveryMethod, 

70 AlexaMediaPlayerDeliveryMethod, 

71 MobilePushDeliveryMethod, 

72 MediaPlayerImageDeliveryMethod, 

73 ChimeDeliveryMethod, 

74 PersistentDeliveryMethod, 

75 GenericDeliveryMethod, 

76] 

77 

78 

79async def async_get_service( 

80 hass: HomeAssistant, 

81 config: ConfigType, 

82 discovery_info: DiscoveryInfoType | None = None, 

83) -> "SuperNotificationAction": 

84 """Notify specific component setup - see async_setup_legacy in BaseNotificationService""" 

85 _ = PLATFORM_SCHEMA # schema must be imported even if not used for HA platform detection 

86 _ = discovery_info 

87 for delivery in config.get(CONF_DELIVERY, {}).values(): 

88 if delivery and CONF_CONDITION in delivery: 

89 try: 

90 await async_validate_condition_config(hass, delivery[CONF_CONDITION]) 

91 except Exception as e: 

92 _LOGGER.error("SUPERNOTIFY delivery %s fails condition: %s", delivery[CONF_CONDITION], e) 

93 raise 

94 

95 hass.states.async_set( 

96 f"{DOMAIN}.configured", 

97 "True", 

98 { 

99 CONF_DELIVERY: config.get(CONF_DELIVERY, {}), 

100 CONF_LINKS: config.get(CONF_LINKS, ()), 

101 CONF_TEMPLATE_PATH: config.get(CONF_TEMPLATE_PATH), 

102 CONF_MEDIA_PATH: config.get(CONF_MEDIA_PATH), 

103 CONF_ARCHIVE: config.get(CONF_ARCHIVE, {}), 

104 CONF_RECIPIENTS: config.get(CONF_RECIPIENTS, ()), 

105 CONF_ACTIONS: config.get(CONF_ACTIONS, {}), 

106 CONF_SCENARIOS: list(config.get(CONF_SCENARIOS, {}).keys()), 

107 CONF_METHODS: config.get(CONF_METHODS, {}), 

108 CONF_CAMERAS: config.get(CONF_CAMERAS, {}), 

109 CONF_DUPE_CHECK: config.get(CONF_DUPE_CHECK, {}), 

110 }, 

111 ) 

112 hass.states.async_set(f"{DOMAIN}.failures", "0") 

113 hass.states.async_set(f"{DOMAIN}.sent", "0") 

114 

115 await async_setup_reload_service(hass, DOMAIN, PLATFORMS) 

116 service = SuperNotificationAction( 

117 hass, 

118 deliveries=config[CONF_DELIVERY], 

119 template_path=config[CONF_TEMPLATE_PATH], 

120 media_path=config[CONF_MEDIA_PATH], 

121 archive=config[CONF_ARCHIVE], 

122 housekeeping=config[CONF_HOUSEKEEPING], 

123 recipients=config[CONF_RECIPIENTS], 

124 mobile_actions=config[CONF_ACTION_GROUPS], 

125 scenarios=config[CONF_SCENARIOS], 

126 links=config[CONF_LINKS], 

127 method_defaults=config[CONF_METHODS], 

128 cameras=config[CONF_CAMERAS], 

129 dupe_check=config[CONF_DUPE_CHECK], 

130 ) 

131 await service.initialize() 

132 

133 def supplemental_action_enquire_deliveries_by_scenario(_call: ServiceCall) -> dict: 

134 return service.enquire_deliveries_by_scenario() 

135 

136 def supplemental_action_enquire_last_notification(_call: ServiceCall) -> dict: 

137 return service.last_notification.contents() if service.last_notification else {} 

138 

139 async def supplemental_action_enquire_active_scenarios(call: ServiceCall) -> dict: 

140 trace = call.data.get("trace", False) 

141 result: dict[str, Any] = {"scenarios": await service.enquire_active_scenarios()} 

142 if trace: 

143 result["trace"] = await service.trace_active_scenarios() 

144 return result 

145 

146 def supplemental_action_enquire_scenarios(_call: ServiceCall) -> dict: 

147 return {"scenarios": service.enquire_scenarios()} 

148 

149 async def supplemental_action_enquire_occupancy(_call: ServiceCall) -> dict: 

150 return {"scenarios": await service.enquire_occupancy()} 

151 

152 def supplemental_action_enquire_snoozes(_call: ServiceCall) -> dict: 

153 return {"snoozes": service.enquire_snoozes()} 

154 

155 def supplemental_action_clear_snoozes(_call: ServiceCall) -> dict: 

156 return {"cleared": service.clear_snoozes()} 

157 

158 def supplemental_action_enquire_people(_call: ServiceCall) -> dict: 

159 return {"people": service.enquire_people()} 

160 

161 async def supplemental_action_purge_archive(call: ServiceCall) -> dict[str, Any]: 

162 days = call.data.get("days") 

163 if service.context.archive is None: 

164 return {"error": "No archive configured"} 

165 purged = await service.context.archive.cleanup(days=days, force=True) 

166 arch_size = await service.context.archive.size() 

167 return { 

168 "purged": purged, 

169 "remaining": arch_size, 

170 "interval": ARCHIVE_PURGE_MIN_INTERVAL, 

171 "days": service.context.archive.archive_days if days is None else days, 

172 } 

173 

174 hass.services.async_register( 

175 DOMAIN, 

176 "enquire_deliveries_by_scenario", 

177 supplemental_action_enquire_deliveries_by_scenario, 

178 supports_response=SupportsResponse.ONLY, 

179 ) 

180 hass.services.async_register( 

181 DOMAIN, 

182 "enquire_last_notification", 

183 supplemental_action_enquire_last_notification, 

184 supports_response=SupportsResponse.ONLY, 

185 ) 

186 hass.services.async_register( 

187 DOMAIN, 

188 "enquire_active_scenarios", 

189 supplemental_action_enquire_active_scenarios, 

190 supports_response=SupportsResponse.ONLY, 

191 ) 

192 hass.services.async_register( 

193 DOMAIN, 

194 "enquire_scenarios", 

195 supplemental_action_enquire_scenarios, 

196 supports_response=SupportsResponse.ONLY, 

197 ) 

198 hass.services.async_register( 

199 DOMAIN, 

200 "enquire_occupancy", 

201 supplemental_action_enquire_occupancy, 

202 supports_response=SupportsResponse.ONLY, 

203 ) 

204 hass.services.async_register( 

205 DOMAIN, 

206 "enquire_people", 

207 supplemental_action_enquire_people, 

208 supports_response=SupportsResponse.ONLY, 

209 ) 

210 hass.services.async_register( 

211 DOMAIN, 

212 "enquire_snoozes", 

213 supplemental_action_enquire_snoozes, 

214 supports_response=SupportsResponse.ONLY, 

215 ) 

216 hass.services.async_register( 

217 DOMAIN, 

218 "clear_snoozes", 

219 supplemental_action_clear_snoozes, 

220 supports_response=SupportsResponse.ONLY, 

221 ) 

222 hass.services.async_register( 

223 DOMAIN, 

224 "purge_archive", 

225 supplemental_action_purge_archive, 

226 supports_response=SupportsResponse.ONLY, 

227 ) 

228 

229 return service 

230 

231 

232class SuperNotificationAction(BaseNotificationService): 

233 """Implement SuperNotification action.""" 

234 

235 def __init__( 

236 self, 

237 hass: HomeAssistant, 

238 deliveries: dict[str, dict] | None = None, 

239 template_path: str | None = None, 

240 media_path: str | None = None, 

241 archive: dict | None = None, 

242 housekeeping: dict | None = None, 

243 recipients: list | None = None, 

244 mobile_actions: dict | None = None, 

245 scenarios: dict[str, dict] | None = None, 

246 links: list | None = None, 

247 method_defaults: dict | None = None, 

248 cameras: list[dict] | None = None, 

249 dupe_check: dict | None = None, 

250 ) -> None: 

251 """Initialize the service.""" 

252 self.hass: HomeAssistant = hass 

253 self.last_notification: Notification | None = None 

254 self.failures: int = 0 

255 self.housekeeping: dict = housekeeping or {} 

256 self.sent: int = 0 

257 self.context = Context( 

258 hass, 

259 deliveries, 

260 links or [], 

261 recipients or [], 

262 mobile_actions, 

263 template_path, 

264 media_path, 

265 archive, 

266 scenarios, 

267 method_defaults or {}, 

268 cameras, 

269 ) 

270 self.unsubscribes: list = [] 

271 self.dupe_check_config: dict[str, Any] = dupe_check or {} 

272 self.last_purge: dt.datetime | None = None 

273 self.notification_cache: TTLCache = TTLCache( 

274 maxsize=self.dupe_check_config.get(CONF_SIZE, 100), ttl=self.dupe_check_config.get(CONF_TTL, 120) 

275 ) 

276 

277 async def initialize(self) -> None: 

278 await self.context.initialize() 

279 await self.context.register_delivery_methods(delivery_method_classes=METHODS) 

280 

281 self.expose_entities() 

282 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action)) 

283 housekeeping_schedule = self.housekeeping.get(CONF_HOUSEKEEPING_TIME) 

284 if housekeeping_schedule: 

285 _LOGGER.info("SUPERNOTIFY setting up housekeeping schedule at: %s", housekeeping_schedule) 

286 self.unsubscribes.append( 

287 async_track_time_change( 

288 self.hass, 

289 self.async_nightly_tasks, 

290 hour=housekeeping_schedule.hour, 

291 minute=housekeeping_schedule.minute, 

292 second=housekeeping_schedule.second, 

293 ) 

294 ) 

295 

296 self.unsubscribes.append(self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self.async_shutdown)) 

297 

298 async def async_shutdown(self, event: Event) -> None: 

299 _LOGGER.info("SUPERNOTIFY shutting down, %s", event) 

300 self.shutdown() 

301 

302 async def async_unregister_services(self) -> None: 

303 _LOGGER.info("SUPERNOTIFY unregistering") 

304 self.shutdown() 

305 return await super().async_unregister_services() 

306 

307 def shutdown(self) -> None: 

308 for unsub in self.unsubscribes: 

309 if unsub: 

310 try: 

311 _LOGGER.debug("SUPERNOTIFY unsubscribing: %s", unsub) 

312 unsub() 

313 except Exception as e: 

314 _LOGGER.error("SUPERNOTIFY failed to unsubscribe: %s", e) 

315 _LOGGER.info("SUPERNOTIFY shut down") 

316 

317 def expose_entities(self) -> None: 

318 for scenario in self.context.scenarios.values(): 

319 self.hass.states.async_set( 

320 f"{DOMAIN}.scenario_{scenario.name}", STATE_UNKNOWN, scenario.attributes(include_condition=False) 

321 ) 

322 for method in self.context.methods.values(): 

323 self.hass.states.async_set( 

324 f"{DOMAIN}.method_{method.method}", 

325 STATE_ON if len(method.valid_deliveries) > 0 else STATE_OFF, 

326 method.attributes(), 

327 ) 

328 for delivery_name, delivery in self.context._deliveries.items(): 

329 self.hass.states.async_set( 

330 f"{DOMAIN}.delivery_{delivery_name}", 

331 STATE_ON if str(delivery_name in self.context.deliveries) else STATE_OFF, 

332 delivery, 

333 ) 

334 

335 def dupe_check(self, notification: Notification) -> bool: 

336 policy = self.dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP) 

337 if policy == ATTR_DUPE_POLICY_NONE: 

338 return False 

339 notification_hash = notification.hash() 

340 if notification.priority in PRIORITY_VALUES: 

341 same_or_higher_priority = PRIORITY_VALUES[PRIORITY_VALUES.index(notification.priority) :] 

342 else: 

343 same_or_higher_priority = [notification.priority] 

344 dupe = False 

345 if any((notification_hash, p) in self.notification_cache for p in same_or_higher_priority): 

346 _LOGGER.debug("SUPERNOTIFY Detected dupe notification") 

347 dupe = True 

348 self.notification_cache[notification_hash, notification.priority] = notification.id 

349 return dupe 

350 

351 async def async_send_message( 

352 self, message: str = "", title: str | None = None, target: list | str | None = None, **kwargs 

353 ) -> None: 

354 """Send a message via chosen method.""" 

355 data = kwargs.get(ATTR_DATA, {}) 

356 notification = None 

357 _LOGGER.debug("Message: %s, target: %s, data: %s", message, target, data) 

358 

359 try: 

360 notification = Notification(self.context, message, title, target, data) 

361 await notification.initialize() 

362 if self.dupe_check(notification): 

363 notification.suppress() 

364 else: 

365 if await notification.deliver(): 

366 self.sent += 1 

367 self.hass.states.async_set(f"{DOMAIN}.sent", str(self.sent)) 

368 elif notification.errored: 

369 _LOGGER.warning("SUPERNOTIFY Failed to deliver %s, error count %s", notification.id, notification.errored) 

370 else: 

371 _LOGGER.info("SUPERNOTIFY No delivery selected for %s", notification.id) 

372 

373 except Exception as err: 

374 # fault barrier of last resort, integration failures should be caught within envelope delivery 

375 _LOGGER.error("SUPERNOTIFY Failed to send message %s: %s", message, err, exc_info=True) 

376 self.failures += 1 

377 if notification is not None: 

378 notification.delivery_error = format_exception(err) 

379 self.hass.states.async_set(f"{DOMAIN}.failures", str(self.failures)) 

380 

381 if notification is not None: 

382 self.last_notification = notification 

383 self.context.archive.archive(notification) 

384 

385 _LOGGER.debug( 

386 "SUPERNOTIFY %s deliveries, %s errors, %s skipped", 

387 notification.delivered, 

388 notification.errored, 

389 notification.skipped, 

390 ) 

391 

392 def enquire_deliveries_by_scenario(self) -> dict[str, list[Scenario]]: 

393 return self.context.delivery_by_scenario 

394 

395 async def enquire_occupancy(self) -> dict[str, list]: 

396 return self.context.determine_occupancy() 

397 

398 async def enquire_active_scenarios(self) -> list[str]: 

399 occupiers = self.context.determine_occupancy() 

400 cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None) 

401 return [s.name for s in self.context.scenarios.values() if await s.evaluate(cvars)] 

402 

403 async def trace_active_scenarios(self) -> tuple: 

404 occupiers = self.context.determine_occupancy() 

405 cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None) 

406 

407 def safe_json(v: Any) -> Any: 

408 return json.loads(json.dumps(v, cls=ExtendedJSONEncoder)) 

409 

410 enabled = [] 

411 disabled = [] 

412 dcvars = asdict(cvars) 

413 for s in self.context.scenarios.values(): 

414 if await s.trace(cvars): 

415 enabled.append(safe_json(s.attributes(include_trace=True))) 

416 else: 

417 disabled.append(safe_json(s.attributes(include_trace=True))) 

418 return enabled, disabled, dcvars 

419 

420 def enquire_scenarios(self) -> dict[str, dict]: 

421 return {s.name: s.attributes(include_condition=False) for s in self.context.scenarios.values()} 

422 

423 def enquire_snoozes(self) -> list[dict[str, Any]]: 

424 return self.context.snoozer.export() 

425 

426 def clear_snoozes(self) -> int: 

427 return self.context.snoozer.clear() 

428 

429 def enquire_people(self) -> list[dict]: 

430 return list(self.context.people.values()) 

431 

432 @callback 

433 def on_mobile_action(self, event: Event) -> None: 

434 """Listen for mobile actions relevant to snooze and silence notifications 

435 

436 Example Action: 

437 event_type: mobile_app_notification_action 

438 data: 

439 foo: a 

440 origin: REMOTE 

441 time_fired: "2024-04-20T13:14:09.360708+00:00" 

442 context: 

443 id: 01HVXT93JGWEDW0KE57Z0X6Z1K 

444 parent_id: null 

445 user_id: e9dbae1a5abf44dbbad52ff85501bb17 

446 """ 

447 event_name = event.data.get(ATTR_ACTION) 

448 if event_name is None or not event_name.startswith("SUPERNOTIFY_"): 

449 return # event not intended for here 

450 self.context.snoozer.handle_command_event(event, self.context.people) 

451 

452 @callback 

453 async def async_nightly_tasks(self, now: dt.datetime) -> None: 

454 _LOGGER.info("SUPERNOTIFY Housekeeping starting as scheduled at %s", now) 

455 await self.context.archive.cleanup() 

456 self.context.snoozer.purge_snoozes() 

457 _LOGGER.info("SUPERNOTIFY Housekeeping completed")