Coverage for custom_components/supernotify/notify.py: 86%

214 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-10-26 08:54 +0000

1"""Supernotify service, extending BaseNotificationService""" 

2 

3import datetime as dt 

4import json 

5import logging 

6from dataclasses import asdict 

7from traceback import format_exception 

8from typing import Any 

9 

10from cachetools import TTLCache 

11from homeassistant.components.notify.legacy import BaseNotificationService 

12from homeassistant.const import CONF_CONDITION, EVENT_HOMEASSISTANT_STOP, STATE_OFF, STATE_ON, STATE_UNKNOWN 

13from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, ServiceCall, SupportsResponse, callback 

14from homeassistant.helpers.condition import async_validate_condition_config 

15from homeassistant.helpers.event import async_track_time_change 

16from homeassistant.helpers.json import ExtendedJSONEncoder 

17from homeassistant.helpers.reload import async_setup_reload_service 

18from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType 

19 

20from custom_components.supernotify.archive import ARCHIVE_PURGE_MIN_INTERVAL 

21from custom_components.supernotify.delivery_method import DeliveryMethod 

22 

23from . import ( 

24 ATTR_ACTION, 

25 ATTR_DATA, 

26 ATTR_DUPE_POLICY_MTSLP, 

27 ATTR_DUPE_POLICY_NONE, 

28 CONF_ACTION_GROUPS, 

29 CONF_ACTIONS, 

30 CONF_ARCHIVE, 

31 CONF_CAMERAS, 

32 CONF_DELIVERY, 

33 CONF_DUPE_CHECK, 

34 CONF_DUPE_POLICY, 

35 CONF_HOUSEKEEPING, 

36 CONF_HOUSEKEEPING_TIME, 

37 CONF_LINKS, 

38 CONF_MEDIA_PATH, 

39 CONF_METHODS, 

40 CONF_RECIPIENTS, 

41 CONF_SCENARIOS, 

42 CONF_SIZE, 

43 CONF_TEMPLATE_PATH, 

44 CONF_TTL, 

45 DOMAIN, 

46 PLATFORMS, 

47 PRIORITY_MEDIUM, 

48 PRIORITY_VALUES, 

49 ConditionVariables, 

50) 

51from . import SUPERNOTIFY_SCHEMA as PLATFORM_SCHEMA 

52from .configuration import Context 

53from .methods.alexa_devices import AlexaDevicesDeliveryMethod 

54from .methods.alexa_media_player import AlexaMediaPlayerDeliveryMethod 

55from .methods.chime import ChimeDeliveryMethod 

56from .methods.email import EmailDeliveryMethod 

57from .methods.generic import GenericDeliveryMethod 

58from .methods.media_player_image import MediaPlayerImageDeliveryMethod 

59from .methods.mobile_push import MobilePushDeliveryMethod 

60from .methods.persistent import PersistentDeliveryMethod 

61from .methods.sms import SMSDeliveryMethod 

62from .notification import Notification 

63 

# Module-level logger named after this module
_LOGGER = logging.getLogger(__name__)

# 60 * 60 = 3600; presumably a snooze duration in seconds - TODO confirm
SNOOZE_TIME = 60 * 60  # TODO: move to configuration

# No auto-discovery of method plugins so manual class registration required here.
# The list is handed unchanged to Context when the service is constructed.
METHODS: list[type[DeliveryMethod]] = [
    EmailDeliveryMethod,
    SMSDeliveryMethod,
    AlexaDevicesDeliveryMethod,
    AlexaMediaPlayerDeliveryMethod,
    MobilePushDeliveryMethod,
    MediaPlayerImageDeliveryMethod,
    ChimeDeliveryMethod,
    PersistentDeliveryMethod,
    GenericDeliveryMethod,
]

79 

80 

async def async_get_service(
    hass: HomeAssistant,
    config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> "SuperNotificationAction":
    """Notify specific component setup - see async_setup_legacy in BaseNotificationService

    Steps, in order:
      1. Validate the condition of every configured delivery, re-raising on failure.
      2. Publish the configuration and zeroed counters as ``{DOMAIN}.*`` states.
      3. Set up the reload service, then build and initialize the notification service.
      4. Register the supplemental enquiry / maintenance actions under ``DOMAIN``.

    Args:
        hass: Home Assistant instance.
        config: Platform configuration for this notify platform.
        discovery_info: Unused.

    Returns:
        The initialized ``SuperNotificationAction``.

    Raises:
        Exception: whatever ``async_validate_condition_config`` raises for an
            invalid delivery condition (logged first, then re-raised).
    """
    _ = PLATFORM_SCHEMA  # schema must be imported even if not used for HA platform detection
    _ = discovery_info
    for delivery_name, delivery in config.get(CONF_DELIVERY, {}).items():
        if delivery and CONF_CONDITION in delivery:
            try:
                await async_validate_condition_config(hass, delivery[CONF_CONDITION])
            except Exception as e:
                # Name the delivery as well as the condition so the faulty entry can be found
                _LOGGER.error(
                    "SUPERNOTIFY delivery %s fails condition %s: %s", delivery_name, delivery[CONF_CONDITION], e
                )
                raise

    hass.states.async_set(
        f"{DOMAIN}.configured",
        "True",
        {
            CONF_DELIVERY: config.get(CONF_DELIVERY, {}),
            CONF_LINKS: config.get(CONF_LINKS, ()),
            CONF_TEMPLATE_PATH: config.get(CONF_TEMPLATE_PATH, None),
            CONF_MEDIA_PATH: config.get(CONF_MEDIA_PATH, None),
            CONF_ARCHIVE: config.get(CONF_ARCHIVE, {}),
            CONF_RECIPIENTS: config.get(CONF_RECIPIENTS, ()),
            CONF_ACTIONS: config.get(CONF_ACTIONS, {}),
            CONF_HOUSEKEEPING: config.get(CONF_HOUSEKEEPING, {}),
            CONF_ACTION_GROUPS: config.get(CONF_ACTION_GROUPS, {}),
            # only the scenario names are exposed, not their definitions
            CONF_SCENARIOS: list(config.get(CONF_SCENARIOS, {}).keys()),
            CONF_METHODS: config.get(CONF_METHODS, {}),
            CONF_CAMERAS: config.get(CONF_CAMERAS, {}),
            CONF_DUPE_CHECK: config.get(CONF_DUPE_CHECK, {}),
        },
    )
    hass.states.async_set(f"{DOMAIN}.failures", "0")
    hass.states.async_set(f"{DOMAIN}.sent", "0")

    await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
    # Direct indexing: these keys are required to be present here
    # (presumably defaulted by the platform schema - TODO confirm)
    service = SuperNotificationAction(
        hass,
        deliveries=config[CONF_DELIVERY],
        template_path=config[CONF_TEMPLATE_PATH],
        media_path=config[CONF_MEDIA_PATH],
        archive=config[CONF_ARCHIVE],
        housekeeping=config[CONF_HOUSEKEEPING],
        recipients=config[CONF_RECIPIENTS],
        mobile_actions=config[CONF_ACTION_GROUPS],
        scenarios=config[CONF_SCENARIOS],
        links=config[CONF_LINKS],
        method_configs=config[CONF_METHODS],
        cameras=config[CONF_CAMERAS],
        dupe_check=config[CONF_DUPE_CHECK],
    )
    await service.initialize()

    @callback
    def supplemental_action_refresh_entities(_call: ServiceCall) -> None:
        # Marked as a callback so it runs in the event loop: expose_entities()
        # calls hass.states.async_set, which must not be invoked from an executor thread.
        service.expose_entities()

    def supplemental_action_enquire_deliveries_by_scenario(_call: ServiceCall) -> dict[str, Any]:
        return service.enquire_deliveries_by_scenario()

    def supplemental_action_enquire_last_notification(_call: ServiceCall) -> dict[str, Any]:
        return service.last_notification.contents() if service.last_notification else {}

    async def supplemental_action_enquire_active_scenarios(call: ServiceCall) -> dict[str, Any]:
        trace = call.data.get("trace", False)
        result: dict[str, Any] = {"scenarios": await service.enquire_active_scenarios()}
        if trace:
            result["trace"] = await service.trace_active_scenarios()
        return result

    def supplemental_action_enquire_scenarios(_call: ServiceCall) -> dict[str, Any]:
        return {"scenarios": service.enquire_scenarios()}

    async def supplemental_action_enquire_occupancy(_call: ServiceCall) -> dict[str, Any]:
        # NOTE(review): occupancy is returned under the "scenarios" key; looks like a
        # copy/paste leftover but is kept because callers may rely on it - confirm.
        return {"scenarios": await service.enquire_occupancy()}

    def supplemental_action_enquire_snoozes(_call: ServiceCall) -> dict[str, Any]:
        return {"snoozes": service.enquire_snoozes()}

    def supplemental_action_clear_snoozes(_call: ServiceCall) -> dict[str, Any]:
        return {"cleared": service.clear_snoozes()}

    def supplemental_action_enquire_people(_call: ServiceCall) -> dict[str, Any]:
        return {"people": service.enquire_people()}

    async def supplemental_action_purge_archive(call: ServiceCall) -> dict[str, Any]:
        days = call.data.get("days")
        if not service.context.archive.enabled:
            return {"error": "No archive configured"}
        purged = await service.context.archive.cleanup(days=days, force=True)
        arch_size = await service.context.archive.size()
        return {
            "purged": purged,
            "remaining": arch_size,
            "interval": ARCHIVE_PURGE_MIN_INTERVAL,
            # report the archive's configured retention when the caller gave no override
            "days": service.context.archive.archive_days if days is None else days,
        }

    # Actions that only return a response, registered in this order
    response_actions = {
        "enquire_deliveries_by_scenario": supplemental_action_enquire_deliveries_by_scenario,
        "enquire_last_notification": supplemental_action_enquire_last_notification,
        "enquire_active_scenarios": supplemental_action_enquire_active_scenarios,
        "enquire_scenarios": supplemental_action_enquire_scenarios,
        "enquire_occupancy": supplemental_action_enquire_occupancy,
        "enquire_people": supplemental_action_enquire_people,
        "enquire_snoozes": supplemental_action_enquire_snoozes,
        "clear_snoozes": supplemental_action_clear_snoozes,
        "purge_archive": supplemental_action_purge_archive,
    }
    for action_name, handler in response_actions.items():
        hass.services.async_register(
            DOMAIN,
            action_name,
            handler,
            supports_response=SupportsResponse.ONLY,
        )
    hass.services.async_register(
        DOMAIN,
        "refresh_entities",
        supplemental_action_refresh_entities,
        supports_response=SupportsResponse.NONE,
    )

    return service

243 

244 

class SuperNotificationAction(BaseNotificationService):
    """Implement SuperNotification action.

    Owns the shared ``Context``, the sent / failure counters, the duplicate
    detection cache and the event / timer subscriptions released on shutdown.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        deliveries: dict[str, dict[str, Any]] | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive: dict[str, Any] | None = None,
        housekeeping: dict[str, Any] | None = None,
        recipients: list[dict[str, Any]] | None = None,
        mobile_actions: dict[str, Any] | None = None,
        scenarios: dict[str, dict[str, Any]] | None = None,
        links: list[str] | None = None,
        method_configs: dict[str, Any] | None = None,
        cameras: list[dict[str, Any]] | None = None,
        dupe_check: dict[str, Any] | None = None,
    ) -> None:
        """Initialize the service.

        Configuration sections are forwarded to ``Context``; ``housekeeping`` and
        ``dupe_check`` are kept locally and default to empty dicts.
        """
        self.hass: HomeAssistant = hass
        self.last_notification: Notification | None = None
        self.failures: int = 0
        self.housekeeping: dict[str, Any] = housekeeping or {}
        self.sent: int = 0
        self.context = Context(
            hass,
            deliveries,
            links or [],
            recipients or [],
            mobile_actions,
            template_path,
            media_path,
            archive,
            scenarios,
            method_configs or {},
            cameras,
            METHODS,
        )
        # Unsubscribe callables collected by initialize() and released by shutdown()
        self.unsubscribes: list[CALLBACK_TYPE] = []
        self.dupe_check_config: dict[str, Any] = dupe_check or {}
        self.last_purge: dt.datetime | None = None
        # Keyed by (notification hash, priority), value is the notification id;
        # defaults to 100 entries with a TTL of 120
        self.notification_cache: TTLCache[tuple[int, str], str] = TTLCache(
            maxsize=self.dupe_check_config.get(CONF_SIZE, 100), ttl=self.dupe_check_config.get(CONF_TTL, 120)
        )

    async def initialize(self) -> None:
        """Initialize the context, expose entities and subscribe to events and timers."""
        await self.context.initialize()

        self.expose_entities()
        self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))
        housekeeping_schedule = self.housekeeping.get(CONF_HOUSEKEEPING_TIME)
        if housekeeping_schedule:
            _LOGGER.info("SUPERNOTIFY setting up housekeeping schedule at: %s", housekeeping_schedule)
            self.unsubscribes.append(
                async_track_time_change(
                    self.hass,
                    self.async_nightly_tasks,
                    hour=housekeeping_schedule.hour,
                    minute=housekeeping_schedule.minute,
                    second=housekeeping_schedule.second,
                )
            )

        self.unsubscribes.append(self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self.async_shutdown))

    async def async_shutdown(self, event: Event) -> None:
        """Release subscriptions when Home Assistant stops."""
        _LOGGER.info("SUPERNOTIFY shutting down, %s", event)
        self.shutdown()

    async def async_unregister_services(self) -> None:
        """Release subscriptions, then defer to the base class unregistration."""
        _LOGGER.info("SUPERNOTIFY unregistering")
        self.shutdown()
        return await super().async_unregister_services()

    def shutdown(self) -> None:
        """Call every collected unsubscribe callable once, logging any failure.

        The list is drained before iterating so that a second shutdown (for
        example stop event followed by service unregistration) does not call
        already-released unsubscribe callables again.
        """
        pending, self.unsubscribes = self.unsubscribes, []
        for unsub in pending:
            try:
                _LOGGER.debug("SUPERNOTIFY unsubscribing: %s", unsub)
                unsub()
            except Exception as e:
                # best effort: one failing unsubscribe must not block the others
                _LOGGER.error("SUPERNOTIFY failed to unsubscribe: %s", e)
        _LOGGER.info("SUPERNOTIFY shut down")

    def expose_entities(self) -> None:
        """Publish scenarios, methods and deliveries as ``{DOMAIN}.*`` states."""
        for scenario in self.context.scenarios.values():
            self.hass.states.async_set(
                f"{DOMAIN}.scenario_{scenario.name}", STATE_UNKNOWN, scenario.attributes(include_condition=False)
            )
        for method in self.context.methods.values():
            self.hass.states.async_set(
                f"{DOMAIN}.method_{method.method}",
                STATE_ON if len(method.valid_deliveries) > 0 else STATE_OFF,
                method.attributes(),
            )
        for delivery_name, delivery in self.context._deliveries.items():
            self.hass.states.async_set(
                f"{DOMAIN}.delivery_{delivery_name}",
                # Plain membership test: wrapping it in str() made it always truthy,
                # so deliveries missing from context.deliveries were still reported on
                STATE_ON if delivery_name in self.context.deliveries else STATE_OFF,
                delivery,
            )

    def dupe_check(self, notification: Notification) -> bool:
        """Return True if an equivalent notification is already in the cache.

        A match is the same notification hash cached under this priority or any
        priority listed after it in ``PRIORITY_VALUES``; a priority not in that
        list is only matched against itself. Unless the policy is
        ``ATTR_DUPE_POLICY_NONE``, the notification is recorded in the cache
        whether or not it was a duplicate.
        """
        policy = self.dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP)
        if policy == ATTR_DUPE_POLICY_NONE:
            return False
        notification_hash = notification.hash()
        if notification.priority in PRIORITY_VALUES:
            same_or_higher_priority = PRIORITY_VALUES[PRIORITY_VALUES.index(notification.priority) :]
        else:
            same_or_higher_priority = [notification.priority]
        dupe = False
        if any((notification_hash, p) in self.notification_cache for p in same_or_higher_priority):
            _LOGGER.debug("SUPERNOTIFY Detected dupe notification")
            dupe = True
        self.notification_cache[notification_hash, notification.priority] = notification.id
        return dupe

    async def async_send_message(
        self, message: str = "", title: str | None = None, target: list[str] | str | None = None, **kwargs: Any
    ) -> None:
        """Send a message via chosen method.

        Builds a ``Notification``, suppresses it if it is a duplicate, otherwise
        delivers it and updates the sent counter. Any exception is logged and
        counted as a failure rather than propagated. The notification, if one
        was created, is remembered as the last notification and archived.
        """
        data = kwargs.get(ATTR_DATA, {})
        notification = None
        _LOGGER.debug("Message: %s, target: %s, data: %s", message, target, data)

        try:
            notification = Notification(self.context, message, title, target, data)
            await notification.initialize()
            if self.dupe_check(notification):
                notification.suppress()
            elif await notification.deliver():
                self.sent += 1
                self.hass.states.async_set(f"{DOMAIN}.sent", str(self.sent))
            elif notification.errored:
                _LOGGER.error("SUPERNOTIFY Failed to deliver %s, error count %s", notification.id, notification.errored)
            else:
                _LOGGER.warning("SUPERNOTIFY No delivery selected for %s", notification.id)

        except Exception as err:
            # fault barrier of last resort, integration failures should be caught within envelope delivery
            _LOGGER.error("SUPERNOTIFY Failed to send message %s: %s", message, err, exc_info=True)
            self.failures += 1
            if notification is not None:
                notification.delivery_error = format_exception(err)
            self.hass.states.async_set(f"{DOMAIN}.failures", str(self.failures))

        if notification is not None:
            self.last_notification = notification
            self.context.archive.archive(notification)
            if self.context.archive_topic:
                await self.context.archive_topic.publish(notification)

            # Inside the guard: notification is None when its construction raised
            _LOGGER.debug(
                "SUPERNOTIFY %s deliveries, %s errors, %s skipped",
                notification.delivered,
                notification.errored,
                notification.skipped,
            )

    def enquire_deliveries_by_scenario(self) -> dict[str, list[str]]:
        """Return the context's delivery-by-scenario mapping."""
        return self.context.delivery_by_scenario

    async def enquire_occupancy(self) -> dict[str, list[dict[str, Any]]]:
        """Return the occupancy as determined by the context."""
        return self.context.determine_occupancy()

    async def enquire_active_scenarios(self) -> list[str]:
        """Return names of scenarios that evaluate true for medium priority and current occupancy."""
        occupiers: dict[str, list[dict[str, Any]]] = self.context.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)
        return [s.name for s in self.context.scenarios.values() if await s.evaluate(cvars)]

    async def trace_active_scenarios(self) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
        """Trace every scenario against medium priority and current occupancy.

        Returns:
            ``(enabled, disabled, variables)``: JSON-safe attribute dicts (with
            trace) for scenarios whose trace was truthy / falsy, plus the
            condition variables used, as a dict.
        """
        occupiers: dict[str, list[dict[str, Any]]] = self.context.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)

        def safe_json(v: Any) -> Any:
            # round-trip through ExtendedJSONEncoder to reduce values to plain JSON types
            return json.loads(json.dumps(v, cls=ExtendedJSONEncoder))

        enabled = []
        disabled = []
        dcvars = asdict(cvars)
        for s in self.context.scenarios.values():
            if await s.trace(cvars):
                enabled.append(safe_json(s.attributes(include_trace=True)))
            else:
                disabled.append(safe_json(s.attributes(include_trace=True)))
        return enabled, disabled, dcvars

    def enquire_scenarios(self) -> dict[str, dict[str, Any]]:
        """Return scenario attributes (without conditions) keyed by scenario name."""
        return {s.name: s.attributes(include_condition=False) for s in self.context.scenarios.values()}

    def enquire_snoozes(self) -> list[dict[str, Any]]:
        """Return the snoozer's exported snoozes."""
        return self.context.snoozer.export()

    def clear_snoozes(self) -> int:
        """Clear snoozes via the snoozer and return its result."""
        return self.context.snoozer.clear()

    def enquire_people(self) -> list[dict[str, Any]]:
        """Return the configured people as a list."""
        return list(self.context.people.values())

    @callback
    def on_mobile_action(self, event: Event) -> None:
        """Listen for mobile actions relevant to snooze and silence notifications

        Example Action:
        event_type: mobile_app_notification_action
        data:
            foo: a
        origin: REMOTE
        time_fired: "2024-04-20T13:14:09.360708+00:00"
        context:
            id: 01HVXT93JGWEDW0KE57Z0X6Z1K
            parent_id: null
            user_id: e9dbae1a5abf44dbbad52ff85501bb17
        """
        event_name = event.data.get(ATTR_ACTION)
        if event_name is None or not event_name.startswith("SUPERNOTIFY_"):
            return  # event not intended for here
        self.context.snoozer.handle_command_event(event, self.context.people)

    # NOTE(review): @callback is intended for synchronous functions; this is a
    # coroutine function, so the decorator looks redundant - confirm before removing.
    @callback
    async def async_nightly_tasks(self, now: dt.datetime) -> None:
        """Run scheduled housekeeping: archive cleanup, then snooze purge."""
        _LOGGER.info("SUPERNOTIFY Housekeeping starting as scheduled at %s", now)
        await self.context.archive.cleanup()
        self.context.snoozer.purge_snoozes()
        _LOGGER.info("SUPERNOTIFY Housekeeping completed")