Coverage for custom_components/supernotify/notify.py: 86%

211 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-10-18 09:29 +0000

1"""Supernotify service, extending BaseNotificationService""" 

2 

3import datetime as dt 

4import json 

5import logging 

6from dataclasses import asdict 

7from traceback import format_exception 

8from typing import Any 

9 

10from cachetools import TTLCache 

11from homeassistant.components.notify.legacy import BaseNotificationService 

12from homeassistant.const import CONF_CONDITION, EVENT_HOMEASSISTANT_STOP, STATE_OFF, STATE_ON, STATE_UNKNOWN 

13from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, ServiceCall, SupportsResponse, callback 

14from homeassistant.helpers.condition import async_validate_condition_config 

15from homeassistant.helpers.event import async_track_time_change 

16from homeassistant.helpers.json import ExtendedJSONEncoder 

17from homeassistant.helpers.reload import async_setup_reload_service 

18from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType 

19 

20from custom_components.supernotify.archive import ARCHIVE_PURGE_MIN_INTERVAL 

21 

22from . import ( 

23 ATTR_ACTION, 

24 ATTR_DATA, 

25 ATTR_DUPE_POLICY_MTSLP, 

26 ATTR_DUPE_POLICY_NONE, 

27 CONF_ACTION_GROUPS, 

28 CONF_ACTIONS, 

29 CONF_ARCHIVE, 

30 CONF_CAMERAS, 

31 CONF_DELIVERY, 

32 CONF_DUPE_CHECK, 

33 CONF_DUPE_POLICY, 

34 CONF_HOUSEKEEPING, 

35 CONF_HOUSEKEEPING_TIME, 

36 CONF_LINKS, 

37 CONF_MEDIA_PATH, 

38 CONF_METHODS, 

39 CONF_RECIPIENTS, 

40 CONF_SCENARIOS, 

41 CONF_SIZE, 

42 CONF_TEMPLATE_PATH, 

43 CONF_TTL, 

44 DOMAIN, 

45 PLATFORMS, 

46 PRIORITY_MEDIUM, 

47 PRIORITY_VALUES, 

48 ConditionVariables, 

49) 

50from . import SUPERNOTIFY_SCHEMA as PLATFORM_SCHEMA 

51from .configuration import Context 

52from .methods.alexa import AlexaDeliveryMethod 

53from .methods.alexa_media_player import AlexaMediaPlayerDeliveryMethod 

54from .methods.chime import ChimeDeliveryMethod 

55from .methods.email import EmailDeliveryMethod 

56from .methods.generic import GenericDeliveryMethod 

57from .methods.media_player_image import MediaPlayerImageDeliveryMethod 

58from .methods.mobile_push import MobilePushDeliveryMethod 

59from .methods.persistent import PersistentDeliveryMethod 

60from .methods.sms import SMSDeliveryMethod 

61from .notification import Notification 

62 

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# NOTE(review): not referenced elsewhere in the visible part of this module;
# presumably a snooze duration in seconds (one hour) - confirm against the snoozer.
SNOOZE_TIME = 60 * 60  # TODO: move to configuration

# Delivery method classes that SuperNotificationAction.initialize() passes to
# Context.register_delivery_methods().
METHODS: list[type] = [
    EmailDeliveryMethod,
    SMSDeliveryMethod,
    AlexaDeliveryMethod,
    AlexaMediaPlayerDeliveryMethod,
    MobilePushDeliveryMethod,
    MediaPlayerImageDeliveryMethod,
    ChimeDeliveryMethod,
    PersistentDeliveryMethod,
    GenericDeliveryMethod,
]  # No auto-discovery of method plugins so manual class registration required here

78 

79 

async def async_get_service(
    hass: HomeAssistant,
    config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> "SuperNotificationAction":
    """Notify specific component setup - see async_setup_legacy in BaseNotificationService

    Validates the condition of every configured delivery, publishes the
    effective configuration and zeroed counters as states, builds and
    initializes the notification service, and registers the response-only
    supplemental actions under ``DOMAIN``.

    Args:
        hass: The Home Assistant instance.
        config: Platform configuration for this integration.
        discovery_info: Unused; accepted for the platform setup signature.

    Returns:
        The initialized ``SuperNotificationAction``.

    Raises:
        Exception: Whatever ``async_validate_condition_config`` raises for an
            invalid delivery condition; it is logged and re-raised.
    """
    _ = PLATFORM_SCHEMA  # schema must be imported even if not used for HA platform detection
    _ = discovery_info
    for delivery in config.get(CONF_DELIVERY, {}).values():
        if delivery and CONF_CONDITION in delivery:
            try:
                await async_validate_condition_config(hass, delivery[CONF_CONDITION])
            except Exception as e:
                _LOGGER.error("SUPERNOTIFY delivery %s fails condition: %s", delivery[CONF_CONDITION], e)
                raise

    hass.states.async_set(
        f"{DOMAIN}.configured",
        "True",
        {
            CONF_DELIVERY: config.get(CONF_DELIVERY, {}),
            CONF_LINKS: config.get(CONF_LINKS, ()),
            CONF_TEMPLATE_PATH: config.get(CONF_TEMPLATE_PATH, None),
            CONF_MEDIA_PATH: config.get(CONF_MEDIA_PATH, None),
            CONF_ARCHIVE: config.get(CONF_ARCHIVE, {}),
            CONF_RECIPIENTS: config.get(CONF_RECIPIENTS, ()),
            CONF_ACTIONS: config.get(CONF_ACTIONS, {}),
            CONF_HOUSEKEEPING: config.get(CONF_HOUSEKEEPING, {}),
            CONF_ACTION_GROUPS: config.get(CONF_ACTION_GROUPS, {}),
            CONF_SCENARIOS: list(config.get(CONF_SCENARIOS, {}).keys()),
            CONF_METHODS: config.get(CONF_METHODS, {}),
            CONF_CAMERAS: config.get(CONF_CAMERAS, {}),
            CONF_DUPE_CHECK: config.get(CONF_DUPE_CHECK, {}),
        },
    )
    hass.states.async_set(f"{DOMAIN}.failures", "0")
    hass.states.async_set(f"{DOMAIN}.sent", "0")

    await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
    # Sections are read with .get(), matching the state snapshot above: every
    # constructor argument accepts None, so an absent optional section must not
    # abort setup with a KeyError.
    service = SuperNotificationAction(
        hass,
        deliveries=config.get(CONF_DELIVERY),
        template_path=config.get(CONF_TEMPLATE_PATH),
        media_path=config.get(CONF_MEDIA_PATH),
        archive=config.get(CONF_ARCHIVE),
        housekeeping=config.get(CONF_HOUSEKEEPING),
        recipients=config.get(CONF_RECIPIENTS),
        mobile_actions=config.get(CONF_ACTION_GROUPS),
        scenarios=config.get(CONF_SCENARIOS),
        links=config.get(CONF_LINKS),
        method_defaults=config.get(CONF_METHODS),
        cameras=config.get(CONF_CAMERAS),
        dupe_check=config.get(CONF_DUPE_CHECK),
    )
    await service.initialize()

    def supplemental_action_enquire_deliveries_by_scenario(_call: ServiceCall) -> dict[str, Any]:
        return service.enquire_deliveries_by_scenario()

    def supplemental_action_enquire_last_notification(_call: ServiceCall) -> dict[str, Any]:
        return service.last_notification.contents() if service.last_notification else {}

    async def supplemental_action_enquire_active_scenarios(call: ServiceCall) -> dict[str, Any]:
        trace = call.data.get("trace", False)
        result: dict[str, Any] = {"scenarios": await service.enquire_active_scenarios()}
        if trace:
            result["trace"] = await service.trace_active_scenarios()
        return result

    def supplemental_action_enquire_scenarios(_call: ServiceCall) -> dict[str, Any]:
        return {"scenarios": service.enquire_scenarios()}

    async def supplemental_action_enquire_occupancy(_call: ServiceCall) -> dict[str, Any]:
        # NOTE(review): the occupancy result is returned under the "scenarios" key;
        # kept as-is because existing callers may rely on it - confirm intent.
        return {"scenarios": await service.enquire_occupancy()}

    def supplemental_action_enquire_snoozes(_call: ServiceCall) -> dict[str, Any]:
        return {"snoozes": service.enquire_snoozes()}

    def supplemental_action_clear_snoozes(_call: ServiceCall) -> dict[str, Any]:
        return {"cleared": service.clear_snoozes()}

    def supplemental_action_enquire_people(_call: ServiceCall) -> dict[str, Any]:
        return {"people": service.enquire_people()}

    async def supplemental_action_purge_archive(call: ServiceCall) -> dict[str, Any]:
        days = call.data.get("days")
        if not service.context.archive.enabled:
            return {"error": "No archive configured"}
        purged = await service.context.archive.cleanup(days=days, force=True)
        arch_size = await service.context.archive.size()
        return {
            "purged": purged,
            "remaining": arch_size,
            "interval": ARCHIVE_PURGE_MIN_INTERVAL,
            "days": service.context.archive.archive_days if days is None else days,
        }

    # Action name -> handler, in registration order. Every supplemental action
    # only returns data, hence SupportsResponse.ONLY for all of them.
    supplemental_actions: dict[str, Any] = {
        "enquire_deliveries_by_scenario": supplemental_action_enquire_deliveries_by_scenario,
        "enquire_last_notification": supplemental_action_enquire_last_notification,
        "enquire_active_scenarios": supplemental_action_enquire_active_scenarios,
        "enquire_scenarios": supplemental_action_enquire_scenarios,
        "enquire_occupancy": supplemental_action_enquire_occupancy,
        "enquire_people": supplemental_action_enquire_people,
        "enquire_snoozes": supplemental_action_enquire_snoozes,
        "clear_snoozes": supplemental_action_clear_snoozes,
        "purge_archive": supplemental_action_purge_archive,
    }
    for action_name, handler in supplemental_actions.items():
        hass.services.async_register(
            DOMAIN,
            action_name,
            handler,
            supports_response=SupportsResponse.ONLY,
        )

    return service

233 

234 

class SuperNotificationAction(BaseNotificationService):
    """Implement SuperNotification action.

    Owns the shared ``Context``, counts sent and failed notifications,
    suppresses duplicates through a TTL cache, and exposes enquiry helpers
    used by the supplemental actions.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        deliveries: dict[str, dict[str, Any]] | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive: dict[str, Any] | None = None,
        housekeeping: dict[str, Any] | None = None,
        recipients: list[dict[str, Any]] | None = None,
        mobile_actions: dict[str, Any] | None = None,
        scenarios: dict[str, dict[str, Any]] | None = None,
        links: list[str] | None = None,
        method_defaults: dict[str, Any] | None = None,
        cameras: list[dict[str, Any]] | None = None,
        dupe_check: dict[str, Any] | None = None,
    ) -> None:
        """Initialize the service.

        All configuration sections are optional. ``housekeeping`` and
        ``dupe_check`` are kept on the service (defaulting to empty dicts);
        the remaining sections are handed to ``Context``, with ``links``,
        ``recipients`` and ``method_defaults`` defaulted to empty containers.
        """
        self.hass: HomeAssistant = hass
        self.last_notification: Notification | None = None
        self.failures: int = 0
        self.housekeeping: dict[str, Any] = housekeeping or {}
        self.sent: int = 0
        self.context = Context(
            hass,
            deliveries,
            links or [],
            recipients or [],
            mobile_actions,
            template_path,
            media_path,
            archive,
            scenarios,
            method_defaults or {},
            cameras,
        )
        # Cancel functions for every listener/timer registered in initialize().
        self.unsubscribes: list[CALLBACK_TYPE] = []
        self.dupe_check_config: dict[str, Any] = dupe_check or {}
        self.last_purge: dt.datetime | None = None
        # Keyed by (notification hash, priority); the value is the notification id.
        self.notification_cache: TTLCache[tuple[int, str], str] = TTLCache(
            maxsize=self.dupe_check_config.get(CONF_SIZE, 100), ttl=self.dupe_check_config.get(CONF_TTL, 120)
        )

    async def initialize(self) -> None:
        """Initialize the context, register delivery methods and subscribe to events.

        Every subscription's cancel function is stored in ``self.unsubscribes``
        so that ``shutdown()`` can release it.
        """
        await self.context.initialize()
        await self.context.register_delivery_methods(delivery_method_classes=METHODS)

        self.expose_entities()
        self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action))
        housekeeping_schedule = self.housekeeping.get(CONF_HOUSEKEEPING_TIME)
        if housekeeping_schedule:
            _LOGGER.info("SUPERNOTIFY setting up housekeeping schedule at: %s", housekeeping_schedule)
            self.unsubscribes.append(
                async_track_time_change(
                    self.hass,
                    self.async_nightly_tasks,
                    hour=housekeeping_schedule.hour,
                    minute=housekeeping_schedule.minute,
                    second=housekeeping_schedule.second,
                )
            )

        self.unsubscribes.append(self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self.async_shutdown))

    async def async_shutdown(self, event: Event) -> None:
        """Release subscriptions when the Home Assistant stop event fires."""
        _LOGGER.info("SUPERNOTIFY shutting down, %s", event)
        self.shutdown()

    async def async_unregister_services(self) -> None:
        """Release subscriptions, then defer to the base class unregistration."""
        _LOGGER.info("SUPERNOTIFY unregistering")
        self.shutdown()
        return await super().async_unregister_services()

    def shutdown(self) -> None:
        """Call every stored unsubscribe function, logging failures instead of raising."""
        for unsub in self.unsubscribes:
            try:
                _LOGGER.debug("SUPERNOTIFY unsubscribing: %s", unsub)
                unsub()
            except Exception as e:
                # Best effort: one failing unsubscribe must not block the rest.
                _LOGGER.error("SUPERNOTIFY failed to unsubscribe: %s", e)
        _LOGGER.info("SUPERNOTIFY shut down")

    def expose_entities(self) -> None:
        """Publish a state for each scenario, delivery method and delivery."""
        for scenario in self.context.scenarios.values():
            self.hass.states.async_set(
                f"{DOMAIN}.scenario_{scenario.name}", STATE_UNKNOWN, scenario.attributes(include_condition=False)
            )
        for method in self.context.methods.values():
            self.hass.states.async_set(
                f"{DOMAIN}.method_{method.method}",
                STATE_ON if len(method.valid_deliveries) > 0 else STATE_OFF,
                method.attributes(),
            )
        # NOTE(review): presumably _deliveries holds everything configured and
        # deliveries only the usable subset - confirm in Context.
        for delivery_name, delivery in self.context._deliveries.items():
            self.hass.states.async_set(
                f"{DOMAIN}.delivery_{delivery_name}",
                # Test membership directly: wrapping the test in str() yields
                # "True" or "False", both non-empty and therefore always truthy.
                STATE_ON if delivery_name in self.context.deliveries else STATE_OFF,
                delivery,
            )

    def dupe_check(self, notification: Notification) -> bool:
        """Return True when the notification duplicates a recently cached one.

        With the ``ATTR_DUPE_POLICY_NONE`` policy nothing is checked or cached.
        Otherwise the cache is searched for the same hash at the notification's
        priority or any priority listed after it in ``PRIORITY_VALUES``; a
        priority not in that list is only compared with itself. The
        notification is recorded in the cache whether or not it is a duplicate.
        """
        policy = self.dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP)
        if policy == ATTR_DUPE_POLICY_NONE:
            return False
        notification_hash = notification.hash()
        if notification.priority in PRIORITY_VALUES:
            same_or_higher_priority = PRIORITY_VALUES[PRIORITY_VALUES.index(notification.priority) :]
        else:
            same_or_higher_priority = [notification.priority]
        dupe = False
        if any((notification_hash, p) in self.notification_cache for p in same_or_higher_priority):
            _LOGGER.debug("SUPERNOTIFY Detected dupe notification")
            dupe = True
        self.notification_cache[notification_hash, notification.priority] = notification.id
        return dupe

    async def async_send_message(
        self, message: str = "", title: str | None = None, target: list[str] | str | None = None, **kwargs: Any
    ) -> None:
        """Send a message via chosen method.

        Builds a ``Notification``, suppresses it when it is a duplicate and
        otherwise delivers it, updating the ``sent`` counter state on success.
        Any exception is logged, counted in the ``failures`` state and recorded
        on the notification rather than propagated. A notification that was
        created is always remembered as ``last_notification``, archived, and
        published to the archive topic when one is configured.
        """
        data = kwargs.get(ATTR_DATA, {})
        notification = None
        _LOGGER.debug("Message: %s, target: %s, data: %s", message, target, data)

        try:
            notification = Notification(self.context, message, title, target, data)
            await notification.initialize()
            if self.dupe_check(notification):
                notification.suppress()
            elif await notification.deliver():
                self.sent += 1
                self.hass.states.async_set(f"{DOMAIN}.sent", str(self.sent))
            elif notification.errored:
                _LOGGER.warning("SUPERNOTIFY Failed to deliver %s, error count %s", notification.id, notification.errored)
            else:
                _LOGGER.info("SUPERNOTIFY No delivery selected for %s", notification.id)

        except Exception as err:
            # fault barrier of last resort, integration failures should be caught within envelope delivery
            _LOGGER.error("SUPERNOTIFY Failed to send message %s: %s", message, err, exc_info=True)
            self.failures += 1
            if notification is not None:
                notification.delivery_error = format_exception(err)
            self.hass.states.async_set(f"{DOMAIN}.failures", str(self.failures))

        if notification is not None:
            self.last_notification = notification
            self.context.archive.archive(notification)
            if self.context.archive_topic:
                await self.context.archive_topic.publish(notification)

            _LOGGER.debug(
                "SUPERNOTIFY %s deliveries, %s errors, %s skipped",
                notification.delivered,
                notification.errored,
                notification.skipped,
            )

    def enquire_deliveries_by_scenario(self) -> dict[str, list[str]]:
        """Return the context's delivery-by-scenario mapping."""
        return self.context.delivery_by_scenario

    async def enquire_occupancy(self) -> dict[str, list[dict[str, Any]]]:
        """Return the occupancy determined by the context."""
        return self.context.determine_occupancy()

    async def enquire_active_scenarios(self) -> list[str]:
        """Return the names of scenarios that evaluate true right now.

        Scenarios are evaluated against the current occupancy at medium
        priority, with the other condition variables left empty.
        """
        occupiers: dict[str, list[dict[str, Any]]] = self.context.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)
        return [s.name for s in self.context.scenarios.values() if await s.evaluate(cvars)]

    async def trace_active_scenarios(self) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
        """Trace every scenario against the current occupancy.

        Returns:
            A tuple of (attributes of scenarios whose trace was truthy,
            attributes of the remaining scenarios, the condition variables
            used as a dict). Scenario attributes include the trace and are
            round-tripped through JSON.
        """
        occupiers: dict[str, list[dict[str, Any]]] = self.context.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)

        def safe_json(v: Any) -> Any:
            # Round-trip through ExtendedJSONEncoder to get plain JSON-compatible data.
            return json.loads(json.dumps(v, cls=ExtendedJSONEncoder))

        enabled = []
        disabled = []
        dcvars = asdict(cvars)
        for s in self.context.scenarios.values():
            if await s.trace(cvars):
                enabled.append(safe_json(s.attributes(include_trace=True)))
            else:
                disabled.append(safe_json(s.attributes(include_trace=True)))
        return enabled, disabled, dcvars

    def enquire_scenarios(self) -> dict[str, dict[str, Any]]:
        """Return each scenario's attributes, without conditions, keyed by scenario name."""
        return {s.name: s.attributes(include_condition=False) for s in self.context.scenarios.values()}

    def enquire_snoozes(self) -> list[dict[str, Any]]:
        """Return the snoozer's exported snoozes."""
        return self.context.snoozer.export()

    def clear_snoozes(self) -> int:
        """Clear snoozes via the snoozer and return its result."""
        return self.context.snoozer.clear()

    def enquire_people(self) -> list[dict[str, Any]]:
        """Return the people known to the context as a list."""
        return list(self.context.people.values())

    @callback
    def on_mobile_action(self, event: Event) -> None:
        """Listen for mobile actions relevant to snooze and silence notifications

        Events whose action is missing or does not start with ``SUPERNOTIFY_``
        are ignored; the rest are passed to the snoozer.

        Example Action:
        event_type: mobile_app_notification_action
        data:
            foo: a
        origin: REMOTE
        time_fired: "2024-04-20T13:14:09.360708+00:00"
        context:
            id: 01HVXT93JGWEDW0KE57Z0X6Z1K
            parent_id: null
            user_id: e9dbae1a5abf44dbbad52ff85501bb17
        """
        event_name = event.data.get(ATTR_ACTION)
        if event_name is None or not event_name.startswith("SUPERNOTIFY_"):
            return  # event not intended for here
        self.context.snoozer.handle_command_event(event, self.context.people)

    # Deliberately not decorated with @callback: that marker is for synchronous
    # functions run in the event loop, and this is a coroutine function.
    async def async_nightly_tasks(self, now: dt.datetime) -> None:
        """Run scheduled housekeeping: clean up the archive and purge snoozes."""
        _LOGGER.info("SUPERNOTIFY Housekeeping starting as scheduled at %s", now)
        await self.context.archive.cleanup()
        self.context.snoozer.purge_snoozes()
        _LOGGER.info("SUPERNOTIFY Housekeeping completed")