Coverage for custom_components/supernotify/media_grab.py: 83%

145 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-10-26 08:54 +0000

1import asyncio 

2import io 

3import logging 

4import time 

5from http import HTTPStatus 

6from io import BytesIO 

7from pathlib import Path 

8from typing import TYPE_CHECKING, Any 

9 

10import aiofiles 

11import anyio 

12from aiohttp import ClientTimeout 

13from homeassistant.const import STATE_HOME 

14from homeassistant.core import HomeAssistant 

15from homeassistant.helpers.aiohttp_client import async_get_clientsession 

16from PIL import Image 

17 

18from custom_components.supernotify import ( 

19 CONF_ALT_CAMERA, 

20 CONF_CAMERA, 

21 CONF_DEVICE_TRACKER, 

22 PTZ_METHOD_FRIGATE, 

23 PTZ_METHOD_ONVIF, 

24) 

25from custom_components.supernotify.configuration import Context 

26 

27if TYPE_CHECKING: 

28 from homeassistant.components.image import ImageEntity 

29 

30_LOGGER = logging.getLogger(__name__) 

31 

32 

33async def snapshot_from_url( 

34 hass: HomeAssistant, 

35 snapshot_url: str, 

36 notification_id: str, 

37 media_path: Path, 

38 hass_base_url: str | None, 

39 remote_timeout: int = 15, 

40 jpeg_opts: dict[str, Any] | None = None, 

41) -> Path | None: 

42 hass_base_url = hass_base_url or "" 

43 try: 

44 media_dir: anyio.Path = anyio.Path(media_path) / "snapshot" 

45 await media_dir.mkdir(parents=True, exist_ok=True) 

46 

47 if snapshot_url.startswith("http"): 

48 image_url = snapshot_url 

49 else: 

50 image_url = f"{hass_base_url}{snapshot_url}" 

51 websession = async_get_clientsession(hass) 

52 r = await websession.get(image_url, timeout=ClientTimeout(total=remote_timeout)) 

53 if r.status != HTTPStatus.OK: 

54 _LOGGER.warning("SUPERNOTIFY Unable to retrieve %s: %s", image_url, r.status) 

55 else: 

56 if r.content_type in ("image/jpeg", "image/jpg"): 

57 media_ext = "jpg" 

58 image_format = "JPEG" 

59 elif r.content_type == "image/png": 

60 media_ext = "png" 

61 image_format = "PNG" 

62 elif r.content_type == "image/gif": 

63 media_ext = "gif" 

64 image_format = "GIF" 

65 else: 

66 _LOGGER.info("SUPERNOTIFY Unexpected MIME type %s from snap of %s", r.content_type, image_url) 

67 media_ext = "img" 

68 image_format = None 

69 

70 # TODO: configure image rewrite 

71 image_path: Path = Path(media_dir) / f"{notification_id}.{media_ext}" 

72 image: Image.Image = Image.open(io.BytesIO(await r.content.read())) 

73 # rewrite to remove metadata, incl custom CCTV comments that confusie python MIMEImage 

74 clean_image: Image.Image = Image.new(image.mode, image.size) 

75 clean_image.putdata(image.getdata()) 

76 buffer = BytesIO() 

77 img_args = {} 

78 if image_format == "JPEG" and jpeg_opts: 

79 img_args.update(jpeg_opts) 

80 clean_image.save(buffer, image_format, **img_args) 

81 async with aiofiles.open(image_path, "wb") as file: 

82 await file.write(buffer.getbuffer()) 

83 _LOGGER.debug("SUPERNOTIFY Fetched image from %s to %s", image_url, image_path) 

84 return image_path 

85 except Exception as e: 

86 _LOGGER.error("SUPERNOTIFY Image snap fail: %s", e) 

87 return None 

88 

89 

90async def move_camera_to_ptz_preset( 

91 hass: HomeAssistant, camera_entity_id: str, preset: str | int, method: str = PTZ_METHOD_ONVIF 

92) -> None: 

93 try: 

94 _LOGGER.info("SUPERNOTIFY Executing PTZ by %s to %s for %s", method, preset, camera_entity_id) 

95 if method == PTZ_METHOD_FRIGATE: 

96 await hass.services.async_call( 

97 "frigate", 

98 "ptz", 

99 service_data={"action": "preset", "argument": preset}, 

100 target={ 

101 "entity_id": camera_entity_id, 

102 }, 

103 ) 

104 elif method == PTZ_METHOD_ONVIF: 

105 await hass.services.async_call( 

106 "onvif", 

107 "ptz", 

108 service_data={"move_mode": "GotoPreset", "preset": preset}, 

109 target={ 

110 "entity_id": camera_entity_id, 

111 }, 

112 ) 

113 else: 

114 _LOGGER.warning("SUPERNOTIFY Unknown PTZ method %s", method) 

115 except Exception as e: 

116 _LOGGER.warning("SUPERNOTIFY Unable to move %s to ptz preset %s: %s", camera_entity_id, preset, e) 

117 

118 

119async def snap_image( 

120 context: Context, 

121 entity_id: str, 

122 media_path: Path, 

123 notification_id: str, 

124 jpeg_opts: dict[str, Any] | None = None, 

125) -> Path | None: 

126 """Use for any image, including MQTT Image""" 

127 image_path: anyio.Path | None = None 

128 try: 

129 image_entity: ImageEntity | None = None 

130 if context.hass: 

131 image_entity = context.hass.data["image"].get_entity(entity_id) 

132 if image_entity: 

133 bitmap: bytes | None = await image_entity.async_image() 

134 if bitmap is None: 

135 _LOGGER.warning("SUPERNOTIFY Empty bitmap from image entity %s", entity_id) 

136 else: 

137 image: Image.Image = Image.open(io.BytesIO(bitmap)) 

138 media_dir: anyio.Path = anyio.Path(media_path) / "image" 

139 await media_dir.mkdir(parents=True, exist_ok=True) 

140 

141 media_ext: str = image.format.lower() if image.format else "img" 

142 timed: str = str(time.time()).replace(".", "_") 

143 image_path = anyio.Path(media_dir) / f"{notification_id}_{timed}.{media_ext}" 

144 buffer = BytesIO() 

145 img_args = {} 

146 if media_ext in ("jpg", "jpeg") and jpeg_opts: 

147 img_args.update(jpeg_opts) 

148 image.save(buffer, image.format, **img_args) 

149 async with aiofiles.open(await image_path.resolve(), "wb") as file: 

150 await file.write(buffer.getbuffer()) 

151 else: 

152 _LOGGER.warning("SUPERNOTIFY Unable to find image entity %s", entity_id) 

153 except Exception as e: 

154 _LOGGER.warning("SUPERNOTIFY Unable to snap image %s: %s", entity_id, e) 

155 return None 

156 return Path(await image_path.resolve()) if image_path else None 

157 

158 

159async def snap_camera( 

160 hass: HomeAssistant, 

161 camera_entity_id: str, 

162 media_path: Path, 

163 max_camera_wait: int = 20, 

164 jpeg_opts: dict[str, Any] | None = None, 

165) -> Path | None: 

166 image_path: Path | None = None 

167 if not camera_entity_id: 

168 _LOGGER.warning("SUPERNOTIFY Empty camera entity id for snap") 

169 return image_path 

170 if jpeg_opts: 

171 _LOGGER.warning("jpeg_opts not yet supported by snap_camera") 

172 

173 try: 

174 media_dir: anyio.Path = anyio.Path(media_path) / "camera" 

175 await media_dir.mkdir(parents=True, exist_ok=True) 

176 timed = str(time.time()).replace(".", "_") 

177 image_path = Path(media_dir) / f"{camera_entity_id}_{timed}.jpg" 

178 await hass.services.async_call( 

179 "camera", "snapshot", service_data={"entity_id": camera_entity_id, "filename": image_path} 

180 ) 

181 

182 # give async service time 

183 cutoff_time = time.time() + max_camera_wait 

184 while time.time() < cutoff_time and not image_path.exists(): 

185 _LOGGER.info("Image file not available yet at %s, pausing", image_path) 

186 await asyncio.sleep(1) 

187 

188 except Exception as e: 

189 _LOGGER.warning("Failed to snap avail camera %s to %s: %s", camera_entity_id, image_path, e) 

190 image_path = None 

191 

192 return image_path 

193 

194 

195def select_avail_camera(hass: HomeAssistant, cameras: dict[str, Any], camera_entity_id: str) -> str | None: 

196 avail_camera_entity_id: str | None = None 

197 

198 try: 

199 preferred_cam = cameras.get(camera_entity_id) 

200 

201 if not preferred_cam or not preferred_cam.get(CONF_DEVICE_TRACKER): 

202 # assume unconfigured camera, or configured without tracker, available 

203 avail_camera_entity_id = camera_entity_id 

204 elif hass.states.is_state(preferred_cam[CONF_DEVICE_TRACKER], STATE_HOME): 

205 avail_camera_entity_id = camera_entity_id 

206 else: 

207 alt_cams_with_tracker = [ 

208 cameras[c] 

209 for c in preferred_cam.get(CONF_ALT_CAMERA, []) 

210 if c in cameras and cameras[c].get(CONF_DEVICE_TRACKER) 

211 ] 

212 for alt_cam in alt_cams_with_tracker: 

213 tracker_entity_id = alt_cam.get(CONF_DEVICE_TRACKER) 

214 if tracker_entity_id and hass.states.is_state(tracker_entity_id, STATE_HOME): 

215 avail_camera_entity_id = alt_cam[CONF_CAMERA] 

216 _LOGGER.info( 

217 "SUPERNOTIFY Selecting available camera %s rather than %s", avail_camera_entity_id, camera_entity_id 

218 ) 

219 break 

220 if avail_camera_entity_id is None: 

221 alt_cam_ids_without_tracker = [ 

222 c 

223 for c in preferred_cam.get(CONF_ALT_CAMERA, []) 

224 if c not in cameras or not cameras[c].get(CONF_DEVICE_TRACKER) 

225 ] 

226 if len(alt_cam_ids_without_tracker) > 0: 

227 _LOGGER.info( 

228 "SUPERNOTIFY Selecting untracked camera %s rather than %s", avail_camera_entity_id, camera_entity_id 

229 ) 

230 avail_camera_entity_id = alt_cam_ids_without_tracker[0] 

231 

232 if avail_camera_entity_id is None: 

233 _LOGGER.warning("%s not available and no alternative available", camera_entity_id) 

234 for c in cameras.values(): 

235 if c.get(CONF_DEVICE_TRACKER): 

236 _LOGGER.debug( 

237 "SUPERNOTIFY Tracker %s: %s", c.get(CONF_DEVICE_TRACKER), hass.states.get(c[CONF_DEVICE_TRACKER]) 

238 ) 

239 

240 except Exception as e: 

241 _LOGGER.warning("SUPERNOTIFY Unable to select available camera: %s", e) 

242 

243 return avail_camera_entity_id