Coverage for custom_components/supernotify/configuration.py: 93%

260 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-10-26 08:54 +0000

1from __future__ import annotations 

2 

3import logging 

4import socket 

5from pathlib import Path 

6from typing import TYPE_CHECKING, Any 

7 

8from homeassistant.const import ( 

9 ATTR_STATE, 

10 CONF_DEFAULT, 

11 CONF_DEVICE_ID, 

12 CONF_ENABLED, 

13 CONF_METHOD, 

14 CONF_NAME, 

15 STATE_HOME, 

16 STATE_NOT_HOME, 

17) 

18from homeassistant.helpers import device_registry, entity_registry 

19from homeassistant.helpers import issue_registry as ir 

20from homeassistant.helpers.config_validation import boolean 

21from homeassistant.helpers.network import get_url 

22from homeassistant.util import slugify 

23 

24from custom_components.supernotify.archive import ArchiveTopic, NotificationArchive 

25from custom_components.supernotify.common import ensure_list, safe_get 

26from custom_components.supernotify.snoozer import Snoozer 

27 

28from . import ( 

29 ATTR_USER_ID, 

30 CONF_ARCHIVE_DAYS, 

31 CONF_ARCHIVE_MQTT_QOS, 

32 CONF_ARCHIVE_MQTT_RETAIN, 

33 CONF_ARCHIVE_MQTT_TOPIC, 

34 CONF_ARCHIVE_PATH, 

35 CONF_CAMERA, 

36 CONF_DATA, 

37 CONF_DEVICE_DISCOVERY, 

38 CONF_DEVICE_DOMAIN, 

39 CONF_DEVICE_NAME, 

40 CONF_DEVICE_TRACKER, 

41 CONF_MANUFACTURER, 

42 CONF_MOBILE_DEVICES, 

43 CONF_MOBILE_DISCOVERY, 

44 CONF_MODEL, 

45 CONF_NOTIFY_ACTION, 

46 CONF_PERSON, 

47 CONF_SELECTION, 

48 CONF_TARGETS_REQUIRED, 

49 DELIVERY_SELECTION_IMPLICIT, 

50 DOMAIN, 

51 SCENARIO_DEFAULT, 

52 SCENARIO_TEMPLATE_ATTRS, 

53 SELECTION_DEFAULT, 

54 SELECTION_FALLBACK, 

55 SELECTION_FALLBACK_ON_ERROR, 

56) 

57from .scenario import Scenario 

58 

59if TYPE_CHECKING: 

60 from homeassistant.core import HomeAssistant, State 

61 from homeassistant.helpers.device_registry import DeviceEntry, DeviceRegistry 

62 

63 from custom_components.supernotify.delivery_method import DeliveryMethod 

64 

65_LOGGER = logging.getLogger(__name__) 

66 

67 

class Context:
    """Configuration and derived runtime state shared across the integration.

    The constructor only records raw configuration and resolves the Home
    Assistant URLs; delivery methods, scenarios, people and the archive are
    set up later by the asynchronous ``initialize()``.
    """

    def __init__(
        self,
        hass: HomeAssistant | None = None,
        deliveries: dict[str, Any] | None = None,
        links: list[str] | None = None,
        recipients: list[dict[str, Any]] | None = None,
        mobile_actions: dict[str, Any] | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive_config: dict[str, str] | None = None,
        scenarios: dict[str, dict[str, Any]] | None = None,
        method_configs: dict[str, Any] | None = None,
        cameras: list[dict[str, Any]] | None = None,
        method_types: list[type[DeliveryMethod]] | None = None,
    ) -> None:
        """Record raw configuration and resolve Home Assistant URLs.

        Args:
            hass: Home Assistant instance; when omitted the URLs are empty
                strings and the instance name is ``"!UNDEFINED!"``.
            deliveries: Raw delivery configuration keyed by delivery name;
                anything that is not a dict is replaced by an empty dict.
            links: Links, normalised through ``ensure_list``.
            recipients: Recipient configurations, normalised through
                ``ensure_list``.
            mobile_actions: Mobile action groups keyed by name.
            template_path: Optional template directory.
            media_path: Optional media directory.
            archive_config: Archive settings (enabled flag, path, retention
                days and optional MQTT topic, QoS and retain settings).
            scenarios: Raw scenario definitions keyed by scenario name.
            method_configs: Per delivery method configuration keyed by method.
            cameras: Camera configurations, indexed here by ``CONF_CAMERA``.
            method_types: Delivery method classes to instantiate during
                ``initialize()``.

        """
        self.hass: HomeAssistant | None = None
        self.hass_internal_url: str
        self.hass_external_url: str
        self.hass_name: str
        if hass:
            self.hass = hass
            self.hass_name = hass.config.location_name
            try:
                self.hass_internal_url = get_url(hass, prefer_external=False)
            except Exception as e:
                # Best effort: fall back to the local host name rather than fail start up
                self.hass_internal_url = f"http://{socket.gethostname()}"
                _LOGGER.warning("SUPERNOTIFY could not get internal hass url, defaulting to %s: %s", self.hass_internal_url, e)
            try:
                self.hass_external_url = get_url(hass, prefer_external=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY could not get external hass url, defaulting to internal url: %s", e)
                self.hass_external_url = self.hass_internal_url
        else:
            self.hass_internal_url = ""
            self.hass_external_url = ""
            self.hass_name = "!UNDEFINED!"
            _LOGGER.warning("SUPERNOTIFY Configured without HomeAssistant instance")

        _LOGGER.debug(
            "SUPERNOTIFY Configured for HomeAssistant instance %s at %s , %s",
            self.hass_name,
            self.hass_internal_url,
            self.hass_external_url,
        )

        if not self.hass_internal_url or not self.hass_internal_url.startswith("http"):
            _LOGGER.warning("SUPERNOTIFY invalid internal hass url %s", self.hass_internal_url)

        self.links: list[dict[str, Any]] = ensure_list(links)
        # raw configured deliveries
        self._deliveries: dict[str, Any] = deliveries if isinstance(deliveries, dict) else {}
        # validated deliveries
        self.deliveries: dict[str, Any] = {}
        self._recipients: list[dict[str, Any]] = ensure_list(recipients)
        self.mobile_actions: dict[str, Any] = mobile_actions or {}
        self.template_path: Path | None = Path(template_path) if template_path else None
        self.media_path: Path | None = Path(media_path) if media_path else None
        archive_config = archive_config or {}
        self.archive: NotificationArchive = NotificationArchive(
            bool(archive_config.get(CONF_ENABLED, False)),
            archive_config.get(CONF_ARCHIVE_PATH),
            archive_config.get(CONF_ARCHIVE_DAYS),
        )
        archive_topic = archive_config.get(CONF_ARCHIVE_MQTT_TOPIC)
        # Stays None unless both a topic and a Home Assistant instance are available
        self.archive_topic: ArchiveTopic | None = None
        if archive_topic is not None and self.hass:
            self.archive_topic = ArchiveTopic(
                self.hass,
                archive_topic,
                int(archive_config.get(CONF_ARCHIVE_MQTT_QOS, 0)),
                boolean(archive_config.get(CONF_ARCHIVE_MQTT_RETAIN, True)),
            )
        self.cameras: dict[str, Any] = {c[CONF_CAMERA]: c for c in cameras} if cameras else {}
        self.methods: dict[str, DeliveryMethod] = {}
        self._method_configs: dict[str, Any] = method_configs or {}
        self.scenarios: dict[str, Scenario] = {}
        self.people: dict[str, dict[str, Any]] = {}
        self._config_scenarios: dict[str, Any] = scenarios or {}
        self.content_scenario_templates: dict[str, Any] = {}
        self.delivery_by_scenario: dict[str, list[str]] = {SCENARIO_DEFAULT: []}
        self.fallback_on_error: dict[str, dict[str, Any]] = {}
        self.fallback_by_default: dict[str, dict[str, Any]] = {}
        self._entity_registry: entity_registry.EntityRegistry | None = None
        self._device_registry: device_registry.DeviceRegistry | None = None
        self._method_types: list[type[DeliveryMethod]] = method_types or []
        self.snoozer = Snoozer()
        # test harness support
        self._create_default_scenario: bool = False
        self._method_instances: list[DeliveryMethod] | None = None

    async def initialize(self) -> None:
        """Build the runtime state from the configuration captured by the constructor.

        Registers delivery methods, sets up people, validates scenarios, checks
        the template and media paths, initialises the archive and finally maps
        deliveries to scenarios.
        """
        await self._register_delivery_methods(
            delivery_methods=self._method_instances, delivery_method_classes=self._method_types
        )

        self.people = self.setup_people(self._recipients)

        if self._config_scenarios and self.hass:
            for scenario_name, scenario_definition in self._config_scenarios.items():
                scenario = Scenario(scenario_name, scenario_definition, self.hass)
                # Only scenarios that pass validation are kept
                if await scenario.validate(
                    valid_deliveries=list(self.deliveries), valid_action_groups=list(self.mobile_actions)
                ):
                    self.scenarios[scenario_name] = scenario

        if self.template_path and not self.template_path.exists():
            _LOGGER.warning("SUPERNOTIFY template path not found at %s", self.template_path)
            self.template_path = None

        if self.media_path and not self.media_path.exists():
            _LOGGER.info("SUPERNOTIFY media path not found at %s", self.media_path)
            try:
                self.media_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                # Media storage is optional: report the problem and carry on without it
                _LOGGER.warning("SUPERNOTIFY media path %s cannot be created: %s", self.media_path, e)
                self.raise_issue("media_path", "media_path", {"path": str(self.media_path), "error": str(e)})
                self.media_path = None
        if self.media_path is not None:
            _LOGGER.info("SUPERNOTIFY abs media path: %s", self.media_path.absolute())
        if self.archive:
            self.archive.initialize()
        default_deliveries: dict[str, Any] = self.initialize_deliveries()
        self.initialize_scenarios(default_deliveries, default_scenario=self._create_default_scenario)

    def configure_for_tests(
        self, method_instances: list[DeliveryMethod] | None = None, create_default_scenario: bool = False
    ) -> None:
        """Store test harness overrides that ``initialize()`` picks up.

        Args:
            method_instances: Pre-built delivery methods (mocks or fixtures) to register.
            create_default_scenario: Passed to ``initialize_scenarios`` as ``default_scenario``.

        """
        self._create_default_scenario = create_default_scenario
        self._method_instances = method_instances

    def raise_issue(
        self,
        issue_id: str,
        issue_key: str,
        issue_map: dict[str, str],
        severity: ir.IssueSeverity = ir.IssueSeverity.WARNING,
        learn_more_url: str = "https://jeyrb.github.io/hass_supernotify",
    ) -> None:
        """Create a non-fixable issue in the Home Assistant issue registry.

        Does nothing when no Home Assistant instance is configured.

        Args:
            issue_id: Identifier of the issue within this integration's domain.
            issue_key: Translation key for the issue text.
            issue_map: Placeholder values for the translated text.
            severity: Issue severity, a warning by default.
            learn_more_url: Documentation link attached to the issue.

        """
        if not self.hass:
            return
        ir.async_create_issue(
            self.hass,
            DOMAIN,
            issue_id,
            is_fixable=False,
            translation_key=issue_key,
            translation_placeholders=issue_map,
            severity=severity,
            learn_more_url=learn_more_url,
        )

    def initialize_deliveries(self) -> dict[str, Any]:
        """Apply method defaults to the raw deliveries and classify them by selection.

        Each raw delivery configuration is updated in place: defaults of its
        delivery method are filled in for missing keys, deliveries with an
        unknown method are disabled and reported as an issue, and a missing
        name defaults to the delivery key. Enabled deliveries are recorded in
        ``fallback_on_error`` and ``fallback_by_default`` according to their
        selection.

        Returns:
            The enabled deliveries whose selection includes ``SELECTION_DEFAULT``.

        """
        default_deliveries: dict[str, Any] = {}
        if self._deliveries:
            for d, dc in self._deliveries.items():
                method = self.methods.get(dc[CONF_METHOD])
                if method:
                    for k, v in method.default.items():
                        dc.setdefault(k, v)
                else:
                    _LOGGER.warning("SUPERNOTIFY Unknown method %s for delivery %s", dc[CONF_METHOD], d)
                    self.raise_issue(
                        f"delivery_{d}_unknown_method{dc[CONF_METHOD]}",
                        issue_key="delivery_unknown_method",
                        issue_map={"delivery": d, "method": dc[CONF_METHOD]},
                    )
                    dc[CONF_ENABLED] = False
                if dc.get(CONF_ENABLED, True):
                    # A delivery without an explicit selection counts as a default delivery
                    selection = dc.get(CONF_SELECTION, [SELECTION_DEFAULT])
                    if SELECTION_FALLBACK_ON_ERROR in selection:
                        self.fallback_on_error[d] = dc
                    if SELECTION_FALLBACK in selection:
                        self.fallback_by_default[d] = dc
                    if SELECTION_DEFAULT in selection:
                        default_deliveries[d] = dc

                # NOTE(review): applied to disabled deliveries too; nesting inferred - confirm
                if not dc.get(CONF_NAME):
                    dc[CONF_NAME] = d  # for minimal tests

        return default_deliveries

    def initialize_scenarios(self, default_deliveries: dict[str, Any], default_scenario: bool = False) -> None:
        """Map deliveries to scenarios and index scenario level content templates.

        Args:
            default_deliveries: Deliveries applied implicitly, as returned by
                ``initialize_deliveries``.
            default_scenario: When true, every enabled validated delivery is
                added to the ``SCENARIO_DEFAULT`` entry as well.

        """
        for scenario_name, scenario in self.scenarios.items():
            self.delivery_by_scenario.setdefault(scenario_name, [])
            # Implicit selection starts from the default deliveries, otherwise from nothing
            if scenario.delivery_selection == DELIVERY_SELECTION_IMPLICIT:
                scenario_deliveries: list[str] = list(default_deliveries.keys())
            else:
                scenario_deliveries = []
            scenario_definition_delivery = scenario.delivery
            scenario_deliveries.extend(s for s in scenario_definition_delivery if s not in scenario_deliveries)

            for scenario_delivery in scenario_deliveries:
                if safe_get(scenario_definition_delivery.get(scenario_delivery), CONF_ENABLED, True):
                    self.delivery_by_scenario[scenario_name].append(scenario_delivery)

                # NOTE(review): templates are indexed regardless of the enabled flag;
                # nesting inferred - confirm
                scenario_delivery_config = safe_get(scenario_definition_delivery.get(scenario_delivery), CONF_DATA, {})

                # extract message and title templates per scenario per delivery
                for template_field in SCENARIO_TEMPLATE_ATTRS:
                    template_format = scenario_delivery_config.get(template_field)
                    if template_format is not None:
                        self.content_scenario_templates.setdefault(template_field, {})
                        self.content_scenario_templates[template_field].setdefault(scenario_delivery, [])
                        self.content_scenario_templates[template_field][scenario_delivery].append(scenario_name)

        self.delivery_by_scenario[SCENARIO_DEFAULT] = list(default_deliveries.keys())
        if default_scenario:
            for d, dc in self.deliveries.items():
                if dc.get(CONF_ENABLED, True) and d not in self.delivery_by_scenario[SCENARIO_DEFAULT]:
                    self.delivery_by_scenario[SCENARIO_DEFAULT].append(d)

    async def _register_delivery_methods(
        self,
        delivery_methods: list[DeliveryMethod] | None = None,
        delivery_method_classes: list[type[DeliveryMethod]] | None = None,
    ) -> None:
        """Register and initialise delivery methods, collecting their valid deliveries.

        Use configure_for_tests() to set delivery_methods to mocks or manually created fixtures.

        Args:
            delivery_methods: Ready-made delivery method instances.
            delivery_method_classes: Delivery method classes to instantiate with
                their per method configuration; ignored without a Home
                Assistant instance.

        """
        if delivery_methods:
            for delivery_method in delivery_methods:
                self.methods[delivery_method.method] = delivery_method
                await self.methods[delivery_method.method].initialize()
                self.deliveries.update(self.methods[delivery_method.method].valid_deliveries)
        if delivery_method_classes and self.hass:
            for delivery_method_class in delivery_method_classes:
                method_config = self._method_configs.get(delivery_method_class.method, {})
                self.methods[delivery_method_class.method] = delivery_method_class(
                    self.hass,
                    self,
                    self._deliveries,
                    default=method_config.get(CONF_DEFAULT, {}),
                    device_domain=method_config.get(CONF_DEVICE_DOMAIN, []),
                    device_discovery=method_config.get(CONF_DEVICE_DISCOVERY, False),
                    targets_required=method_config.get(CONF_TARGETS_REQUIRED, False),
                )
                await self.methods[delivery_method_class.method].initialize()
                self.deliveries.update(self.methods[delivery_method_class.method].valid_deliveries)

        _LOGGER.info("SUPERNOTIFY configured deliveries %s", "; ".join(self.deliveries.keys()))

    def delivery_method(self, delivery: str) -> DeliveryMethod:
        """Return the delivery method registered for a validated delivery.

        Args:
            delivery: Name of the delivery.

        Raises:
            ValueError: If the delivery is unknown or its method is not registered.

        """
        method_name = self.deliveries.get(delivery, {}).get(CONF_METHOD)
        method: DeliveryMethod | None = self.methods.get(method_name)
        if not method:
            raise ValueError(f"SUPERNOTIFY No method {method_name} for delivery {delivery}")
        return method

    def discover_devices(self, discover_domain: str) -> list[DeviceEntry]:
        """Find enabled devices that have an identifier in the given domain.

        Each device is returned at most once, even when several of its
        identifiers belong to the domain.

        Args:
            discover_domain: Domain compared with the first element of each
                device identifier.

        Returns:
            Matching devices, or an empty list when no device registry is available.

        """
        devices: list[DeviceEntry] = []
        dev_reg: DeviceRegistry | None = self.device_registry()
        if dev_reg is None:
            _LOGGER.warning("SUPERNOTIFY Unable to discover devices for %s - no device registry found", discover_domain)
            return []

        all_devs = enabled_devs = found_devs = 0
        for dev in dev_reg.devices.values():
            all_devs += 1
            # NOTE(review): disabled devices are skipped for discovery; nesting inferred - confirm
            if dev.disabled:
                continue
            enabled_devs += 1
            matched = False
            for identifier in dev.identifiers:
                if identifier and len(identifier) > 1 and identifier[0] == discover_domain:
                    _LOGGER.debug("SUPERNOTIFY discovered device %s for identifier %s", dev.name, identifier)
                    matched = True
                elif identifier and len(identifier) != 2:
                    # HomeKit has triples for identifiers, other domains may behave similarly
                    _LOGGER.debug("SUPERNOTIFY Unexpected device %s identifier: %s", dev.name, identifier)  # type: ignore
            if matched:
                # Append once per device so several matching identifiers do not duplicate it
                devices.append(dev)
                found_devs += 1
        _LOGGER.info(
            "SUPERNOTIFY %s device discovery, all=%s, enabled=%s, found=%s",
            discover_domain,
            all_devs,
            enabled_devs,
            found_devs,
        )
        return devices

    def setup_people(self, recipients: list[dict[str, Any]] | tuple[dict[str, Any]]) -> dict[str, dict[str, Any]]:
        """Index recipients by person entity id, enriching each configuration in place.

        With mobile discovery enabled, discovered mobile devices are appended to
        the recipient's mobile device list, which is created when missing. When
        the person has a state in Home Assistant, its user id attribute is
        copied onto the recipient.

        Args:
            recipients: Recipient configurations, each holding a ``CONF_PERSON`` entry.

        Returns:
            The recipient configurations keyed by person entity id.

        """
        people: dict[str, dict[str, Any]] = {}
        for r in recipients:
            if r.get(CONF_MOBILE_DISCOVERY):
                # Tolerate recipients configured without an explicit device list
                if r.get(CONF_MOBILE_DEVICES) is None:
                    r[CONF_MOBILE_DEVICES] = []
                r[CONF_MOBILE_DEVICES].extend(self.mobile_devices_for_person(r[CONF_PERSON]))
                if r.get(CONF_MOBILE_DEVICES):
                    _LOGGER.info("SUPERNOTIFY Auto configured %s for mobile devices %s", r[CONF_PERSON], r[CONF_MOBILE_DEVICES])
                else:
                    _LOGGER.warning("SUPERNOTIFY Unable to find mobile devices for %s", r[CONF_PERSON])
            if self.hass:
                state: State | None = self.hass.states.get(r[CONF_PERSON])
                if state is not None:
                    r[ATTR_USER_ID] = state.attributes.get(ATTR_USER_ID)
            people[r[CONF_PERSON]] = r
        return people

    def people_state(self) -> list[dict[str, Any]]:
        """Refresh and return the people configurations with their current state.

        The ``ATTR_STATE`` entry of each person is set to the state of the
        person entity, or to ``None`` when the entity has no state. If the
        lookup fails the entry is left as it was.

        Returns:
            The people configurations, or an empty list without a Home
            Assistant instance.

        """
        results: list[dict[str, Any]] = []
        if self.hass:
            for person, person_config in self.people.items():
                # TODO: possibly rate limit this
                try:
                    tracker = self.hass.states.get(person)
                    if tracker is None:
                        person_config[ATTR_STATE] = None
                    else:
                        person_config[ATTR_STATE] = tracker.state
                except Exception as e:
                    _LOGGER.warning("SUPERNOTIFY Unable to determine occupied status for %s: %s", person, e)
                results.append(person_config)
        return results

    def determine_occupancy(self) -> dict[str, list[dict[str, Any]]]:
        """Split people into those at home and those away.

        Returns:
            A dict with ``STATE_HOME`` and ``STATE_NOT_HOME`` keys, each holding
            the matching people configurations.

        """
        results: dict[str, list[dict[str, Any]]] = {STATE_HOME: [], STATE_NOT_HOME: []}
        for person_config in self.people_state():
            if person_config.get(ATTR_STATE) in (None, STATE_HOME):
                # default to at home if unknown tracker
                results[STATE_HOME].append(person_config)
            else:
                results[STATE_NOT_HOME].append(person_config)
        return results

    def entity_registry(self) -> entity_registry.EntityRegistry | None:
        """Return the entity registry, fetching it once and caching the result.

        Hass entity registry is weird, every component ends up creating its own, with a store,
        subscribing to all entities, so do it once here.

        Returns:
            The entity registry, or ``None`` when there is no Home Assistant
            instance or the lookup failed.

        """
        if self._entity_registry is not None:
            return self._entity_registry
        if self.hass:
            try:
                self._entity_registry = entity_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get entity registry: %s", e)
        return self._entity_registry

    def device_registry(self) -> device_registry.DeviceRegistry | None:
        """Return the device registry, fetching it once and caching the result.

        Hass device registry is weird, every component ends up creating its own, with a store,
        subscribing to all devices, so do it once here.

        Returns:
            The device registry, or ``None`` when there is no Home Assistant
            instance or the lookup failed.

        """
        if self._device_registry is not None:
            return self._device_registry
        if self.hass:
            try:
                self._device_registry = device_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get device registry: %s", e)
        return self._device_registry

    def mobile_devices_for_person(self, person_entity_id: str, validate_targets: bool = False) -> list[dict[str, Any]]:
        """Auto detect mobile_app targets for a person.

        Targets not currently validated as async registration may not be complete at this stage.

        Args:
            person_entity_id: Entity id of the person whose ``device_trackers``
                state attribute is inspected.
            validate_targets: When true, skip devices whose derived notify
                action is not a registered ``notify`` service. Defaults to False.

        Returns:
            One dict per mobile_app device tracker with a known device, holding
            manufacturer, model, notify action, device tracker, device id and
            device name. Empty when the person or the registries cannot be resolved.

        """
        mobile_devices: list[dict[str, Any]] = []
        person_state = self.hass.states.get(person_entity_id) if self.hass else None
        if not person_state:
            _LOGGER.warning("SUPERNOTIFY Unable to resolve %s", person_entity_id)
        else:
            ent_reg = self.entity_registry()
            dev_reg = self.device_registry()
            if not ent_reg or not dev_reg:
                _LOGGER.warning("SUPERNOTIFY Unable to access entity or device registries for %s", person_entity_id)
            else:
                for d_t in person_state.attributes.get("device_trackers", ()):
                    entity = ent_reg.async_get(d_t)
                    if entity and entity.platform == "mobile_app" and entity.device_id:
                        device = dev_reg.async_get(entity.device_id)
                        if not device:
                            _LOGGER.warning("SUPERNOTIFY Unable to find device %s", entity.device_id)
                        else:
                            # The notify action name is derived from the slugified device name
                            notify_action = f"mobile_app_{slugify(device.name)}"
                            if (
                                validate_targets
                                and self.hass
                                and self.hass.services
                                and not self.hass.services.has_service("notify", notify_action)
                            ):
                                _LOGGER.warning("SUPERNOTIFY Unable to find notify action <%s>", notify_action)
                            else:
                                mobile_devices.append({
                                    CONF_MANUFACTURER: device.manufacturer,
                                    CONF_MODEL: device.model,
                                    CONF_NOTIFY_ACTION: notify_action,
                                    CONF_DEVICE_TRACKER: d_t,
                                    CONF_DEVICE_ID: device.id,
                                    CONF_DEVICE_NAME: device.name,
                                    # CONF_DEVICE_LABELS: device.labels,
                                })
                    else:
                        _LOGGER.debug("SUPERNOTIFY Ignoring device tracker %s", d_t)

        return mobile_devices