Coverage for custom_components/supernotify/methods/chime.py: 94%

106 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-12-28 14:21 +0000

1import logging 

2import re 

3from typing import Any 

4 

5from homeassistant.components.group import expand_entity_ids 

6from homeassistant.components.notify.const import ATTR_MESSAGE, ATTR_TITLE 

7from homeassistant.const import ( # ATTR_VARIABLES from script.const has import issues 

8 ATTR_ENTITY_ID, 

9 CONF_TARGET, 

10 CONF_VARIABLES, 

11) 

12 

13from custom_components.supernotify import ATTR_DATA, CONF_DATA, CONF_OPTIONS, METHOD_CHIME 

14from custom_components.supernotify.common import ensure_list 

15from custom_components.supernotify.delivery_method import DeliveryMethod 

16from custom_components.supernotify.envelope import Envelope 

17 

18RE_VALID_CHIME = r"(switch|script|group|siren|media_player)\.[A-Za-z0-9_]+" 

19 

20_LOGGER = logging.getLogger(__name__) 

21 

22DATA_SCHEMA_RESTRICT: dict[str, list[str]] = { 

23 "media_player": ["data", "entity_id", "media_content_id", "media_content_type", "enqueue", "announce"], 

24 "switch": ["entity_id"], 

25 "script": ["data", "variables", "context", "wait"], 

26 "siren": ["data", "entity_id"], 

27} # TODO: source directly from component schema 

28 

29 

30class ChimeDeliveryMethod(DeliveryMethod): 

31 method = METHOD_CHIME 

32 

33 def __init__(self, *args, **kwargs) -> None: 

34 super().__init__(*args, **kwargs) 

35 self.chime_aliases = self.context.method_defaults.get(self.method, {}).get(CONF_OPTIONS, {}).get("chime_aliases", {}) 

36 self.chime_entities = self.context.method_defaults.get(self.method, {}).get(CONF_TARGET, []) 

37 

38 def validate_action(self, action: str | None) -> bool: 

39 return action is None 

40 

41 def select_target(self, target: str) -> bool: 

42 return re.fullmatch(RE_VALID_CHIME, target) is not None 

43 

44 async def deliver(self, envelope: Envelope) -> bool: 

45 config = self.delivery_config(envelope.delivery_name) 

46 data: dict[str, Any] = {} 

47 data.update(config.get(CONF_DATA) or {}) 

48 data.update(envelope.data or {}) 

49 targets = envelope.targets or [] 

50 

51 # chime_repeat = data.pop("chime_repeat", 1) 

52 chime_tune: str | None = data.pop("chime_tune", None) 

53 

54 _LOGGER.info( 

55 "SUPERNOTIFY notify_chime: %s -> %s (delivery: %s, env_data:%s, dlv_data:%s)", 

56 chime_tune, 

57 targets, 

58 envelope.delivery_name, 

59 envelope.data, 

60 config.get(CONF_DATA), 

61 ) 

62 

63 expanded_targets = dict.fromkeys(expand_entity_ids(self.hass, targets), chime_tune) 

64 entities_and_tunes = self.resolve_tune(chime_tune) 

65 expanded_targets.update(entities_and_tunes) # overwrite and extend 

66 chimes = 0 

67 for chime_entity_id, tune in expanded_targets.items(): 

68 _LOGGER.debug("SUPERNOTIFY chime %s: %s", chime_entity_id, tune) 

69 action_data = None 

70 try: 

71 domain, service, action_data = self.analyze_target(chime_entity_id, tune, data) 

72 if domain is not None and service is not None: 

73 action_data = self.prune_data(domain, action_data) 

74 

75 if domain == "script": 

76 self.set_action_data(action_data[CONF_VARIABLES], ATTR_MESSAGE, envelope.message) 

77 self.set_action_data(action_data[CONF_VARIABLES], ATTR_TITLE, envelope.title) 

78 self.set_action_data(action_data[CONF_VARIABLES], "chime_tune", tune) 

79 

80 if await self.call_action(envelope, f"{domain}.{service}", action_data=action_data): 

81 chimes += 1 

82 else: 

83 _LOGGER.debug("SUPERNOTIFY Chime skipping incomplete service for %s,%s", chime_entity_id, tune) 

84 except Exception as e: 

85 _LOGGER.error("SUPERNOTIFY Failed to chime %s: %s [%s]", chime_entity_id, action_data, e) 

86 return chimes > 0 

87 

88 def prune_data(self, domain: str, data: dict) -> dict: 

89 pruned: dict[str, Any] = {} 

90 if data and domain in DATA_SCHEMA_RESTRICT: 

91 restrict: list[str] = DATA_SCHEMA_RESTRICT.get(domain) or [] 

92 for key in list(data.keys()): 

93 if key in restrict: 

94 pruned[key] = data[key] 

95 return pruned 

96 

97 def analyze_target(self, target: str, chime_tune: str | None, data: dict) -> tuple[str, str | None, dict[str, Any]]: 

98 if not target: 

99 _LOGGER.warning("SUPERNOTIFY Empty chime target") 

100 return "", None, {} 

101 domain, name = target.split(".", 1) 

102 action_data: dict[str, Any] = {} 

103 action: str | None = None 

104 chime_volume = data.pop("chime_volume", 1) 

105 chime_duration = data.pop("chime_duration", 10) 

106 

107 if domain == "switch": 

108 action = "turn_on" 

109 action_data[ATTR_ENTITY_ID] = target 

110 elif domain == "siren": 

111 action = "turn_on" 

112 action_data[ATTR_ENTITY_ID] = target 

113 action_data[ATTR_DATA] = {} 

114 if chime_tune: 

115 action_data[ATTR_DATA]["tone"] = chime_tune 

116 action_data[ATTR_DATA]["duration"] = chime_duration 

117 action_data[ATTR_DATA]["volume_level"] = chime_volume 

118 

119 elif domain == "script": 

120 if data: 

121 action_data.update(data) 

122 action = name 

123 action_data.setdefault(CONF_VARIABLES, {}) 

124 

125 elif domain == "media_player" and chime_tune: 

126 if data: 

127 action_data.update(data) 

128 action = "play_media" 

129 action_data[ATTR_ENTITY_ID] = target 

130 action_data["media_content_type"] = "sound" 

131 action_data["media_content_id"] = chime_tune 

132 

133 else: 

134 _LOGGER.warning("SUPERNOTIFY No matching chime domain/tune: %s, target: %s, tune: %s", domain, target, chime_tune) 

135 

136 return domain, action, action_data 

137 

138 def resolve_tune(self, tune: str | None) -> dict[str, Any]: 

139 entities_and_tunes: dict[str, Any] = {} 

140 if tune is not None: 

141 for domain, alias_config in self.chime_aliases.get(tune, {}).items(): 

142 if isinstance(alias_config, str): 

143 alias_config = {"tune": alias_config} 

144 domain = alias_config.get("domain", domain) 

145 actual_tune = alias_config.get("tune", tune) 

146 if ATTR_ENTITY_ID in alias_config: 

147 entities_and_tunes.update(dict.fromkeys(ensure_list(alias_config[ATTR_ENTITY_ID]), actual_tune)) 

148 else: 

149 entities_and_tunes.update({ent: actual_tune for ent in self.chime_entities if ent.startswith(f"{domain}.")}) 

150 

151 return entities_and_tunes