Coverage for custom_components/supernotify/media_grab.py: 82%

146 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-12-28 14:21 +0000

1import asyncio 

2import io 

3import logging 

4import time 

5from http import HTTPStatus 

6from io import BytesIO 

7from pathlib import Path 

8from typing import TYPE_CHECKING 

9 

10import aiofiles 

11from aiohttp import ClientTimeout 

12from homeassistant.const import STATE_HOME 

13from homeassistant.core import HomeAssistant 

14from homeassistant.helpers.aiohttp_client import async_get_clientsession 

15from PIL import Image 

16 

17from custom_components.supernotify import ( 

18 CONF_ALT_CAMERA, 

19 CONF_CAMERA, 

20 CONF_DEVICE_TRACKER, 

21 PTZ_METHOD_FRIGATE, 

22 PTZ_METHOD_ONVIF, 

23) 

24from custom_components.supernotify.configuration import SupernotificationConfiguration 

25 

26if TYPE_CHECKING: 

27 from homeassistant.components.image import ImageEntity 

28 

29_LOGGER = logging.getLogger(__name__) 

30 

31 

async def snapshot_from_url(
    hass: HomeAssistant,
    snapshot_url: str,
    notification_id: str,
    media_path: Path,
    hass_base_url: str | None,
    remote_timeout: int = 15,
    jpeg_args: dict | None = None,
) -> Path | None:
    """Fetch an image over HTTP and save a metadata-free copy under ``media_path``.

    Args:
        hass: Home Assistant instance, used to obtain the shared aiohttp session.
        snapshot_url: Absolute URL, or a path that is appended to ``hass_base_url``
            when it does not start with ``http``.
        notification_id: Used as the stem of the saved file name.
        media_path: Root media directory; the image is written to its
            ``snapshot`` subdirectory, which is created if missing.
        hass_base_url: Prefix for relative ``snapshot_url`` values; ``None`` is
            treated as an empty string.
        remote_timeout: Total timeout in seconds for the HTTP request.
        jpeg_args: Extra keyword arguments passed to Pillow's ``save`` when the
            output format is JPEG.

    Returns:
        Path of the written image, or ``None`` when the request did not return
        200 or any step failed (failures are logged, not raised).
    """
    hass_base_url = hass_base_url or ""
    # MIME type -> (file extension, Pillow format name)
    known_types: dict[str, tuple[str, str]] = {
        "image/jpeg": ("jpg", "JPEG"),
        "image/png": ("png", "PNG"),
        "image/gif": ("gif", "GIF"),
    }
    try:
        media_dir: Path = Path(media_path) / "snapshot"
        media_dir.mkdir(parents=True, exist_ok=True)

        if snapshot_url.startswith("http"):
            image_url = snapshot_url
        else:
            image_url = f"{hass_base_url}{snapshot_url}"
        websession = async_get_clientsession(hass)
        # async with releases the response on every path, including the
        # non-200 early return, so the pooled connection is not leaked
        async with websession.get(image_url, timeout=ClientTimeout(total=remote_timeout)) as r:
            if r.status != HTTPStatus.OK:
                _LOGGER.warning("SUPERNOTIFY Unable to retrieve %s: %s", image_url, r.status)
                return None
            content_type = r.content_type
            payload = await r.content.read()

        if content_type in known_types:
            media_ext, image_format = known_types[content_type]
        else:
            _LOGGER.info("SUPERNOTIFY Unexpected MIME type %s from snap of %s", content_type, image_url)
            media_ext, image_format = "img", None

        # TODO: configure image rewrite
        image_path: Path = Path(media_dir) / f"{notification_id}.{media_ext}"
        image: Image.Image = Image.open(io.BytesIO(payload))
        if image_format is None:
            # Saving to an in-memory buffer requires an explicit format (there is
            # no file name to infer it from), so use the one Pillow detected
            image_format = image.format
        # rewrite to remove metadata, incl custom CCTV comments that confuse python MIMEImage
        clean_image: Image.Image = Image.new(image.mode, image.size)
        clean_image.putdata(image.getdata())  # type: ignore
        palette = image.getpalette()
        if palette:
            # Palette-mode pixels are only indices; carry the palette across so
            # GIF / indexed PNG images keep their colours
            clean_image.putpalette(palette)
        buffer = BytesIO()
        img_args = {}
        if image_format == "JPEG" and jpeg_args:
            img_args.update(jpeg_args)
        clean_image.save(buffer, image_format, **img_args)
        async with aiofiles.open(image_path, "wb") as file:
            await file.write(buffer.getbuffer())
        _LOGGER.debug("SUPERNOTIFY Fetched image from %s to %s", image_url, image_path)
        return image_path
    except Exception as e:
        # Best-effort: a failed snapshot must not break the notification
        _LOGGER.error("SUPERNOTIFY Image snap fail: %s", e)
        return None

87 

88 

async def move_camera_to_ptz_preset(
    hass: HomeAssistant, camera_entity_id: str, preset: str | int, method: str = PTZ_METHOD_ONVIF
) -> None:
    """Ask a camera to move to a PTZ preset via the Frigate or ONVIF ``ptz`` service.

    Args:
        hass: Home Assistant instance whose service registry is called.
        camera_entity_id: Entity id used as the service call target.
        preset: Preset identifier passed through to the service.
        method: ``PTZ_METHOD_FRIGATE`` or ``PTZ_METHOD_ONVIF``; any other value
            is logged as unknown and no service is called.

    Errors raised by the service call are logged as warnings and swallowed.
    """
    try:
        _LOGGER.info("SUPERNOTIFY Executing PTZ by %s to %s for %s", method, preset, camera_entity_id)
        # Work out which integration to call and the payload it expects
        if method == PTZ_METHOD_FRIGATE:
            domain = "frigate"
            payload = {"action": "preset", "argument": preset}
        elif method == PTZ_METHOD_ONVIF:
            domain = "onvif"
            payload = {"move_mode": "GotoPreset", "preset": preset}
        else:
            _LOGGER.warning("SUPERNOTIFY Unknown PTZ method %s", method)
            return

        await hass.services.async_call(
            domain,
            "ptz",
            service_data=payload,
            target={
                "entity_id": camera_entity_id,
            },
        )
    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to move %s to ptz preset %s: %s", camera_entity_id, preset, e)

116 

117 

async def snap_image(
    context: SupernotificationConfiguration,
    entity_id: str,
    media_path: Path,
    notification_id: str,
    jpeg_args: dict | None = None,
) -> Path | None:
    """Save the current bitmap of an image entity to disk (any image, including MQTT Image).

    Args:
        context: Configuration object; its ``hass`` attribute is used to look
            up the entity in ``hass.data["image"]``.
        entity_id: Id of the image entity to read.
        media_path: Root media directory; the file goes in its ``image``
            subdirectory, which is created if missing.
        notification_id: Prefix of the saved file name, followed by a timestamp.
        jpeg_args: Extra keyword arguments passed to Pillow's ``save`` when the
            detected format is JPEG.

    Returns:
        Path of the written file, or ``None`` if the entity is missing, returns
        no bitmap, or any step raises (all cases are logged as warnings).
    """
    try:
        entity = context.hass.data["image"].get_entity(entity_id) if context.hass else None
        if not entity:
            _LOGGER.warning("SUPERNOTIFY Unable to find image entity %s", entity_id)
            return None

        raw: bytes | None = await entity.async_image()
        if raw is None:
            _LOGGER.warning("SUPERNOTIFY Empty bitmap from image entity %s", entity_id)
            return None

        picture: Image.Image = Image.open(io.BytesIO(raw))
        target_dir: Path = media_path / "image"
        target_dir.mkdir(parents=True, exist_ok=True)

        # File extension follows the format Pillow detected, e.g. "jpeg" or "png"
        extension: str = picture.format.lower() if picture.format else "img"
        # Timestamp suffix; the dot is replaced so the name has a single extension
        stamp: str = str(time.time()).replace(".", "_")
        target: Path = Path(target_dir) / f"{notification_id}_{stamp}.{extension}"

        save_options = {}
        if jpeg_args and extension in ("jpg", "jpeg"):
            save_options.update(jpeg_args)
        encoded = BytesIO()
        picture.save(encoded, picture.format, **save_options)
        async with aiofiles.open(target, "wb") as out:
            await out.write(encoded.getbuffer())
        return target
    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to snap image %s: %s", entity_id, e)
        return None

156 

157 

async def snap_camera(
    hass: HomeAssistant, camera_entity_id: str, media_path: Path, max_camera_wait: int = 20, jpeg_args: dict | None = None
) -> Path | None:
    """Request a ``camera.snapshot`` and wait for the resulting file to appear.

    Args:
        hass: Home Assistant instance whose service registry is called.
        camera_entity_id: Camera to snapshot; an empty value returns ``None``.
        media_path: Root media directory; the file goes in its ``camera``
            subdirectory, which is created if missing.
        max_camera_wait: Maximum number of seconds to poll for the file.
        jpeg_args: Not supported here; a warning is logged if supplied.

    Returns:
        The snapshot path, or ``None`` on an empty entity id or any exception.
        The path is returned even if the file has still not appeared when the
        wait expires.
    """
    if not camera_entity_id:
        _LOGGER.warning("SUPERNOTIFY Empty camera entity id for snap")
        return None
    if jpeg_args:
        _LOGGER.warning("jpeg_args not yet supported by snap_camera")

    snapshot_file: Path | None = None
    try:
        camera_dir: Path = media_path / "camera"
        camera_dir.mkdir(parents=True, exist_ok=True)
        stamp = str(time.time()).replace(".", "_")
        snapshot_file = Path(camera_dir) / f"{camera_entity_id}_{stamp}.jpg"
        request = {"entity_id": camera_entity_id, "filename": snapshot_file}
        await hass.services.async_call("camera", "snapshot", service_data=request)

        # give async service time
        deadline = time.time() + max_camera_wait
        while time.time() < deadline:
            if snapshot_file.exists():
                break
            _LOGGER.info("Image file not available yet at %s, pausing", snapshot_file)
            await asyncio.sleep(1)
    except Exception as e:
        _LOGGER.warning("Failed to snap avail camera %s to %s: %s", camera_entity_id, snapshot_file, e)
        return None

    return snapshot_file

188 

189 

async def select_avail_camera(hass: HomeAssistant, cameras: dict[str, dict], camera_entity_id: str) -> str | None:
    """Pick the preferred camera if it is available, otherwise one of its alternatives.

    A camera counts as available when it has no device tracker configured, or
    when its tracker entity is in the ``STATE_HOME`` state.

    Args:
        hass: Home Assistant instance, used to read tracker states.
        cameras: Camera configurations keyed by camera entity id.
        camera_entity_id: The preferred camera.

    Returns:
        The chosen camera entity id, or ``None`` if neither the preferred
        camera nor any alternative is usable, or if selection raised (the
        error is logged as a warning).
    """
    avail_camera_entity_id: str | None = None

    try:
        preferred_cam = cameras.get(camera_entity_id)

        if not preferred_cam or not preferred_cam.get(CONF_DEVICE_TRACKER):
            # assume unconfigured camera, or configured without tracker, available
            avail_camera_entity_id = camera_entity_id
        elif hass.states.is_state(preferred_cam[CONF_DEVICE_TRACKER], STATE_HOME):
            avail_camera_entity_id = camera_entity_id
        else:
            # `or []` also covers an alternatives key that is present but None
            alt_cam_ids = preferred_cam.get(CONF_ALT_CAMERA) or []

            # First choice: an alternative whose tracker confirms it is home
            alt_cams_with_tracker = [
                cameras[c] for c in alt_cam_ids if c in cameras and cameras[c].get(CONF_DEVICE_TRACKER)
            ]
            for alt_cam in alt_cams_with_tracker:
                tracker_entity_id = alt_cam.get(CONF_DEVICE_TRACKER)
                if tracker_entity_id and hass.states.is_state(tracker_entity_id, STATE_HOME):
                    avail_camera_entity_id = alt_cam[CONF_CAMERA]
                    _LOGGER.info(
                        "SUPERNOTIFY Selecting available camera %s rather than %s", avail_camera_entity_id, camera_entity_id
                    )
                    break

            if avail_camera_entity_id is None:
                # Second choice: the first alternative with no tracker to consult
                alt_cam_ids_without_tracker = [
                    c for c in alt_cam_ids if c not in cameras or not cameras[c].get(CONF_DEVICE_TRACKER)
                ]
                if alt_cam_ids_without_tracker:
                    # Assign before logging so the message names the chosen camera
                    avail_camera_entity_id = alt_cam_ids_without_tracker[0]
                    _LOGGER.info(
                        "SUPERNOTIFY Selecting untracked camera %s rather than %s", avail_camera_entity_id, camera_entity_id
                    )

        if avail_camera_entity_id is None:
            _LOGGER.warning("%s not available and no alternative available", camera_entity_id)
            # Dump every tracker state to help diagnose why nothing was selectable
            for c in cameras.values():
                if c.get(CONF_DEVICE_TRACKER):
                    _LOGGER.debug(
                        "SUPERNOTIFY Tracker %s: %s", c.get(CONF_DEVICE_TRACKER), hass.states.get(c[CONF_DEVICE_TRACKER])
                    )

    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to select available camera: %s", e)

    return avail_camera_entity_id