Coverage for custom_components/supernotify/snoozer.py: 86%

182 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-12-28 14:21 +0000

1import logging 

2import time 

3from typing import Any 

4 

5from homeassistant.core import Event 

6 

7from custom_components.supernotify.common import format_timestamp, update_dict_list 

8 

9from . import ( 

10 ATTR_ACTION, 

11 ATTR_USER_ID, 

12 CONF_METHOD, 

13 CONF_MOBILE_DEVICES, 

14 CONF_NOTIFY_ACTION, 

15 CONF_PERSON, 

16 PRIORITY_CRITICAL, 

17 PRIORITY_MEDIUM, 

18 CommandType, 

19 GlobalTargetType, 

20 QualifiedTargetType, 

21 RecipientType, 

22 TargetType, 

23) 

24 

# Fallback snooze duration in seconds (one hour), applied when a command
# does not carry its own duration.
SNOOZE_TIME: int = 60 * 60  # TODO: move to configuration

_LOGGER: logging.Logger = logging.getLogger(__name__)

27 

28 

class Snooze:
    """One snooze instruction: what is muted, for whom, and until when.

    A snooze whose ``snooze_until`` is ``None`` never expires on its own;
    ``active()`` reports it as active indefinitely.
    """

    target: str | None
    target_type: TargetType
    snoozed_at: float
    snooze_until: float | None = None
    recipient_type: RecipientType
    recipient: str | None
    reason: str | None = None

    def __init__(
        self,
        target_type: TargetType,
        recipient_type: RecipientType,
        target: str | None = None,
        recipient: str | None = None,
        snooze_for: int | None = None,
        reason: str | None = None,
    ) -> None:
        """Record the snooze, timestamped with the current time.

        Args:
            target_type: Category of thing being snoozed.
            recipient_type: Whether the snooze applies to one user or everyone.
            target: Specific target within ``target_type``, if any.
            recipient: Person the snooze applies to, if any.
            snooze_for: Duration in seconds. A falsy value (``None`` or ``0``)
                leaves ``snooze_until`` at its class default of ``None``.
            reason: Free-text explanation kept for export.

        """
        now = time.time()
        self.snoozed_at = now
        self.target = target
        self.target_type = target_type
        self.recipient_type = recipient_type
        self.recipient = recipient
        self.reason = reason
        if snooze_for:
            self.snooze_until = now + snooze_for

    def std_recipient(self) -> str | None:
        """Return the recipient for user snoozes, else ``RecipientType.EVERYONE``."""
        if self.recipient_type == RecipientType.USER:
            return self.recipient
        return RecipientType.EVERYONE

    def short_key(self) -> str:
        """Return the key that identifies which snooze this one replaces."""
        # only one GLOBAL can be active at a time
        if self.target_type in GlobalTargetType:
            target = "GLOBAL"
        else:
            target = f"{self.target_type}_{self.target}"
        return f"{target}_{self.std_recipient()}"

    def __eq__(self, other: object) -> bool:
        """Treat two snoozes as equal when they share the same short key."""
        return isinstance(other, Snooze) and self.short_key() == other.short_key()

    def __repr__(self) -> str:
        """Return a compact description: target type, target and recipient."""
        return f"Snooze({self.target_type}, {self.target}, {self.std_recipient()})"

    def active(self) -> bool:
        """Return True while the snooze has no expiry or the expiry is still ahead."""
        deadline = self.snooze_until
        if deadline is None:
            return True
        return deadline > time.time()

    def export(self) -> dict:
        """Return the snooze as a dict, with both timestamps passed through ``format_timestamp``."""
        exported: dict = {
            "target_type": self.target_type,
            "target": self.target,
            "recipient_type": self.recipient_type,
            "recipient": self.recipient,
            "reason": self.reason,
        }
        exported["snoozed_at"] = format_timestamp(self.snoozed_at)
        exported["snooze_until"] = format_timestamp(self.snooze_until)
        return exported

87 

88 

89class Snoozer: 

90 """Manage snoozing""" 

91 

92 def __init__(self) -> None: 

93 self.snoozes: dict[str, Snooze] = {} 

94 

95 def handle_command_event(self, event: Event, people: dict[str, Any] | None = None) -> None: 

96 people = people or {} 

97 try: 

98 cmd: CommandType 

99 target_type: TargetType | None = None 

100 target: str | None = None 

101 snooze_for: int = SNOOZE_TIME 

102 recipient_type: RecipientType | None = None 

103 event_name = event.data.get(ATTR_ACTION) 

104 

105 if not event_name: 

106 _LOGGER.warning( 

107 "SUPERNOTIFY Invalid Mobile Action: %s, %s, %s, %s", 

108 event.origin, 

109 event.time_fired, 

110 event.data, 

111 event.context, 

112 ) 

113 return 

114 

115 _LOGGER.debug( 

116 "SUPERNOTIFY Mobile Action: %s, %s, %s, %s", event.origin, event.time_fired, event.data, event.context 

117 ) 

118 event_parts: list[str] = event_name.split("_") 

119 if len(event_parts) < 4: 

120 _LOGGER.warning("SUPERNOTIFY Malformed mobile event action %s", event_name) 

121 return 

122 cmd = CommandType[event_parts[1]] 

123 recipient_type = RecipientType[event_parts[2]] 

124 if event_parts[3] in QualifiedTargetType and len(event_parts) > 4: 

125 target_type = QualifiedTargetType[event_parts[3]] 

126 target = event_parts[4] 

127 snooze_for = int(event_parts[-1]) if len(event_parts) == 6 else SNOOZE_TIME 

128 elif event_parts[3] in GlobalTargetType and len(event_parts) >= 4: 

129 target_type = GlobalTargetType[event_parts[3]] 

130 snooze_for = int(event_parts[-1]) if len(event_parts) == 5 else SNOOZE_TIME 

131 

132 if cmd is None or target_type is None or recipient_type is None: 

133 _LOGGER.warning("SUPERNOTIFY Invalid mobile event name %s", event_name) 

134 return 

135 

136 except KeyError as ke: 

137 _LOGGER.warning("SUPERNOTIFY Unknown enum in event %s: %s", event, ke) 

138 return 

139 except Exception as e: 

140 _LOGGER.warning("SUPERNOTIFY Unable to analyze event %s: %s", event, e) 

141 return 

142 

143 try: 

144 recipient: str | None = None 

145 if recipient_type == RecipientType.USER: 

146 target_people = [ 

147 p.get(CONF_PERSON) 

148 for p in people.values() 

149 if p.get(ATTR_USER_ID) == event.context.user_id and event.context.user_id is not None and p.get(CONF_PERSON) 

150 ] 

151 if target_people: 

152 recipient = target_people[0] 

153 _LOGGER.debug("SUPERNOTIFY mobile action from %s mapped to %s", event.context.user_id, recipient) 

154 else: 

155 _LOGGER.warning("SUPERNOTIFY Unable to find person for action from %s", event.context.user_id) 

156 return 

157 

158 self.register_snooze(cmd, target_type, target, recipient_type, recipient, snooze_for) 

159 

160 except Exception as e: 

161 _LOGGER.warning("SUPERNOTIFY Unable to handle event %s: %s", event, e) 

162 

163 def register_snooze( 

164 self, 

165 cmd: CommandType, 

166 target_type: TargetType, 

167 target: str | None, 

168 recipient_type: RecipientType, 

169 recipient: str | None, 

170 snooze_for: int | None, 

171 reason: str = "User command", 

172 ) -> None: 

173 if cmd == CommandType.SNOOZE: 

174 snooze = Snooze(target_type, recipient_type, target, recipient, snooze_for, reason=reason) 

175 self.snoozes[snooze.short_key()] = snooze 

176 elif cmd == CommandType.SILENCE: 

177 snooze = Snooze(target_type, recipient_type, target, recipient, reason=reason) 

178 self.snoozes[snooze.short_key()] = snooze 

179 elif cmd == CommandType.NORMAL: 

180 anti_snooze = Snooze(target_type, recipient_type, target, recipient) 

181 to_del = [k for k, v in self.snoozes.items() if v.short_key() == anti_snooze.short_key()] 

182 for k in to_del: 

183 del self.snoozes[k] 

184 else: 

185 _LOGGER.warning( 

186 "SUPERNOTIFY Invalid mobile cmd %s (target_type: %s, target: %s, recipient_type: %s)", 

187 cmd, 

188 target_type, 

189 target, 

190 recipient_type, 

191 ) 

192 

193 def purge_snoozes(self) -> None: 

194 to_del = [k for k, v in self.snoozes.items() if not v.active()] 

195 for k in to_del: 

196 del self.snoozes[k] 

197 

198 def clear(self) -> int: 

199 cleared = len(self.snoozes) 

200 self.snoozes.clear() 

201 return cleared 

202 

203 def export(self) -> list[dict]: 

204 return [s.export() for s in self.snoozes.values()] 

205 

206 def current_snoozes( 

207 self, 

208 priority: str = PRIORITY_MEDIUM, 

209 delivery_names: list | None = None, 

210 delivery_definitions: dict[str, dict] | None = None, 

211 ) -> list[Snooze]: 

212 delivery_names = delivery_names or [] 

213 delivery_definitions = delivery_definitions or {} 

214 inscope_snoozes: list[Snooze] = [] 

215 

216 for snooze in self.snoozes.values(): 

217 if snooze.active(): 

218 match snooze.target_type: 

219 case GlobalTargetType.EVERYTHING: 

220 inscope_snoozes.append(snooze) 

221 case GlobalTargetType.NONCRITICAL: 

222 if priority != PRIORITY_CRITICAL: 

223 inscope_snoozes.append(snooze) 

224 case QualifiedTargetType.DELIVERY: 

225 if snooze.target in delivery_names: 

226 inscope_snoozes.append(snooze) 

227 case QualifiedTargetType.PRIORITY: 

228 if snooze.target == priority: 

229 inscope_snoozes.append(snooze) 

230 case QualifiedTargetType.ACTION: 

231 inscope_snoozes.append(snooze) 

232 case QualifiedTargetType.METHOD: 

233 if snooze.target in [delivery_definitions.get(d, {}).get(CONF_METHOD) for d in delivery_names]: 

234 inscope_snoozes.append(snooze) 

235 case QualifiedTargetType.CAMERA: 

236 inscope_snoozes.append(snooze) 

237 case _: 

238 _LOGGER.warning("SUPERNOTIFY Unhandled target type %s", snooze.target_type) 

239 

240 return inscope_snoozes 

241 

242 def is_global_snooze(self, priority: str = PRIORITY_MEDIUM) -> bool: 

243 for snooze in self.snoozes.values(): 

244 if snooze.active(): 

245 match snooze.target_type: 

246 case GlobalTargetType.EVERYTHING: 

247 return True 

248 case GlobalTargetType.NONCRITICAL: 

249 if priority != PRIORITY_CRITICAL: 

250 return True 

251 

252 return False 

253 

254 def filter_recipients( 

255 self, 

256 recipients: list[dict], 

257 priority: str, 

258 delivery_name: str, 

259 delivery_method: "DeliveryMethod", # type: ignore # noqa: F821 

260 all_delivery_names: list[str], 

261 delivery_definitions: dict[str, Any], 

262 ) -> list[dict]: 

263 inscope_snoozes = self.current_snoozes(priority, all_delivery_names, delivery_definitions) 

264 for snooze in inscope_snoozes: 

265 if snooze.recipient_type == RecipientType.USER: 

266 # assume the everyone checks are made before notification gets this far 

267 if ( 

268 (snooze.target_type == QualifiedTargetType.DELIVERY and snooze.target == delivery_name) 

269 or (snooze.target_type == QualifiedTargetType.METHOD and snooze.target == delivery_method.method) 

270 or ( 

271 snooze.target_type == QualifiedTargetType.PRIORITY 

272 and (snooze.target == priority or (isinstance(snooze.target, list) and priority in snooze.target)) 

273 ) 

274 or snooze.target_type == GlobalTargetType.EVERYTHING 

275 or (snooze.target_type == GlobalTargetType.NONCRITICAL and priority != PRIORITY_CRITICAL) 

276 ): 

277 recipients_to_remove = [] 

278 for recipient in recipients: 

279 if recipient.get(CONF_PERSON) == snooze.recipient: 

280 recipients_to_remove.append(recipient) 

281 _LOGGER.info("SUPERNOTIFY Snoozing %s", snooze.recipient) 

282 for r in recipients_to_remove: 

283 recipients.remove(r) 

284 

285 if snooze.target_type == QualifiedTargetType.ACTION: 

286 to_remove: list[dict] = [] 

287 to_add: list[dict] = [] 

288 for recipient in recipients: 

289 if recipient.get(CONF_PERSON) == snooze.recipient: 

290 alt_mobiles: list[dict] = list(recipient.get(CONF_MOBILE_DEVICES, [])) 

291 md_to_remove = [] 

292 for md in alt_mobiles: 

293 if md.get(CONF_NOTIFY_ACTION) == snooze.target: 

294 _LOGGER.debug("SUPERNOTIFY Snoozing %s for %s", snooze.recipient, snooze.target) 

295 md_to_remove.append(md) 

296 for md in md_to_remove: 

297 alt_mobiles.remove(md) 

298 alt_recipient = dict(recipient.items()) 

299 alt_recipient[CONF_MOBILE_DEVICES] = alt_mobiles 

300 to_remove.append(recipient) 

301 to_add.append(alt_recipient) 

302 if to_add or to_remove: 

303 recipients = update_dict_list(recipients, to_add, to_remove) 

304 return recipients