Coverage for custom_components/supernotify/methods/chime.py: 94%
106 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-28 14:21 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-28 14:21 +0000
1import logging
2import re
3from typing import Any
5from homeassistant.components.group import expand_entity_ids
6from homeassistant.components.notify.const import ATTR_MESSAGE, ATTR_TITLE
7from homeassistant.const import ( # ATTR_VARIABLES from script.const has import issues
8 ATTR_ENTITY_ID,
9 CONF_TARGET,
10 CONF_VARIABLES,
11)
13from custom_components.supernotify import ATTR_DATA, CONF_DATA, CONF_OPTIONS, METHOD_CHIME
14from custom_components.supernotify.common import ensure_list
15from custom_components.supernotify.delivery_method import DeliveryMethod
16from custom_components.supernotify.envelope import Envelope
18RE_VALID_CHIME = r"(switch|script|group|siren|media_player)\.[A-Za-z0-9_]+"
20_LOGGER = logging.getLogger(__name__)
22DATA_SCHEMA_RESTRICT: dict[str, list[str]] = {
23 "media_player": ["data", "entity_id", "media_content_id", "media_content_type", "enqueue", "announce"],
24 "switch": ["entity_id"],
25 "script": ["data", "variables", "context", "wait"],
26 "siren": ["data", "entity_id"],
27} # TODO: source directly from component schema
30class ChimeDeliveryMethod(DeliveryMethod):
31 method = METHOD_CHIME
33 def __init__(self, *args, **kwargs) -> None:
34 super().__init__(*args, **kwargs)
35 self.chime_aliases = self.context.method_defaults.get(self.method, {}).get(CONF_OPTIONS, {}).get("chime_aliases", {})
36 self.chime_entities = self.context.method_defaults.get(self.method, {}).get(CONF_TARGET, [])
38 def validate_action(self, action: str | None) -> bool:
39 return action is None
41 def select_target(self, target: str) -> bool:
42 return re.fullmatch(RE_VALID_CHIME, target) is not None
44 async def deliver(self, envelope: Envelope) -> bool:
45 config = self.delivery_config(envelope.delivery_name)
46 data: dict[str, Any] = {}
47 data.update(config.get(CONF_DATA) or {})
48 data.update(envelope.data or {})
49 targets = envelope.targets or []
51 # chime_repeat = data.pop("chime_repeat", 1)
52 chime_tune: str | None = data.pop("chime_tune", None)
54 _LOGGER.info(
55 "SUPERNOTIFY notify_chime: %s -> %s (delivery: %s, env_data:%s, dlv_data:%s)",
56 chime_tune,
57 targets,
58 envelope.delivery_name,
59 envelope.data,
60 config.get(CONF_DATA),
61 )
63 expanded_targets = dict.fromkeys(expand_entity_ids(self.hass, targets), chime_tune)
64 entities_and_tunes = self.resolve_tune(chime_tune)
65 expanded_targets.update(entities_and_tunes) # overwrite and extend
66 chimes = 0
67 for chime_entity_id, tune in expanded_targets.items():
68 _LOGGER.debug("SUPERNOTIFY chime %s: %s", chime_entity_id, tune)
69 action_data = None
70 try:
71 domain, service, action_data = self.analyze_target(chime_entity_id, tune, data)
72 if domain is not None and service is not None:
73 action_data = self.prune_data(domain, action_data)
75 if domain == "script":
76 self.set_action_data(action_data[CONF_VARIABLES], ATTR_MESSAGE, envelope.message)
77 self.set_action_data(action_data[CONF_VARIABLES], ATTR_TITLE, envelope.title)
78 self.set_action_data(action_data[CONF_VARIABLES], "chime_tune", tune)
80 if await self.call_action(envelope, f"{domain}.{service}", action_data=action_data):
81 chimes += 1
82 else:
83 _LOGGER.debug("SUPERNOTIFY Chime skipping incomplete service for %s,%s", chime_entity_id, tune)
84 except Exception as e:
85 _LOGGER.error("SUPERNOTIFY Failed to chime %s: %s [%s]", chime_entity_id, action_data, e)
86 return chimes > 0
88 def prune_data(self, domain: str, data: dict) -> dict:
89 pruned: dict[str, Any] = {}
90 if data and domain in DATA_SCHEMA_RESTRICT:
91 restrict: list[str] = DATA_SCHEMA_RESTRICT.get(domain) or []
92 for key in list(data.keys()):
93 if key in restrict:
94 pruned[key] = data[key]
95 return pruned
97 def analyze_target(self, target: str, chime_tune: str | None, data: dict) -> tuple[str, str | None, dict[str, Any]]:
98 if not target:
99 _LOGGER.warning("SUPERNOTIFY Empty chime target")
100 return "", None, {}
101 domain, name = target.split(".", 1)
102 action_data: dict[str, Any] = {}
103 action: str | None = None
104 chime_volume = data.pop("chime_volume", 1)
105 chime_duration = data.pop("chime_duration", 10)
107 if domain == "switch":
108 action = "turn_on"
109 action_data[ATTR_ENTITY_ID] = target
110 elif domain == "siren":
111 action = "turn_on"
112 action_data[ATTR_ENTITY_ID] = target
113 action_data[ATTR_DATA] = {}
114 if chime_tune:
115 action_data[ATTR_DATA]["tone"] = chime_tune
116 action_data[ATTR_DATA]["duration"] = chime_duration
117 action_data[ATTR_DATA]["volume_level"] = chime_volume
119 elif domain == "script":
120 if data:
121 action_data.update(data)
122 action = name
123 action_data.setdefault(CONF_VARIABLES, {})
125 elif domain == "media_player" and chime_tune:
126 if data:
127 action_data.update(data)
128 action = "play_media"
129 action_data[ATTR_ENTITY_ID] = target
130 action_data["media_content_type"] = "sound"
131 action_data["media_content_id"] = chime_tune
133 else:
134 _LOGGER.warning("SUPERNOTIFY No matching chime domain/tune: %s, target: %s, tune: %s", domain, target, chime_tune)
136 return domain, action, action_data
138 def resolve_tune(self, tune: str | None) -> dict[str, Any]:
139 entities_and_tunes: dict[str, Any] = {}
140 if tune is not None:
141 for domain, alias_config in self.chime_aliases.get(tune, {}).items():
142 if isinstance(alias_config, str):
143 alias_config = {"tune": alias_config}
144 domain = alias_config.get("domain", domain)
145 actual_tune = alias_config.get("tune", tune)
146 if ATTR_ENTITY_ID in alias_config:
147 entities_and_tunes.update(dict.fromkeys(ensure_list(alias_config[ATTR_ENTITY_ID]), actual_tune))
148 else:
149 entities_and_tunes.update({ent: actual_tune for ent in self.chime_entities if ent.startswith(f"{domain}.")})
151 return entities_and_tunes