Coverage for custom_components/supernotify/configuration.py: 93%

217 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-12-28 14:21 +0000

1from __future__ import annotations 

2 

3import logging 

4import socket 

5from pathlib import Path 

6from typing import TYPE_CHECKING, Any 

7 

8from homeassistant.const import ATTR_STATE, CONF_DEVICE_ID, CONF_ENABLED, CONF_METHOD, CONF_NAME, STATE_HOME, STATE_NOT_HOME 

9from homeassistant.helpers import device_registry, entity_registry 

10from homeassistant.helpers.network import get_url 

11from homeassistant.util import slugify 

12 

13from custom_components.supernotify.archive import NotificationArchive 

14from custom_components.supernotify.common import ensure_list, safe_get 

15from custom_components.supernotify.snoozer import Snoozer 

16 

17from . import ( 

18 ATTR_USER_ID, 

19 CONF_ARCHIVE_DAYS, 

20 CONF_ARCHIVE_PATH, 

21 CONF_CAMERA, 

22 CONF_DEVICE_NAME, 

23 CONF_DEVICE_TRACKER, 

24 CONF_MANUFACTURER, 

25 CONF_MOBILE_DEVICES, 

26 CONF_MOBILE_DISCOVERY, 

27 CONF_MODEL, 

28 CONF_NOTIFY_ACTION, 

29 CONF_PERSON, 

30 CONF_SELECTION, 

31 DELIVERY_SELECTION_IMPLICIT, 

32 METHOD_DEFAULTS_SCHEMA, 

33 SCENARIO_DEFAULT, 

34 SELECTION_DEFAULT, 

35 SELECTION_FALLBACK, 

36 SELECTION_FALLBACK_ON_ERROR, 

37) 

38from .scenario import Scenario 

39 

40if TYPE_CHECKING: 

41 from homeassistant.core import HomeAssistant, State 

42 

43 from custom_components.supernotify.delivery_method import DeliveryMethod 

44 

45_LOGGER = logging.getLogger(__name__) 

46 

47 

class SupernotificationConfiguration:
    """Shared configuration and runtime state used by the notifier.

    Keeps the raw and validated delivery configuration, the registered delivery
    methods, scenarios, recipients ("people"), camera definitions, the
    notification archive and the snoozer in one place.

    The constructor resolves the Home Assistant URLs and stores the supplied
    settings; `initialize` and `register_delivery_methods` perform the steps
    that need awaiting or touch the file system.
    """

    def __init__(
        self,
        hass: HomeAssistant | None = None,
        deliveries: dict | None = None,
        links: list | None = None,
        recipients: list | None = None,
        mobile_actions: dict | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive_config: dict[str, str] | None = None,
        scenarios: dict[str, dict] | None = None,
        method_defaults: dict | None = None,
        cameras: list[dict] | None = None,
    ) -> None:
        """Store the supplied settings and resolve the Home Assistant URLs.

        Every argument is optional, so the object can be built without a
        Home Assistant instance; a warning is logged in that case.

        Args:
        ----
            hass: Home Assistant instance, or None.
            deliveries: Raw delivery configuration keyed by delivery name. Anything
                that is not a dict is treated as no deliveries.
            links: Link definitions, normalised with `ensure_list`.
            recipients: Recipient configurations, normalised with `ensure_list`.
            mobile_actions: Mobile action definitions, stored as given.
            template_path: Directory for templates, converted to a `Path`.
            media_path: Directory for media, converted to a `Path`.
            archive_config: Mapping read for the archive path and archive days.
            scenarios: Scenario definitions keyed by scenario name.
            method_defaults: Default delivery settings keyed by method name.
            cameras: Camera definitions; indexed here by their `CONF_CAMERA` value.

        """
        self.hass: HomeAssistant | None = None
        self.hass_internal_url: str
        self.hass_external_url: str
        if hass:
            self.hass = hass
            self.hass_name = hass.config.location_name
            try:
                self.hass_internal_url = get_url(hass, prefer_external=False)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY could not get internal hass url: %s", e)
                # fall back to the local host name so that links can still be built
                self.hass_internal_url = f"http://{socket.gethostname()}"
            try:
                self.hass_external_url = get_url(hass, prefer_external=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY could not get external hass url: %s", e)
                self.hass_external_url = self.hass_internal_url
        else:
            self.hass_internal_url = ""
            self.hass_external_url = ""
            self.hass_name = "!UNDEFINED!"
            _LOGGER.warning("SUPERNOTIFY Configured without HomeAssistant instance")

        _LOGGER.debug(
            "SUPERNOTIFY Configured for HomeAssistant instance %s at %s , %s",
            self.hass_name,
            self.hass_internal_url,
            self.hass_external_url,
        )

        if not self.hass_internal_url or not self.hass_internal_url.startswith("http"):
            _LOGGER.warning("SUPERNOTIFY invalid internal hass url %s", self.hass_internal_url)

        self.links = ensure_list(links)
        # raw configured deliveries
        self._deliveries: dict[str, Any] = deliveries if isinstance(deliveries, dict) else {}
        # validated deliveries, filled in by register_delivery_methods
        self.deliveries: dict[str, Any] = {}
        self._recipients: list = ensure_list(recipients)
        self.mobile_actions: dict = mobile_actions or {}
        self.template_path: Path | None = Path(template_path) if template_path else None
        self.media_path: Path | None = Path(media_path) if media_path else None
        archive_config = archive_config or {}
        self.archive: NotificationArchive = NotificationArchive(
            archive_config.get(CONF_ARCHIVE_PATH), archive_config.get(CONF_ARCHIVE_DAYS)
        )
        self.cameras: dict[str, Any] = {c[CONF_CAMERA]: c for c in cameras} if cameras else {}
        self.methods: dict[str, DeliveryMethod] = {}
        self.method_defaults: dict = method_defaults or {}
        # validated scenarios, filled in by initialize
        self.scenarios: dict[str, Scenario] = {}
        # recipients keyed by person entity id, filled in by initialize
        self.people: dict[str, dict] = {}
        # scenario definitions exactly as configured
        self.applied_scenarios: dict = scenarios or {}
        self.delivery_by_scenario: dict[str, list] = {SCENARIO_DEFAULT: []}
        self.fallback_on_error: dict = {}
        self.fallback_by_default: dict = {}
        # registries are looked up on first use, see entity_registry() / device_registry()
        self._entity_registry: entity_registry.EntityRegistry | None = None
        self._device_registry: device_registry.DeviceRegistry | None = None
        self.snoozer = Snoozer()

    async def initialize(self) -> None:
        """Prepare recipients, scenarios, paths, archive and deliveries.

        In order: index the recipients, build and validate the configured
        scenarios (only when a hass instance is present), drop a template path
        that does not exist, create the media path if it is missing, initialize
        the archive, then work out the default deliveries and the delivery list
        of each scenario.
        """
        self.people = self.setup_people(self._recipients)

        if self.applied_scenarios and self.hass:
            for scenario_name, scenario_definition in self.applied_scenarios.items():
                scenario = Scenario(scenario_name, scenario_definition, self.hass)
                # only scenarios that validate are kept
                if await scenario.validate():
                    self.scenarios[scenario_name] = scenario

        if self.template_path and not self.template_path.exists():
            _LOGGER.warning("SUPERNOTIFY template path not found at %s", self.template_path)
            self.template_path = None

        if self.media_path and not self.media_path.exists():
            _LOGGER.info("SUPERNOTIFY media path not found at %s", self.media_path)
            try:
                self.media_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY media path %s cannot be created: %s", self.media_path, e)
                self.media_path = None
        if self.media_path is not None:
            _LOGGER.info("SUPERNOTIFY abs media path: %s", self.media_path.absolute())
        if self.archive:
            self.archive.initialize()
        default_deliveries: dict = self.initialize_deliveries()
        self.initialize_scenarios(default_deliveries)

    def initialize_deliveries(self) -> dict:
        """Classify the raw deliveries and apply names and method defaults.

        Enabled deliveries are sorted into the fallback-on-error, fallback and
        default groups according to their selection (a delivery with no selection
        counts as default). Every delivery, enabled or not, gets a name (its key
        when none is configured) and any missing method defaults.

        Returns
        -------
            dict: the enabled deliveries selected by default, keyed by delivery name

        """
        default_deliveries: dict = {}
        for d, dc in self._deliveries.items():
            if dc.get(CONF_ENABLED, True):
                # looked up once instead of once per selection test
                selection = dc.get(CONF_SELECTION, [SELECTION_DEFAULT])
                if SELECTION_FALLBACK_ON_ERROR in selection:
                    self.fallback_on_error[d] = dc
                if SELECTION_FALLBACK in selection:
                    self.fallback_by_default[d] = dc
                if SELECTION_DEFAULT in selection:
                    default_deliveries[d] = dc

            if not dc.get(CONF_NAME):
                dc[CONF_NAME] = d  # for minimal tests
            for conf_key in METHOD_DEFAULTS_SCHEMA.schema:
                self.set_method_default(dc, conf_key.schema)
        return default_deliveries

    def initialize_scenarios(self, default_deliveries: dict) -> None:
        """Work out which deliveries each validated scenario uses.

        A scenario with implicit delivery selection starts from the default
        deliveries; any other scenario starts empty. Deliveries named in the
        scenario itself are then added, and those the scenario marks as not
        enabled are left out. The default scenario is set to the default
        deliveries.

        Args:
        ----
            default_deliveries: deliveries selected by default, keyed by name

        """
        for scenario_name, scenario in self.scenarios.items():
            self.delivery_by_scenario.setdefault(scenario_name, [])
            if scenario.delivery_selection == DELIVERY_SELECTION_IMPLICIT:
                scenario_deliveries: list[str] = list(default_deliveries)
            else:
                scenario_deliveries = []
            scenario_definition_delivery = scenario.delivery
            # the generator sees the list as it grows, so each name is added only once
            scenario_deliveries.extend(s for s in scenario_definition_delivery if s not in scenario_deliveries)

            for scenario_delivery in scenario_deliveries:
                if safe_get(scenario_definition_delivery.get(scenario_delivery), CONF_ENABLED, True):
                    self.delivery_by_scenario[scenario_name].append(scenario_delivery)

        self.delivery_by_scenario[SCENARIO_DEFAULT] = list(default_deliveries)

    async def register_delivery_methods(
        self,
        delivery_methods: list[DeliveryMethod] | None = None,
        delivery_method_classes: list[type[DeliveryMethod]] | None = None,
        set_as_default: bool = False,
    ) -> None:
        """Register delivery methods and collect their valid deliveries.

        Available directly for test fixtures supplying class or instance.

        Args:
        ----
            delivery_methods: ready-made method instances to register and initialize
            delivery_method_classes: method classes to instantiate with this
                configuration and the raw deliveries; ignored without a hass instance
            set_as_default: when True, add every delivery with a known method to the
                default scenario unless it is already there

        """
        if delivery_methods:
            for delivery_method in delivery_methods:
                self.methods[delivery_method.method] = delivery_method
                await delivery_method.initialize()
                self.deliveries.update(delivery_method.valid_deliveries)
        if delivery_method_classes and self.hass:
            for delivery_method_class in delivery_method_classes:
                created = delivery_method_class(self.hass, self, self._deliveries)
                self.methods[delivery_method_class.method] = created
                await created.initialize()
                self.deliveries.update(created.valid_deliveries)

        for d, dc in self.deliveries.items():
            if dc.get(CONF_METHOD) not in self.methods:
                _LOGGER.info("SUPERNOTIFY Ignoring delivery %s without known method %s", d, dc.get(CONF_METHOD))
            elif set_as_default and d not in self.delivery_by_scenario[SCENARIO_DEFAULT]:
                self.delivery_by_scenario[SCENARIO_DEFAULT].append(d)

        _LOGGER.info("SUPERNOTIFY configured deliveries %s", "; ".join(self.deliveries))

    def set_method_default(self, delivery_config: dict[str, Any], attr: str) -> None:
        """Copy a method-level default into a delivery that does not set `attr`.

        Nothing happens when the delivery already has `attr`, or when the default
        for the delivery's method is missing or falsy.

        Args:
        ----
            delivery_config: delivery configuration, updated in place
            attr: configuration key to default

        """
        if attr in delivery_config:
            return
        method_default: dict = self.method_defaults.get(delivery_config.get(CONF_METHOD), {})
        if method_default.get(attr):
            delivery_config[attr] = method_default[attr]
            # .get so that a delivery without a name cannot fail just for the sake of a debug log
            _LOGGER.debug(
                "SUPERNOTIFY Defaulting delivery %s to %s %s", delivery_config.get(CONF_NAME), attr, delivery_config[attr]
            )

    def delivery_method(self, delivery: str) -> DeliveryMethod:
        """Return the registered method that handles the named delivery.

        Args:
        ----
            delivery: name of a validated delivery

        Returns:
        -------
            DeliveryMethod: the method registered for that delivery

        Raises:
        ------
            ValueError: the delivery is unknown or its method is not registered

        """
        method_name = self.deliveries.get(delivery, {}).get(CONF_METHOD)
        method: DeliveryMethod | None = self.methods.get(method_name)
        if not method:
            raise ValueError(f"SUPERNOTIFY No method for delivery {delivery}")
        return method

    def setup_people(self, recipients: list[dict] | tuple[dict]) -> dict[str, dict]:
        """Index recipients by person entity id, enriching each one in place.

        For recipients with mobile discovery enabled, the discovered mobile
        devices are appended to the recipient's mobile device list. When a hass
        instance is present and the person has a state, the user id from that
        state is stored on the recipient.

        Args:
        ----
            recipients: recipient configurations, each with a person entity id

        Returns:
        -------
            dict: the same recipient dicts keyed by person entity id

        """
        people: dict[str, dict] = {}
        for r in recipients:
            if r.get(CONF_MOBILE_DISCOVERY):
                # A recipient may enable discovery without listing any devices by hand,
                # so create the list when it is missing rather than failing on the lookup.
                devices = r.get(CONF_MOBILE_DEVICES)
                if devices is None:
                    devices = r[CONF_MOBILE_DEVICES] = []
                devices.extend(self.mobile_devices_for_person(r[CONF_PERSON]))
                if r.get(CONF_MOBILE_DEVICES):
                    _LOGGER.info("SUPERNOTIFY Auto configured %s for mobile devices %s", r[CONF_PERSON], r[CONF_MOBILE_DEVICES])
                else:
                    _LOGGER.warning("SUPERNOTIFY Unable to find mobile devices for %s", r[CONF_PERSON])
            if self.hass:
                state: State | None = self.hass.states.get(r[CONF_PERSON])
                if state is not None:
                    r[ATTR_USER_ID] = state.attributes.get(ATTR_USER_ID)
            people[r[CONF_PERSON]] = r
        return people

    def people_state(self) -> list[dict]:
        """Refresh and return the recipients with their current state.

        Each recipient dict is updated in place with the state of its person
        entity, or None when that entity has no state. A recipient is still
        returned if the lookup raises; the failure is logged as a warning.

        Returns
        -------
            list: the recipient dicts, or an empty list without a hass instance

        """
        results: list[dict] = []
        if not self.hass:
            return results
        for person, person_config in self.people.items():
            # TODO: possibly rate limit this
            try:
                tracker = self.hass.states.get(person)
                person_config[ATTR_STATE] = None if tracker is None else tracker.state
            except Exception as e:
                _LOGGER.warning("Unable to determine occupied status for %s: %s", person, e)
            results.append(person_config)
        return results

    def determine_occupancy(self) -> dict[str, list[dict]]:
        """Split the recipients into those at home and those away.

        Returns
        -------
            dict: recipient dicts grouped under the home and not-home states

        """
        results: dict[str, list[dict]] = {STATE_HOME: [], STATE_NOT_HOME: []}
        for person_config in self.people_state():
            # default to at home if unknown tracker
            at_home = person_config.get(ATTR_STATE) in (None, STATE_HOME)
            results[STATE_HOME if at_home else STATE_NOT_HOME].append(person_config)
        return results

    def entity_registry(self) -> entity_registry.EntityRegistry | None:
        """Return the entity registry, looking it up once and caching it.

        Returns
        -------
            The registry, or None when there is no hass instance or the lookup
            fails (the failure is logged as a warning).

        """
        if self._entity_registry is not None:
            return self._entity_registry
        if self.hass:
            try:
                self._entity_registry = entity_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get entity registry: %s", e)
        return self._entity_registry

    def device_registry(self) -> device_registry.DeviceRegistry | None:
        """Return the device registry, looking it up once and caching it.

        Returns
        -------
            The registry, or None when there is no hass instance or the lookup
            fails (the failure is logged as a warning).

        """
        if self._device_registry is not None:
            return self._device_registry
        if self.hass:
            try:
                self._device_registry = device_registry.async_get(self.hass)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to get device registry: %s", e)
        return self._device_registry

    def mobile_devices_for_person(self, person_entity_id: str, validate_targets: bool = False) -> list:
        """Auto detect mobile_app targets for a person.

        Targets not currently validated as async registration may not be complete at this stage

        Args:
        ----
            person_entity_id (str): entity id of the person; the "device_trackers"
                attribute of its state lists the trackers to inspect
            validate_targets (bool, optional): when True, leave out devices for which
                no matching `notify` service is registered. Defaults to False.

        Returns:
        -------
            list: mobile target actions for this person

        """
        mobile_devices: list = []
        person_state = self.hass.states.get(person_entity_id) if self.hass else None
        if not person_state:
            _LOGGER.warning("SUPERNOTIFY Unable to resolve %s", person_entity_id)
            return mobile_devices

        ent_reg = self.entity_registry()
        dev_reg = self.device_registry()
        if not ent_reg or not dev_reg:
            _LOGGER.warning("SUPERNOTIFY Unable to access entity or device registries for %s", person_entity_id)
            return mobile_devices

        for d_t in person_state.attributes.get("device_trackers", ()):
            entity = ent_reg.async_get(d_t)
            # only trackers provided by the mobile_app platform and tied to a device are of interest
            if not (entity and entity.platform == "mobile_app" and entity.device_id):
                _LOGGER.debug("SUPERNOTIFY Ignoring device tracker %s", d_t)
                continue
            device = dev_reg.async_get(entity.device_id)
            if not device:
                _LOGGER.warning("SUPERNOTIFY Unable to find device %s", entity.device_id)
                continue
            notify_action = f"mobile_app_{slugify(device.name)}"
            if (
                validate_targets
                and self.hass
                and self.hass.services
                and not self.hass.services.has_service("notify", notify_action)
            ):
                _LOGGER.warning("SUPERNOTIFY Unable to find notify action <%s>", notify_action)
                continue
            mobile_devices.append({
                CONF_MANUFACTURER: device.manufacturer,
                CONF_MODEL: device.model,
                CONF_NOTIFY_ACTION: notify_action,
                CONF_DEVICE_TRACKER: d_t,
                CONF_DEVICE_ID: device.id,
                CONF_DEVICE_NAME: device.name,
                # CONF_DEVICE_LABELS: device.labels,
            })

        return mobile_devices

343 _LOGGER.debug("SUPERNOTIFY Ignoring device tracker %s", d_t) 

344 

345 return mobile_devices