Coverage for custom_components/supernotify/configuration.py: 94%

228 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-10-18 09:29 +0000

1from __future__ import annotations 

2 

3import logging 

4import socket 

5from pathlib import Path 

6from typing import TYPE_CHECKING, Any 

7 

8from homeassistant.const import ( 

9 ATTR_STATE, 

10 CONF_DEVICE_ID, 

11 CONF_ENABLED, 

12 CONF_METHOD, 

13 CONF_NAME, 

14 STATE_HOME, 

15 STATE_NOT_HOME, 

16) 

17from homeassistant.helpers import device_registry, entity_registry 

18from homeassistant.helpers.config_validation import boolean 

19from homeassistant.helpers.network import get_url 

20from homeassistant.util import slugify 

21 

22from custom_components.supernotify.archive import ArchiveTopic, NotificationArchive 

23from custom_components.supernotify.common import ensure_list, safe_get 

24from custom_components.supernotify.snoozer import Snoozer 

25 

26from . import ( 

27 ATTR_USER_ID, 

28 CONF_ARCHIVE_DAYS, 

29 CONF_ARCHIVE_MQTT_QOS, 

30 CONF_ARCHIVE_MQTT_RETAIN, 

31 CONF_ARCHIVE_MQTT_TOPIC, 

32 CONF_ARCHIVE_PATH, 

33 CONF_CAMERA, 

34 CONF_DATA, 

35 CONF_DEVICE_NAME, 

36 CONF_DEVICE_TRACKER, 

37 CONF_MANUFACTURER, 

38 CONF_MOBILE_DEVICES, 

39 CONF_MOBILE_DISCOVERY, 

40 CONF_MODEL, 

41 CONF_NOTIFY_ACTION, 

42 CONF_PERSON, 

43 CONF_SELECTION, 

44 DELIVERY_SELECTION_IMPLICIT, 

45 METHOD_DEFAULTS_SCHEMA, 

46 SCENARIO_DEFAULT, 

47 SCENARIO_TEMPLATE_ATTRS, 

48 SELECTION_DEFAULT, 

49 SELECTION_FALLBACK, 

50 SELECTION_FALLBACK_ON_ERROR, 

51) 

52from .scenario import Scenario 

53 

54if TYPE_CHECKING: 

55 from homeassistant.core import HomeAssistant, State 

56 

57 from custom_components.supernotify.delivery_method import DeliveryMethod 

58 

59_LOGGER = logging.getLogger(__name__) 

60 

61 

class Context:
    """Shared configuration and runtime state for SUPERNOTIFY.

    The constructor stores the raw configuration it is given (deliveries,
    recipients, scenarios, method defaults, cameras, archive settings).
    ``initialize`` and ``register_delivery_methods`` then derive the validated
    structures (``people``, ``scenarios``, ``deliveries``, ``methods`` and the
    per-scenario delivery lists) from that raw configuration.
    """

    def __init__(
        self,
        hass: HomeAssistant | None = None,
        deliveries: dict[str, Any] | None = None,
        links: list[str] | None = None,
        recipients: list[dict[str, Any]] | None = None,
        mobile_actions: dict[str, Any] | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive_config: dict[str, str] | None = None,
        scenarios: dict[str, dict[str, Any]] | None = None,
        method_defaults: dict[str, Any] | None = None,
        cameras: list[dict[str, Any]] | None = None,
    ) -> None:
        """Capture raw configuration; no validation or filesystem access happens here.

        Args:
            hass: Home Assistant instance; when missing, URLs are left empty and a warning is logged.
            deliveries: Raw delivery configuration keyed by delivery name (ignored unless a dict).
            links: Links passed through ``ensure_list``.
            recipients: Recipient (person) configurations, passed through ``ensure_list``.
            mobile_actions: Mobile action configuration, stored as given.
            template_path: Directory of templates, checked for existence in ``initialize``.
            media_path: Directory for media, created in ``initialize`` if missing.
            archive_config: Archive settings (enabled flag, path, days, MQTT topic/qos/retain).
            scenarios: Raw scenario definitions keyed by scenario name.
            method_defaults: Per-method default delivery settings keyed by method name.
            cameras: Camera configurations, indexed here by their ``CONF_CAMERA`` value.

        """
        self.hass: HomeAssistant | None = hass if hass else None
        self.hass_internal_url: str
        self.hass_external_url: str
        self.hass_name: str
        if self.hass:
            self.hass_name = self.hass.config.location_name
            try:
                self.hass_internal_url = get_url(self.hass, prefer_external=False)
            except Exception as e:
                # best effort: fall back to the local hostname rather than failing setup
                _LOGGER.warning("SUPERNOTIFY could not get internal hass url: %s", e)
                self.hass_internal_url = f"http://{socket.gethostname()}"
            try:
                self.hass_external_url = get_url(self.hass, prefer_external=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY could not get external hass url: %s", e)
                self.hass_external_url = self.hass_internal_url
        else:
            self.hass_internal_url = ""
            self.hass_external_url = ""
            self.hass_name = "!UNDEFINED!"
            _LOGGER.warning("SUPERNOTIFY Configured without HomeAssistant instance")

        _LOGGER.debug(
            "SUPERNOTIFY Configured for HomeAssistant instance %s at %s , %s",
            self.hass_name,
            self.hass_internal_url,
            self.hass_external_url,
        )

        if not self.hass_internal_url or not self.hass_internal_url.startswith("http"):
            _LOGGER.warning("SUPERNOTIFY invalid internal hass url %s", self.hass_internal_url)

        self.links: list[dict[str, Any]] = ensure_list(links)
        # raw configured deliveries
        self._deliveries: dict[str, Any] = deliveries if isinstance(deliveries, dict) else {}
        # validated deliveries
        self.deliveries: dict[str, Any] = {}
        self._recipients: list[dict[str, Any]] = ensure_list(recipients)
        self.mobile_actions: dict[str, Any] = mobile_actions or {}
        self.template_path: Path | None = Path(template_path) if template_path else None
        self.media_path: Path | None = Path(media_path) if media_path else None

        archive_config = archive_config or {}
        self.archive: NotificationArchive = NotificationArchive(
            bool(archive_config.get(CONF_ENABLED, False)),
            archive_config.get(CONF_ARCHIVE_PATH),
            archive_config.get(CONF_ARCHIVE_DAYS),
        )
        # the MQTT archive topic needs both a configured topic and a hass instance
        archive_topic = archive_config.get(CONF_ARCHIVE_MQTT_TOPIC)
        self.archive_topic: ArchiveTopic | None = None
        if archive_topic is not None and self.hass:
            self.archive_topic = ArchiveTopic(
                self.hass,
                archive_topic,
                int(archive_config.get(CONF_ARCHIVE_MQTT_QOS, 0)),
                boolean(archive_config.get(CONF_ARCHIVE_MQTT_RETAIN, True)),
            )

        self.cameras: dict[str, Any] = {c[CONF_CAMERA]: c for c in cameras} if cameras else {}
        self.methods: dict[str, DeliveryMethod] = {}
        self.method_defaults: dict[str, Any] = method_defaults or {}
        self.scenarios: dict[str, Scenario] = {}
        self.people: dict[str, dict[str, Any]] = {}
        self._config_scenarios: dict[str, Any] = scenarios or {}
        self.content_scenario_templates: dict[str, Any] = {}
        self.delivery_by_scenario: dict[str, list[str]] = {SCENARIO_DEFAULT: []}
        self.fallback_on_error: dict[str, dict[str, Any]] = {}
        self.fallback_by_default: dict[str, dict[str, Any]] = {}
        # registries are fetched lazily and cached by entity_registry() / device_registry()
        self._entity_registry: entity_registry.EntityRegistry | None = None
        self._device_registry: device_registry.DeviceRegistry | None = None
        self.snoozer = Snoozer()

    async def initialize(self) -> None:
        """Build people, validated scenarios, paths, archive and per-scenario delivery lists."""
        self.people = self.setup_people(self._recipients)

        if self._config_scenarios and self.hass:
            for scenario_name, scenario_definition in self._config_scenarios.items():
                scenario = Scenario(scenario_name, scenario_definition, self.hass)
                # only scenarios that pass their own validation are kept
                if await scenario.validate():
                    self.scenarios[scenario_name] = scenario

        if self.template_path and not self.template_path.exists():
            _LOGGER.warning("SUPERNOTIFY template path not found at %s", self.template_path)
            self.template_path = None

        if self.media_path and not self.media_path.exists():
            _LOGGER.info("SUPERNOTIFY media path not found at %s", self.media_path)
            try:
                self.media_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                # media storage is optional, so carry on without it
                _LOGGER.warning("SUPERNOTIFY media path %s cannot be created: %s", self.media_path, e)
                self.media_path = None
        if self.media_path is not None:
            _LOGGER.info("SUPERNOTIFY abs media path: %s", self.media_path.absolute())
        if self.archive:
            self.archive.initialize()
        default_deliveries: dict[str, Any] = self.initialize_deliveries()
        self.initialize_scenarios(default_deliveries)

    def initialize_deliveries(self) -> dict[str, Any]:
        """Classify raw deliveries by selection and apply name and method defaults.

        Enabled deliveries are recorded in ``fallback_on_error``, ``fallback_by_default``
        and the returned mapping according to their ``CONF_SELECTION`` (which defaults
        to ``[SELECTION_DEFAULT]``).

        Returns:
            Enabled deliveries whose selection includes ``SELECTION_DEFAULT``, keyed by name.

        """
        default_deliveries: dict[str, Any] = {}
        for d, dc in self._deliveries.items():
            if dc.get(CONF_ENABLED, True):
                selection = dc.get(CONF_SELECTION, [SELECTION_DEFAULT])
                if SELECTION_FALLBACK_ON_ERROR in selection:
                    self.fallback_on_error[d] = dc
                if SELECTION_FALLBACK in selection:
                    self.fallback_by_default[d] = dc
                if SELECTION_DEFAULT in selection:
                    default_deliveries[d] = dc

            # NOTE(review): applied to every delivery, enabled or not - confirm against upstream layout
            if not dc.get(CONF_NAME):
                dc[CONF_NAME] = d  # for minimal tests
            for conf_key in METHOD_DEFAULTS_SCHEMA.schema:
                self.set_method_default(dc, conf_key.schema)
        return default_deliveries

    def initialize_scenarios(self, default_deliveries: dict[str, Any]) -> None:
        """Populate ``delivery_by_scenario`` and ``content_scenario_templates`` from validated scenarios.

        Args:
            default_deliveries: Deliveries selected by default, as returned by ``initialize_deliveries``.

        """
        for scenario_name, scenario in self.scenarios.items():
            self.delivery_by_scenario.setdefault(scenario_name, [])
            # implicit selection starts from the default deliveries, otherwise from nothing
            if scenario.delivery_selection == DELIVERY_SELECTION_IMPLICIT:
                scenario_deliveries: list[str] = list(default_deliveries.keys())
            else:
                scenario_deliveries = []
            scenario_definition_delivery = scenario.delivery
            scenario_deliveries.extend(s for s in scenario_definition_delivery if s not in scenario_deliveries)

            for scenario_delivery in scenario_deliveries:
                # None for default deliveries the scenario does not mention explicitly
                delivery_definition = scenario_definition_delivery.get(scenario_delivery)
                if safe_get(delivery_definition, CONF_ENABLED, True):
                    self.delivery_by_scenario[scenario_name].append(scenario_delivery)

                # NOTE(review): templates are collected regardless of the enabled flag - confirm
                # `or {}` guards against an explicit empty/None data section
                scenario_delivery_config = safe_get(delivery_definition, CONF_DATA, {}) or {}

                # extract message and title templates per scenario per delivery
                for template_field in SCENARIO_TEMPLATE_ATTRS:
                    template_format = scenario_delivery_config.get(template_field)
                    if template_format is not None:
                        self.content_scenario_templates.setdefault(template_field, {})
                        self.content_scenario_templates[template_field].setdefault(scenario_delivery, [])
                        self.content_scenario_templates[template_field][scenario_delivery].append(scenario_name)

        self.delivery_by_scenario[SCENARIO_DEFAULT] = list(default_deliveries.keys())

    async def register_delivery_methods(
        self,
        delivery_methods: list[DeliveryMethod] | None = None,
        delivery_method_classes: list[type[DeliveryMethod]] | None = None,
        set_as_default: bool = False,
    ) -> None:
        """Available directly for test fixtures supplying class or instance

        Each method is stored under its ``method`` name, initialized, and its
        ``valid_deliveries`` merged into ``self.deliveries``.

        Args:
            delivery_methods: Ready-made delivery method instances.
            delivery_method_classes: Classes instantiated with ``(hass, self, raw deliveries)``;
                skipped when there is no hass instance.
            set_as_default: When true, add every delivery with a known method to the default scenario.

        """
        if delivery_methods:
            for delivery_method in delivery_methods:
                self.methods[delivery_method.method] = delivery_method
                await self.methods[delivery_method.method].initialize()
                self.deliveries.update(self.methods[delivery_method.method].valid_deliveries)
        if delivery_method_classes and self.hass:
            for delivery_method_class in delivery_method_classes:
                self.methods[delivery_method_class.method] = delivery_method_class(self.hass, self, self._deliveries)
                await self.methods[delivery_method_class.method].initialize()
                self.deliveries.update(self.methods[delivery_method_class.method].valid_deliveries)

        for d, dc in self.deliveries.items():
            if dc.get(CONF_METHOD) not in self.methods:
                _LOGGER.warning("SUPERNOTIFY Ignoring delivery %s without known method %s", d, dc.get(CONF_METHOD))
            elif set_as_default and d not in self.delivery_by_scenario[SCENARIO_DEFAULT]:
                self.delivery_by_scenario[SCENARIO_DEFAULT].append(d)

        _LOGGER.info("SUPERNOTIFY configured deliveries %s", "; ".join(self.deliveries.keys()))

    def set_method_default(self, delivery_config: dict[str, Any], attr: str) -> None:
        """Copy a method-level default for ``attr`` into a delivery that does not set it.

        Nothing happens when the delivery already has ``attr``, has no ``CONF_METHOD``,
        or the method default for ``attr`` is missing or falsy.

        Args:
            delivery_config: Delivery configuration, updated in place.
            attr: Configuration key to default.

        """
        if attr not in delivery_config and CONF_METHOD in delivery_config:
            method_default: dict[str, Any] = self.method_defaults.get(delivery_config[CONF_METHOD], {})
            if method_default.get(attr):
                delivery_config[attr] = method_default[attr]
                # .get: a delivery without a name must not fail on a debug log line
                _LOGGER.debug(
                    "SUPERNOTIFY Defaulting delivery %s to %s %s",
                    delivery_config.get(CONF_NAME),
                    attr,
                    delivery_config[attr],
                )

    def delivery_method(self, delivery: str) -> DeliveryMethod:
        """Return the registered delivery method backing a validated delivery.

        Args:
            delivery: Name of the delivery.

        Raises:
            ValueError: If the delivery is unknown or its method is not registered.

        """
        method_name = self.deliveries.get(delivery, {}).get(CONF_METHOD)
        method: DeliveryMethod | None = self.methods.get(method_name)
        if not method:
            raise ValueError(f"SUPERNOTIFY No method for delivery {delivery}")
        return method

    def setup_people(self, recipients: list[dict[str, Any]] | tuple[dict[str, Any]]) -> dict[str, dict[str, Any]]:
        """Index recipients by person entity id, enriching each one in place.

        With mobile discovery enabled, discovered mobile devices are appended to the
        recipient's ``CONF_MOBILE_DEVICES`` list (created if absent). When the person
        has a state in hass, its ``ATTR_USER_ID`` attribute is copied onto the recipient.

        Args:
            recipients: Recipient configurations; each must contain ``CONF_PERSON``.

        Returns:
            The same recipient dicts keyed by their ``CONF_PERSON`` value.

        """
        people: dict[str, dict[str, Any]] = {}
        for r in recipients:
            if r.get(CONF_MOBILE_DISCOVERY):
                # tolerate recipients without a preconfigured device list
                devices = r.get(CONF_MOBILE_DEVICES)
                if devices is None:
                    devices = r[CONF_MOBILE_DEVICES] = []
                devices.extend(self.mobile_devices_for_person(r[CONF_PERSON]))
                if devices:
                    _LOGGER.info("SUPERNOTIFY Auto configured %s for mobile devices %s", r[CONF_PERSON], devices)
                else:
                    _LOGGER.warning("SUPERNOTIFY Unable to find mobile devices for %s", r[CONF_PERSON])
            if self.hass:
                state: State | None = self.hass.states.get(r[CONF_PERSON])
                if state is not None:
                    r[ATTR_USER_ID] = state.attributes.get(ATTR_USER_ID)
            people[r[CONF_PERSON]] = r
        return people

    def people_state(self) -> list[dict[str, Any]]:
        """Refresh each person's ``ATTR_STATE`` from hass and return the person configs.

        Returns:
            The person configuration dicts (updated in place); empty without a hass instance.

        """
        results = []
        if self.hass:
            for person, person_config in self.people.items():
                # TODO: possibly rate limit this
                try:
                    tracker = self.hass.states.get(person)
                    if tracker is None:
                        person_config[ATTR_STATE] = None
                    else:
                        person_config[ATTR_STATE] = tracker.state
                except Exception as e:
                    # the person is still reported, with whatever state it already had
                    _LOGGER.warning("Unable to determine occupied status for %s: %s", person, e)
                results.append(person_config)
        return results

    def determine_occupancy(self) -> dict[str, list[dict[str, Any]]]:
        """Split people into ``STATE_HOME`` and ``STATE_NOT_HOME`` lists by their current state."""
        results: dict[str, list[dict[str, Any]]] = {STATE_HOME: [], STATE_NOT_HOME: []}
        for person_config in self.people_state():
            if person_config.get(ATTR_STATE) in (None, STATE_HOME):
                # default to at home if unknown tracker
                results[STATE_HOME].append(person_config)
            else:
                results[STATE_NOT_HOME].append(person_config)
        return results

    def entity_registry(self) -> entity_registry.EntityRegistry | None:
        """Hass entity registry is weird, every component ends up creating its own, with a store, subscribing
        to all entities, so do it once here

        Returns the cached registry, or ``None`` without hass or if the lookup failed.
        """  # noqa: D205
        if self._entity_registry is not None:
            return self._entity_registry
        if self.hass:
            try:
                self._entity_registry = entity_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get entity registry: %s", e)
        return self._entity_registry

    def device_registry(self) -> device_registry.DeviceRegistry | None:
        """Hass device registry is weird, every component ends up creating its own, with a store, subscribing
        to all devices, so do it once here

        Returns the cached registry, or ``None`` without hass or if the lookup failed.
        """  # noqa: D205
        if self._device_registry is not None:
            return self._device_registry
        if self.hass:
            try:
                self._device_registry = device_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get device registry: %s", e)
        return self._device_registry

    def mobile_devices_for_person(self, person_entity_id: str, validate_targets: bool = False) -> list[dict[str, Any]]:
        """Auto detect mobile_app targets for a person.

        Targets not currently validated as async registration may not be complete at this stage

        Args:
        ----
            person_entity_id (str): Entity id of the person whose ``device_trackers`` attribute is inspected.
            validate_targets (bool, optional): Skip devices whose ``notify.mobile_app_<device>`` service
                is not registered. Defaults to False.

        Returns:
        -------
            list: mobile target actions for this person

        """
        mobile_devices: list[dict[str, Any]] = []
        person_state = self.hass.states.get(person_entity_id) if self.hass else None
        if not person_state:
            _LOGGER.warning("SUPERNOTIFY Unable to resolve %s", person_entity_id)
            return mobile_devices

        ent_reg = self.entity_registry()
        dev_reg = self.device_registry()
        if not ent_reg or not dev_reg:
            _LOGGER.warning("SUPERNOTIFY Unable to access entity or device registries for %s", person_entity_id)
            return mobile_devices

        for d_t in person_state.attributes.get("device_trackers", ()):
            entity = ent_reg.async_get(d_t)
            # only trackers provided by the mobile_app platform and bound to a device qualify
            if not (entity and entity.platform == "mobile_app" and entity.device_id):
                _LOGGER.debug("SUPERNOTIFY Ignoring device tracker %s", d_t)
                continue
            device = dev_reg.async_get(entity.device_id)
            if not device:
                _LOGGER.warning("SUPERNOTIFY Unable to find device %s", entity.device_id)
                continue
            notify_action = f"mobile_app_{slugify(device.name)}"
            if (
                validate_targets
                and self.hass
                and self.hass.services
                and not self.hass.services.has_service("notify", notify_action)
            ):
                _LOGGER.warning("SUPERNOTIFY Unable to find notify action <%s>", notify_action)
                continue
            mobile_devices.append({
                CONF_MANUFACTURER: device.manufacturer,
                CONF_MODEL: device.model,
                CONF_NOTIFY_ACTION: notify_action,
                CONF_DEVICE_TRACKER: d_t,
                CONF_DEVICE_ID: device.id,
                CONF_DEVICE_NAME: device.name,
                # CONF_DEVICE_LABELS: device.labels,
            })

        return mobile_devices