Coverage for custom_components/supernotify/notify.py: 87%

197 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-12-28 14:21 +0000

1"""Supernotify service, extending BaseNotificationService""" 

2 

3import datetime as dt 

4import json 

5import logging 

6from dataclasses import asdict 

7from traceback import format_exception 

8from typing import Any 

9 

10from cachetools import TTLCache 

11from homeassistant.components.notify.legacy import BaseNotificationService 

12from homeassistant.const import CONF_CONDITION, EVENT_HOMEASSISTANT_STOP 

13from homeassistant.core import Event, HomeAssistant, ServiceCall, SupportsResponse, callback 

14from homeassistant.helpers.condition import async_validate_condition_config 

15from homeassistant.helpers.event import async_track_time_change 

16from homeassistant.helpers.json import ExtendedJSONEncoder 

17from homeassistant.helpers.reload import async_setup_reload_service 

18from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType 

19 

20from custom_components.supernotify.archive import ARCHIVE_PURGE_MIN_INTERVAL 

21from custom_components.supernotify.scenario import Scenario 

22 

23from . import ( 

24 ATTR_ACTION, 

25 ATTR_DATA, 

26 ATTR_DUPE_POLICY_MTSLP, 

27 ATTR_DUPE_POLICY_NONE, 

28 CONF_ACTION_GROUPS, 

29 CONF_ACTIONS, 

30 CONF_ARCHIVE, 

31 CONF_CAMERAS, 

32 CONF_DELIVERY, 

33 CONF_DUPE_CHECK, 

34 CONF_DUPE_POLICY, 

35 CONF_HOUSEKEEPING, 

36 CONF_HOUSEKEEPING_TIME, 

37 CONF_LINKS, 

38 CONF_MEDIA_PATH, 

39 CONF_METHODS, 

40 CONF_RECIPIENTS, 

41 CONF_SCENARIOS, 

42 CONF_SIZE, 

43 CONF_TEMPLATE_PATH, 

44 CONF_TTL, 

45 DOMAIN, 

46 PLATFORM_SCHEMA, 

47 PLATFORMS, 

48 PRIORITY_MEDIUM, 

49 PRIORITY_VALUES, 

50 ConditionVariables, 

51) 

52from .configuration import SupernotificationConfiguration 

53from .methods.alexa_media_player import AlexaMediaPlayerDeliveryMethod 

54from .methods.chime import ChimeDeliveryMethod 

55from .methods.email import EmailDeliveryMethod 

56from .methods.generic import GenericDeliveryMethod 

57from .methods.media_player_image import MediaPlayerImageDeliveryMethod 

58from .methods.mobile_push import MobilePushDeliveryMethod 

59from .methods.persistent import PersistentDeliveryMethod 

60from .methods.sms import SMSDeliveryMethod 

61from .notification import Notification 

62 

63_LOGGER = logging.getLogger(__name__) 

64 

65SNOOZE_TIME = 60 * 60 # TODO: move to configuration 

66 

67METHODS: list[type] = [ 

68 EmailDeliveryMethod, 

69 SMSDeliveryMethod, 

70 AlexaMediaPlayerDeliveryMethod, 

71 MobilePushDeliveryMethod, 

72 MediaPlayerImageDeliveryMethod, 

73 ChimeDeliveryMethod, 

74 PersistentDeliveryMethod, 

75 GenericDeliveryMethod, 

76] 

77 

78 

79async def async_get_service( 

80 hass: HomeAssistant, 

81 config: ConfigType, 

82 discovery_info: DiscoveryInfoType | None = None, 

83) -> "SuperNotificationAction": 

84 """Notify specific component setup - see async_setup_legacy in BaseNotificationService""" 

85 _ = PLATFORM_SCHEMA # schema must be imported even if not used for HA platform detection 

86 _ = discovery_info 

87 for delivery in config.get(CONF_DELIVERY, {}).values(): 

88 if delivery and CONF_CONDITION in delivery: 

89 try: 

90 await async_validate_condition_config(hass, delivery[CONF_CONDITION]) 

91 except Exception as e: 

92 _LOGGER.error("SUPERNOTIFY delivery %s fails condition: %s", delivery[CONF_CONDITION], e) 

93 raise 

94 

95 hass.states.async_set( 

96 f"{DOMAIN}.configured", 

97 "True", 

98 { 

99 CONF_DELIVERY: config.get(CONF_DELIVERY, {}), 

100 CONF_LINKS: config.get(CONF_LINKS, ()), 

101 CONF_TEMPLATE_PATH: config.get(CONF_TEMPLATE_PATH), 

102 CONF_MEDIA_PATH: config.get(CONF_MEDIA_PATH), 

103 CONF_ARCHIVE: config.get(CONF_ARCHIVE, {}), 

104 CONF_RECIPIENTS: config.get(CONF_RECIPIENTS, ()), 

105 CONF_ACTIONS: config.get(CONF_ACTIONS, {}), 

106 CONF_SCENARIOS: list(config.get(CONF_SCENARIOS, {}).keys()), 

107 CONF_METHODS: config.get(CONF_METHODS, {}), 

108 CONF_CAMERAS: config.get(CONF_CAMERAS, {}), 

109 CONF_DUPE_CHECK: config.get(CONF_DUPE_CHECK, {}), 

110 }, 

111 ) 

112 hass.states.async_set(f"{DOMAIN}.failures", "0") 

113 hass.states.async_set(f"{DOMAIN}.sent", "0") 

114 

115 await async_setup_reload_service(hass, DOMAIN, PLATFORMS) 

116 service = SuperNotificationAction( 

117 hass, 

118 deliveries=config[CONF_DELIVERY], 

119 template_path=config[CONF_TEMPLATE_PATH], 

120 media_path=config[CONF_MEDIA_PATH], 

121 archive=config[CONF_ARCHIVE], 

122 housekeeping=config[CONF_HOUSEKEEPING], 

123 recipients=config[CONF_RECIPIENTS], 

124 mobile_actions=config[CONF_ACTION_GROUPS], 

125 scenarios=config[CONF_SCENARIOS], 

126 links=config[CONF_LINKS], 

127 method_defaults=config[CONF_METHODS], 

128 cameras=config[CONF_CAMERAS], 

129 dupe_check=config[CONF_DUPE_CHECK], 

130 ) 

131 await service.initialize() 

132 

133 def supplemental_action_enquire_deliveries_by_scenario(_call: ServiceCall) -> dict: 

134 return service.enquire_deliveries_by_scenario() 

135 

136 def supplemental_action_enquire_last_notification(_call: ServiceCall) -> dict: 

137 return service.last_notification.contents() if service.last_notification else {} 

138 

139 async def supplemental_action_enquire_active_scenarios(call: ServiceCall) -> dict: 

140 trace = call.data.get("trace", False) 

141 return {"scenarios": await service.enquire_active_scenarios(trace)} 

142 

143 async def supplemental_action_enquire_occupancy(_call: ServiceCall) -> dict: 

144 return {"scenarios": await service.enquire_occupancy()} 

145 

146 async def supplemental_action_enquire_snoozes(_call: ServiceCall) -> dict: 

147 return {"snoozes": service.enquire_snoozes()} 

148 

149 async def supplemental_action_clear_snoozes(_call: ServiceCall) -> dict: 

150 return {"cleared": service.clear_snoozes()} 

151 

152 async def supplemental_action_enquire_people(_call: ServiceCall) -> dict: 

153 return {"people": service.enquire_people()} 

154 

155 async def supplemental_action_purge_archive(call: ServiceCall) -> dict[str, Any]: 

156 days = call.data.get("days") 

157 if service.context.archive is None: 

158 return {"error": "No archive configured"} 

159 purged = await service.context.archive.cleanup(days=days, force=True) 

160 arch_size = await service.context.archive.size() 

161 return { 

162 "purged": purged, 

163 "remaining": arch_size, 

164 "interval": ARCHIVE_PURGE_MIN_INTERVAL, 

165 "days": service.context.archive.archive_days if days is None else days, 

166 } 

167 

168 hass.services.async_register( 

169 DOMAIN, 

170 "enquire_deliveries_by_scenario", 

171 supplemental_action_enquire_deliveries_by_scenario, 

172 supports_response=SupportsResponse.ONLY, 

173 ) 

174 hass.services.async_register( 

175 DOMAIN, 

176 "enquire_last_notification", 

177 supplemental_action_enquire_last_notification, 

178 supports_response=SupportsResponse.ONLY, 

179 ) 

180 hass.services.async_register( 

181 DOMAIN, 

182 "enquire_active_scenarios", 

183 supplemental_action_enquire_active_scenarios, 

184 supports_response=SupportsResponse.ONLY, 

185 ) 

186 hass.services.async_register( 

187 DOMAIN, 

188 "enquire_occupancy", 

189 supplemental_action_enquire_occupancy, 

190 supports_response=SupportsResponse.ONLY, 

191 ) 

192 hass.services.async_register( 

193 DOMAIN, 

194 "enquire_people", 

195 supplemental_action_enquire_people, 

196 supports_response=SupportsResponse.ONLY, 

197 ) 

198 hass.services.async_register( 

199 DOMAIN, 

200 "enquire_snoozes", 

201 supplemental_action_enquire_snoozes, 

202 supports_response=SupportsResponse.ONLY, 

203 ) 

204 hass.services.async_register( 

205 DOMAIN, 

206 "clear_snoozes", 

207 supplemental_action_clear_snoozes, 

208 supports_response=SupportsResponse.ONLY, 

209 ) 

210 hass.services.async_register( 

211 DOMAIN, 

212 "purge_archive", 

213 supplemental_action_purge_archive, 

214 supports_response=SupportsResponse.ONLY, 

215 ) 

216 

217 return service 

218 

219 

220class SuperNotificationAction(BaseNotificationService): 

221 """Implement SuperNotification action.""" 

222 

223 def __init__( 

224 self, 

225 hass: HomeAssistant, 

226 deliveries: dict[str, dict] | None = None, 

227 template_path: str | None = None, 

228 media_path: str | None = None, 

229 archive: dict | None = None, 

230 housekeeping: dict | None = None, 

231 recipients: list | None = None, 

232 mobile_actions: dict | None = None, 

233 scenarios: dict[str, dict] | None = None, 

234 links: list | None = None, 

235 method_defaults: dict | None = None, 

236 cameras: list[dict] | None = None, 

237 dupe_check: dict | None = None, 

238 ) -> None: 

239 """Initialize the service.""" 

240 self.hass: HomeAssistant = hass 

241 self.last_notification: Notification | None = None 

242 self.failures: int = 0 

243 self.housekeeping: dict = housekeeping or {} 

244 self.sent: int = 0 

245 self.context = SupernotificationConfiguration( 

246 hass, 

247 deliveries, 

248 links or [], 

249 recipients or [], 

250 mobile_actions, 

251 template_path, 

252 media_path, 

253 archive, 

254 scenarios, 

255 method_defaults or {}, 

256 cameras, 

257 ) 

258 self.unsubscribes: list = [] 

259 self.dupe_check_config: dict[str, Any] = dupe_check or {} 

260 self.last_purge: dt.datetime | None = None 

261 self.notification_cache: TTLCache = TTLCache( 

262 maxsize=self.dupe_check_config.get(CONF_SIZE, 100), ttl=self.dupe_check_config.get(CONF_TTL, 120) 

263 ) 

264 

265 async def initialize(self) -> None: 

266 await self.context.initialize() 

267 await self.context.register_delivery_methods(delivery_method_classes=METHODS) 

268 

269 self.expose_entities() 

270 self.unsubscribes.append(self.hass.bus.async_listen("mobile_app_notification_action", self.on_mobile_action)) 

271 housekeeping_schedule = self.housekeeping.get(CONF_HOUSEKEEPING_TIME) 

272 if housekeeping_schedule: 

273 _LOGGER.info("SUPERNOTIFY setting up housekeeping schedule at: %s", housekeeping_schedule) 

274 self.unsubscribes.append( 

275 async_track_time_change( 

276 self.hass, 

277 self.async_nightly_tasks, 

278 hour=housekeeping_schedule.hour, 

279 minute=housekeeping_schedule.minute, 

280 second=housekeeping_schedule.second, 

281 ) 

282 ) 

283 

284 self.unsubscribes.append(self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self.async_shutdown)) 

285 

286 async def async_shutdown(self, event: Event) -> None: 

287 _LOGGER.info("SUPERNOTIFY shutting down, %s", event) 

288 self.shutdown() 

289 

290 async def async_unregister_services(self) -> None: 

291 _LOGGER.info("SUPERNOTIFY unregistering") 

292 self.shutdown() 

293 return await super().async_unregister_services() 

294 

295 def shutdown(self) -> None: 

296 for unsub in self.unsubscribes: 

297 if unsub: 

298 try: 

299 _LOGGER.debug("SUPERNOTIFY unsubscribing: %s", unsub) 

300 unsub() 

301 except Exception as e: 

302 _LOGGER.error("SUPERNOTIFY failed to unsubscribe: %s", e) 

303 _LOGGER.info("SUPERNOTIFY shut down") 

304 

305 def expose_entities(self) -> None: 

306 for scenario in self.context.scenarios.values(): 

307 self.hass.states.async_set(f"{DOMAIN}.scenario_{scenario.name}", "", scenario.attributes(include_condition=False)) 

308 for method in self.context.methods.values(): 

309 self.hass.states.async_set( 

310 f"{DOMAIN}.method_{method.method}", str(len(method.valid_deliveries) > 0), method.attributes() 

311 ) 

312 for delivery_name, delivery in self.context._deliveries.items(): 

313 self.hass.states.async_set( 

314 f"{DOMAIN}.delivery_{delivery_name}", str(delivery_name in self.context.deliveries), delivery 

315 ) 

316 

317 def dupe_check(self, notification: Notification) -> bool: 

318 policy = self.dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP) 

319 if policy == ATTR_DUPE_POLICY_NONE: 

320 return False 

321 notification_hash = notification.hash() 

322 if notification.priority in PRIORITY_VALUES: 

323 same_or_higher_priority = PRIORITY_VALUES[PRIORITY_VALUES.index(notification.priority) :] 

324 else: 

325 same_or_higher_priority = [notification.priority] 

326 dupe = False 

327 if any((notification_hash, p) in self.notification_cache for p in same_or_higher_priority): 

328 _LOGGER.debug("SUPERNOTIFY Detected dupe notification") 

329 dupe = True 

330 self.notification_cache[(notification_hash, notification.priority)] = notification.id 

331 return dupe 

332 

333 async def async_send_message( 

334 self, message: str = "", title: str | None = None, target: list | str | None = None, **kwargs 

335 ) -> None: 

336 """Send a message via chosen method.""" 

337 data = kwargs.get(ATTR_DATA, {}) 

338 notification = None 

339 _LOGGER.debug("Message: %s, target: %s, data: %s", message, target, data) 

340 

341 try: 

342 notification = Notification(self.context, message, title, target, data) 

343 await notification.initialize() 

344 if self.dupe_check(notification): 

345 notification.suppress() 

346 else: 

347 if await notification.deliver(): 

348 self.sent += 1 

349 self.hass.states.async_set(f"{DOMAIN}.sent", str(self.sent)) 

350 elif notification.errored: 

351 _LOGGER.warning("SUPERNOTIFY Failed to deliver %s, error count %s", notification.id, notification.errored) 

352 else: 

353 _LOGGER.info("SUPERNOTIFY No delivery selected for %s", notification.id) 

354 

355 except Exception as err: 

356 # fault barrier of last resort, integration failures should be caught within envelope delivery 

357 _LOGGER.error("SUPERNOTIFY Failed to send message %s: %s", message, err, exc_info=True) 

358 self.failures += 1 

359 if notification is not None: 

360 notification.delivery_error = format_exception(err) 

361 self.hass.states.async_set(f"{DOMAIN}.failures", str(self.failures)) 

362 

363 if notification is not None: 

364 self.last_notification = notification 

365 self.context.archive.archive(notification) 

366 

367 _LOGGER.debug( 

368 "SUPERNOTIFY %s deliveries, %s errors, %s skipped", 

369 notification.delivered, 

370 notification.errored, 

371 notification.skipped, 

372 ) 

373 

374 def enquire_deliveries_by_scenario(self) -> dict[str, list[Scenario]]: 

375 return self.context.delivery_by_scenario 

376 

377 async def enquire_occupancy(self) -> dict[str, list]: 

378 return self.context.determine_occupancy() 

379 

380 async def enquire_active_scenarios(self, trace: bool) -> list[str] | dict: 

381 occupiers = self.context.determine_occupancy() 

382 cvars = ConditionVariables([], [], PRIORITY_MEDIUM, occupiers, None, None) 

383 

384 def safe_json(v: Any) -> Any: 

385 return json.loads(json.dumps(v, cls=ExtendedJSONEncoder)) 

386 

387 if trace: 

388 results = {"ENABLED": [], "DISABLED": [], "VARS": asdict(cvars)} 

389 for s in self.context.scenarios.values(): 

390 if await s.trace(cvars): 

391 results["ENABLED"].append(safe_json(s.attributes(include_trace=True))) # type: ignore 

392 else: 

393 results["DISABLED"].append(safe_json(s.attributes(include_trace=True))) # type: ignore 

394 return results 

395 

396 return [s.name for s in self.context.scenarios.values() if await s.evaluate(cvars)] 

397 

398 def enquire_snoozes(self) -> list[dict[str, Any]]: 

399 return self.context.snoozer.export() 

400 

401 def clear_snoozes(self) -> int: 

402 return self.context.snoozer.clear() 

403 

404 def enquire_people(self) -> list[dict]: 

405 return list(self.context.people.values()) 

406 

407 @callback 

408 def on_mobile_action(self, event: Event) -> None: 

409 """Listen for mobile actions relevant to snooze and silence notifications 

410 

411 Example Action: 

412 event_type: mobile_app_notification_action 

413 data: 

414 foo: a 

415 origin: REMOTE 

416 time_fired: "2024-04-20T13:14:09.360708+00:00" 

417 context: 

418 id: 01HVXT93JGWEDW0KE57Z0X6Z1K 

419 parent_id: null 

420 user_id: e9dbae1a5abf44dbbad52ff85501bb17 

421 """ 

422 event_name = event.data.get(ATTR_ACTION) 

423 if event_name is None or not event_name.startswith("SUPERNOTIFY_"): 

424 return # event not intended for here 

425 self.context.snoozer.handle_command_event(event, self.context.people) 

426 

427 @callback 

428 async def async_nightly_tasks(self, now: dt.datetime) -> None: 

429 _LOGGER.info("SUPERNOTIFY Housekeeping starting as scheduled at %s", now) 

430 await self.context.archive.cleanup() 

431 self.context.snoozer.purge_snoozes() 

432 _LOGGER.info("SUPERNOTIFY Housekeeping completed")