Coverage for custom_components/supernotify/snoozer.py: 86%
182 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-28 14:21 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-28 14:21 +0000
1import logging
2import time
3from typing import Any
5from homeassistant.core import Event
7from custom_components.supernotify.common import format_timestamp, update_dict_list
9from . import (
10 ATTR_ACTION,
11 ATTR_USER_ID,
12 CONF_METHOD,
13 CONF_MOBILE_DEVICES,
14 CONF_NOTIFY_ACTION,
15 CONF_PERSON,
16 PRIORITY_CRITICAL,
17 PRIORITY_MEDIUM,
18 CommandType,
19 GlobalTargetType,
20 QualifiedTargetType,
21 RecipientType,
22 TargetType,
23)
# Default snooze duration in seconds (one hour); applied when a command
# event does not carry an explicit duration.
SNOOZE_TIME: int = 3600  # TODO: move to configuration
_LOGGER: logging.Logger = logging.getLogger(__name__)
class Snooze:
    """A single snooze rule: what is muted, for whom, and until when.

    Two snoozes compare equal when their ``short_key()`` matches, i.e. they
    address the same target for the same recipient; timing and reason are
    not part of the comparison.
    """

    target: str | None
    target_type: TargetType
    snoozed_at: float  # time.time() value captured when the snooze was created
    snooze_until: float | None = None  # time.time() value at which it lapses; None = no expiry
    recipient_type: RecipientType
    recipient: str | None
    reason: str | None = None

    def __init__(
        self,
        target_type: TargetType,
        recipient_type: RecipientType,
        target: str | None = None,
        recipient: str | None = None,
        snooze_for: int | None = None,
        reason: str | None = None,
    ) -> None:
        """Create a snooze starting now.

        Args:
            target_type: Kind of thing being snoozed.
            recipient_type: Whether the snooze applies to one user or everyone.
            target: Name of the snoozed thing, when the target type needs one.
            recipient: The person the snooze applies to, for user snoozes.
            snooze_for: Duration in seconds; ``None`` means no expiry.
            reason: Free-text explanation kept for export.

        """
        self.snoozed_at = time.time()
        self.target = target
        self.target_type = target_type
        self.recipient_type = recipient_type
        self.recipient = recipient
        self.reason = reason
        # Compare against None rather than relying on truthiness: a duration
        # of 0 must yield an already-lapsed snooze, not one that never expires.
        if snooze_for is not None:
            self.snooze_until = self.snoozed_at + snooze_for

    def std_recipient(self) -> str | None:
        """Return the recipient for user snoozes, else ``RecipientType.EVERYONE``."""
        return self.recipient if self.recipient_type == RecipientType.USER else RecipientType.EVERYONE

    def short_key(self) -> str:
        """Return the identity key built from the target and the recipient."""
        # only one GLOBAL can be active at a time: every global target type
        # collapses to the same "GLOBAL" prefix
        target = "GLOBAL" if self.target_type in GlobalTargetType else f"{self.target_type}_{self.target}"
        return f"{target}_{self.std_recipient()}"

    def __eq__(self, other: object) -> bool:
        """Check if two snoozes are for the same thing."""
        if not isinstance(other, Snooze):
            return False
        return self.short_key() == other.short_key()

    def __repr__(self) -> str:
        """Return a string representation of the object."""
        return f"Snooze({self.target_type}, {self.target}, {self.std_recipient()})"

    def active(self) -> bool:
        """Return True while the snooze has no expiry or the expiry is still in the future."""
        return self.snooze_until is None or self.snooze_until > time.time()

    def export(self) -> dict:
        """Return the snooze as a dict, with both timestamps passed through ``format_timestamp``."""
        return {
            "target_type": self.target_type,
            "target": self.target,
            "recipient_type": self.recipient_type,
            "recipient": self.recipient,
            "reason": self.reason,
            "snoozed_at": format_timestamp(self.snoozed_at),
            "snooze_until": format_timestamp(self.snooze_until),
        }
89class Snoozer:
90 """Manage snoozing"""
92 def __init__(self) -> None:
93 self.snoozes: dict[str, Snooze] = {}
95 def handle_command_event(self, event: Event, people: dict[str, Any] | None = None) -> None:
96 people = people or {}
97 try:
98 cmd: CommandType
99 target_type: TargetType | None = None
100 target: str | None = None
101 snooze_for: int = SNOOZE_TIME
102 recipient_type: RecipientType | None = None
103 event_name = event.data.get(ATTR_ACTION)
105 if not event_name:
106 _LOGGER.warning(
107 "SUPERNOTIFY Invalid Mobile Action: %s, %s, %s, %s",
108 event.origin,
109 event.time_fired,
110 event.data,
111 event.context,
112 )
113 return
115 _LOGGER.debug(
116 "SUPERNOTIFY Mobile Action: %s, %s, %s, %s", event.origin, event.time_fired, event.data, event.context
117 )
118 event_parts: list[str] = event_name.split("_")
119 if len(event_parts) < 4:
120 _LOGGER.warning("SUPERNOTIFY Malformed mobile event action %s", event_name)
121 return
122 cmd = CommandType[event_parts[1]]
123 recipient_type = RecipientType[event_parts[2]]
124 if event_parts[3] in QualifiedTargetType and len(event_parts) > 4:
125 target_type = QualifiedTargetType[event_parts[3]]
126 target = event_parts[4]
127 snooze_for = int(event_parts[-1]) if len(event_parts) == 6 else SNOOZE_TIME
128 elif event_parts[3] in GlobalTargetType and len(event_parts) >= 4:
129 target_type = GlobalTargetType[event_parts[3]]
130 snooze_for = int(event_parts[-1]) if len(event_parts) == 5 else SNOOZE_TIME
132 if cmd is None or target_type is None or recipient_type is None:
133 _LOGGER.warning("SUPERNOTIFY Invalid mobile event name %s", event_name)
134 return
136 except KeyError as ke:
137 _LOGGER.warning("SUPERNOTIFY Unknown enum in event %s: %s", event, ke)
138 return
139 except Exception as e:
140 _LOGGER.warning("SUPERNOTIFY Unable to analyze event %s: %s", event, e)
141 return
143 try:
144 recipient: str | None = None
145 if recipient_type == RecipientType.USER:
146 target_people = [
147 p.get(CONF_PERSON)
148 for p in people.values()
149 if p.get(ATTR_USER_ID) == event.context.user_id and event.context.user_id is not None and p.get(CONF_PERSON)
150 ]
151 if target_people:
152 recipient = target_people[0]
153 _LOGGER.debug("SUPERNOTIFY mobile action from %s mapped to %s", event.context.user_id, recipient)
154 else:
155 _LOGGER.warning("SUPERNOTIFY Unable to find person for action from %s", event.context.user_id)
156 return
158 self.register_snooze(cmd, target_type, target, recipient_type, recipient, snooze_for)
160 except Exception as e:
161 _LOGGER.warning("SUPERNOTIFY Unable to handle event %s: %s", event, e)
163 def register_snooze(
164 self,
165 cmd: CommandType,
166 target_type: TargetType,
167 target: str | None,
168 recipient_type: RecipientType,
169 recipient: str | None,
170 snooze_for: int | None,
171 reason: str = "User command",
172 ) -> None:
173 if cmd == CommandType.SNOOZE:
174 snooze = Snooze(target_type, recipient_type, target, recipient, snooze_for, reason=reason)
175 self.snoozes[snooze.short_key()] = snooze
176 elif cmd == CommandType.SILENCE:
177 snooze = Snooze(target_type, recipient_type, target, recipient, reason=reason)
178 self.snoozes[snooze.short_key()] = snooze
179 elif cmd == CommandType.NORMAL:
180 anti_snooze = Snooze(target_type, recipient_type, target, recipient)
181 to_del = [k for k, v in self.snoozes.items() if v.short_key() == anti_snooze.short_key()]
182 for k in to_del:
183 del self.snoozes[k]
184 else:
185 _LOGGER.warning(
186 "SUPERNOTIFY Invalid mobile cmd %s (target_type: %s, target: %s, recipient_type: %s)",
187 cmd,
188 target_type,
189 target,
190 recipient_type,
191 )
193 def purge_snoozes(self) -> None:
194 to_del = [k for k, v in self.snoozes.items() if not v.active()]
195 for k in to_del:
196 del self.snoozes[k]
198 def clear(self) -> int:
199 cleared = len(self.snoozes)
200 self.snoozes.clear()
201 return cleared
203 def export(self) -> list[dict]:
204 return [s.export() for s in self.snoozes.values()]
206 def current_snoozes(
207 self,
208 priority: str = PRIORITY_MEDIUM,
209 delivery_names: list | None = None,
210 delivery_definitions: dict[str, dict] | None = None,
211 ) -> list[Snooze]:
212 delivery_names = delivery_names or []
213 delivery_definitions = delivery_definitions or {}
214 inscope_snoozes: list[Snooze] = []
216 for snooze in self.snoozes.values():
217 if snooze.active():
218 match snooze.target_type:
219 case GlobalTargetType.EVERYTHING:
220 inscope_snoozes.append(snooze)
221 case GlobalTargetType.NONCRITICAL:
222 if priority != PRIORITY_CRITICAL:
223 inscope_snoozes.append(snooze)
224 case QualifiedTargetType.DELIVERY:
225 if snooze.target in delivery_names:
226 inscope_snoozes.append(snooze)
227 case QualifiedTargetType.PRIORITY:
228 if snooze.target == priority:
229 inscope_snoozes.append(snooze)
230 case QualifiedTargetType.ACTION:
231 inscope_snoozes.append(snooze)
232 case QualifiedTargetType.METHOD:
233 if snooze.target in [delivery_definitions.get(d, {}).get(CONF_METHOD) for d in delivery_names]:
234 inscope_snoozes.append(snooze)
235 case QualifiedTargetType.CAMERA:
236 inscope_snoozes.append(snooze)
237 case _:
238 _LOGGER.warning("SUPERNOTIFY Unhandled target type %s", snooze.target_type)
240 return inscope_snoozes
242 def is_global_snooze(self, priority: str = PRIORITY_MEDIUM) -> bool:
243 for snooze in self.snoozes.values():
244 if snooze.active():
245 match snooze.target_type:
246 case GlobalTargetType.EVERYTHING:
247 return True
248 case GlobalTargetType.NONCRITICAL:
249 if priority != PRIORITY_CRITICAL:
250 return True
252 return False
254 def filter_recipients(
255 self,
256 recipients: list[dict],
257 priority: str,
258 delivery_name: str,
259 delivery_method: "DeliveryMethod", # type: ignore # noqa: F821
260 all_delivery_names: list[str],
261 delivery_definitions: dict[str, Any],
262 ) -> list[dict]:
263 inscope_snoozes = self.current_snoozes(priority, all_delivery_names, delivery_definitions)
264 for snooze in inscope_snoozes:
265 if snooze.recipient_type == RecipientType.USER:
266 # assume the everyone checks are made before notification gets this far
267 if (
268 (snooze.target_type == QualifiedTargetType.DELIVERY and snooze.target == delivery_name)
269 or (snooze.target_type == QualifiedTargetType.METHOD and snooze.target == delivery_method.method)
270 or (
271 snooze.target_type == QualifiedTargetType.PRIORITY
272 and (snooze.target == priority or (isinstance(snooze.target, list) and priority in snooze.target))
273 )
274 or snooze.target_type == GlobalTargetType.EVERYTHING
275 or (snooze.target_type == GlobalTargetType.NONCRITICAL and priority != PRIORITY_CRITICAL)
276 ):
277 recipients_to_remove = []
278 for recipient in recipients:
279 if recipient.get(CONF_PERSON) == snooze.recipient:
280 recipients_to_remove.append(recipient)
281 _LOGGER.info("SUPERNOTIFY Snoozing %s", snooze.recipient)
282 for r in recipients_to_remove:
283 recipients.remove(r)
285 if snooze.target_type == QualifiedTargetType.ACTION:
286 to_remove: list[dict] = []
287 to_add: list[dict] = []
288 for recipient in recipients:
289 if recipient.get(CONF_PERSON) == snooze.recipient:
290 alt_mobiles: list[dict] = list(recipient.get(CONF_MOBILE_DEVICES, []))
291 md_to_remove = []
292 for md in alt_mobiles:
293 if md.get(CONF_NOTIFY_ACTION) == snooze.target:
294 _LOGGER.debug("SUPERNOTIFY Snoozing %s for %s", snooze.recipient, snooze.target)
295 md_to_remove.append(md)
296 for md in md_to_remove:
297 alt_mobiles.remove(md)
298 alt_recipient = dict(recipient.items())
299 alt_recipient[CONF_MOBILE_DEVICES] = alt_mobiles
300 to_remove.append(recipient)
301 to_add.append(alt_recipient)
302 if to_add or to_remove:
303 recipients = update_dict_list(recipients, to_add, to_remove)
304 return recipients