Coverage for custom_components/supernotify/media_grab.py: 83%
145 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-26 08:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-26 08:54 +0000
1import asyncio
2import io
3import logging
4import time
5from http import HTTPStatus
6from io import BytesIO
7from pathlib import Path
8from typing import TYPE_CHECKING, Any
10import aiofiles
11import anyio
12from aiohttp import ClientTimeout
13from homeassistant.const import STATE_HOME
14from homeassistant.core import HomeAssistant
15from homeassistant.helpers.aiohttp_client import async_get_clientsession
16from PIL import Image
18from custom_components.supernotify import (
19 CONF_ALT_CAMERA,
20 CONF_CAMERA,
21 CONF_DEVICE_TRACKER,
22 PTZ_METHOD_FRIGATE,
23 PTZ_METHOD_ONVIF,
24)
25from custom_components.supernotify.configuration import Context
27if TYPE_CHECKING:
28 from homeassistant.components.image import ImageEntity
30_LOGGER = logging.getLogger(__name__)
async def snapshot_from_url(
    hass: HomeAssistant,
    snapshot_url: str,
    notification_id: str,
    media_path: Path,
    hass_base_url: str | None,
    remote_timeout: int = 15,
    jpeg_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Download an image from a URL and store a metadata-free copy of it.

    Args:
        hass: Home Assistant instance, used to obtain the shared aiohttp session.
        snapshot_url: Absolute URL, or a path that is appended to ``hass_base_url``
            when it does not start with ``http``.
        notification_id: Used as the stem of the stored file name.
        media_path: Base media directory; the file is written to ``<media_path>/snapshot``.
        hass_base_url: Prefix for relative ``snapshot_url`` values; ``None`` is treated as "".
        remote_timeout: Total timeout in seconds for the HTTP request.
        jpeg_opts: Extra keyword arguments passed to ``Image.save`` for JPEG output only.

    Returns:
        Path of the written image, or ``None`` if the download or rewrite failed.
        Failures are logged, never raised.
    """
    hass_base_url = hass_base_url or ""
    try:
        media_dir: anyio.Path = anyio.Path(media_path) / "snapshot"
        await media_dir.mkdir(parents=True, exist_ok=True)

        if snapshot_url.startswith("http"):
            image_url = snapshot_url
        else:
            image_url = f"{hass_base_url}{snapshot_url}"
        websession = async_get_clientsession(hass)
        # The context manager guarantees the connection is released back to the
        # pool on every path, including non-200 responses and read errors.
        async with websession.get(image_url, timeout=ClientTimeout(total=remote_timeout)) as r:
            if r.status != HTTPStatus.OK:
                _LOGGER.warning("SUPERNOTIFY Unable to retrieve %s: %s", image_url, r.status)
                return None
            content_type = r.content_type
            payload: bytes = await r.content.read()

        media_ext: str
        image_format: str | None
        if content_type in ("image/jpeg", "image/jpg"):
            media_ext = "jpg"
            image_format = "JPEG"
        elif content_type == "image/png":
            media_ext = "png"
            image_format = "PNG"
        elif content_type == "image/gif":
            media_ext = "gif"
            image_format = "GIF"
        else:
            _LOGGER.info("SUPERNOTIFY Unexpected MIME type %s from snap of %s", content_type, image_url)
            media_ext = "img"
            image_format = None

        # TODO: configure image rewrite
        image: Image.Image = Image.open(io.BytesIO(payload))
        if image_format is None:
            # Saving to an in-memory buffer needs an explicit format, so fall
            # back to whatever Pillow detected from the image data itself.
            image_format = image.format
            if image_format:
                media_ext = image_format.lower()
        if not image_format:
            _LOGGER.warning("SUPERNOTIFY Unable to determine image format for %s", image_url)
            return None
        image_path: Path = Path(media_dir) / f"{notification_id}.{media_ext}"

        # Rewrite pixel data into a fresh image to remove metadata, including
        # custom CCTV comments that confuse Python's MIMEImage.
        clean_image: Image.Image = Image.new(image.mode, image.size)
        clean_image.putdata(image.getdata())
        # Palette-based images (typical for GIF and some PNG) store colour
        # indices as pixel data; without the palette the copy has wrong colours.
        palette = image.getpalette()
        if palette is not None:
            clean_image.putpalette(palette)

        buffer = BytesIO()
        img_args: dict[str, Any] = {}
        if image_format == "JPEG" and jpeg_opts:
            img_args.update(jpeg_opts)
        clean_image.save(buffer, image_format, **img_args)
        async with aiofiles.open(image_path, "wb") as file:
            await file.write(buffer.getbuffer())
        _LOGGER.debug("SUPERNOTIFY Fetched image from %s to %s", image_url, image_path)
        return image_path
    except Exception as e:
        _LOGGER.error("SUPERNOTIFY Image snap fail: %s", e)
    return None
async def move_camera_to_ptz_preset(
    hass: HomeAssistant, camera_entity_id: str, preset: str | int, method: str = PTZ_METHOD_ONVIF
) -> None:
    """Ask a camera to move to a PTZ preset via the Frigate or ONVIF ``ptz`` service.

    Args:
        hass: Home Assistant instance whose service registry is called.
        camera_entity_id: Entity id of the camera, passed as the service target.
        preset: Preset identifier handed to the service unchanged.
        method: ``PTZ_METHOD_FRIGATE`` or ``PTZ_METHOD_ONVIF``; any other value
            is logged as unknown and no service is called.

    Any exception raised by the service call is logged as a warning and swallowed.
    """
    try:
        _LOGGER.info("SUPERNOTIFY Executing PTZ by %s to %s for %s", method, preset, camera_entity_id)

        # Work out which integration to call and the payload it expects,
        # then issue a single service call below.
        domain: str
        payload: dict[str, Any]
        if method == PTZ_METHOD_FRIGATE:
            domain = "frigate"
            payload = {"action": "preset", "argument": preset}
        elif method == PTZ_METHOD_ONVIF:
            domain = "onvif"
            payload = {"move_mode": "GotoPreset", "preset": preset}
        else:
            _LOGGER.warning("SUPERNOTIFY Unknown PTZ method %s", method)
            return

        await hass.services.async_call(
            domain,
            "ptz",
            service_data=payload,
            target={"entity_id": camera_entity_id},
        )
    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to move %s to ptz preset %s: %s", camera_entity_id, preset, e)
async def snap_image(
    context: Context,
    entity_id: str,
    media_path: Path,
    notification_id: str,
    jpeg_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Store the current picture of an image entity (any image, including MQTT Image).

    The entity is looked up in ``context.hass.data["image"]``, its bitmap is
    re-saved in its own detected format under ``<media_path>/image``, and the
    resolved path of the new file is returned.

    Args:
        context: Supplies the Home Assistant instance (``context.hass``).
        entity_id: Image entity to read.
        media_path: Base media directory.
        notification_id: Prefix of the stored file name; a timestamp is appended.
        jpeg_opts: Extra ``Image.save`` keyword arguments, applied to JPEG only.

    Returns:
        The resolved file path, or ``None`` when the entity is missing, has no
        bitmap, or anything fails (failures are logged as warnings).
    """
    saved_to: anyio.Path | None = None
    try:
        entity: ImageEntity | None = None
        if context.hass:
            entity = context.hass.data["image"].get_entity(entity_id)

        if not entity:
            _LOGGER.warning("SUPERNOTIFY Unable to find image entity %s", entity_id)
        else:
            raw: bytes | None = await entity.async_image()
            if raw is not None:
                picture: Image.Image = Image.open(io.BytesIO(raw))
                target_dir: anyio.Path = anyio.Path(media_path) / "image"
                await target_dir.mkdir(parents=True, exist_ok=True)

                # File extension follows the format Pillow detected.
                extension: str = "img"
                if picture.format:
                    extension = picture.format.lower()
                # Timestamp keeps repeated snaps for one notification distinct.
                stamp: str = str(time.time()).replace(".", "_")
                saved_to = anyio.Path(target_dir) / f"{notification_id}_{stamp}.{extension}"

                save_kwargs: dict[str, Any] = {}
                if jpeg_opts and extension in ("jpg", "jpeg"):
                    save_kwargs.update(jpeg_opts)
                encoded = BytesIO()
                picture.save(encoded, picture.format, **save_kwargs)

                destination = await saved_to.resolve()
                async with aiofiles.open(destination, "wb") as out:
                    await out.write(encoded.getbuffer())
            else:
                _LOGGER.warning("SUPERNOTIFY Empty bitmap from image entity %s", entity_id)
    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to snap image %s: %s", entity_id, e)
        return None

    if not saved_to:
        return None
    return Path(await saved_to.resolve())
async def snap_camera(
    hass: HomeAssistant,
    camera_entity_id: str,
    media_path: Path,
    max_camera_wait: int = 20,
    jpeg_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Take a snapshot with the ``camera.snapshot`` service and wait for the file.

    Args:
        hass: Home Assistant instance whose service registry is called.
        camera_entity_id: Camera to snapshot; an empty value returns ``None``.
        media_path: Base media directory; the file is written to ``<media_path>/camera``.
        max_camera_wait: Maximum number of seconds to wait for the file to appear.
        jpeg_opts: Accepted for signature parity with the other grabbers but not
            yet supported here; a warning is logged if it is supplied.

    Returns:
        Path of the snapshot file once it exists, or ``None`` if the service call
        failed or the file did not appear within ``max_camera_wait`` seconds.
    """
    image_path: Path | None = None
    if not camera_entity_id:
        _LOGGER.warning("SUPERNOTIFY Empty camera entity id for snap")
        return image_path
    if jpeg_opts:
        _LOGGER.warning("jpeg_opts not yet supported by snap_camera")

    try:
        media_dir: anyio.Path = anyio.Path(media_path) / "camera"
        await media_dir.mkdir(parents=True, exist_ok=True)
        timed = str(time.time()).replace(".", "_")
        image_path = Path(media_dir) / f"{camera_entity_id}_{timed}.jpg"
        await hass.services.async_call(
            "camera", "snapshot", service_data={"entity_id": camera_entity_id, "filename": image_path}
        )

        # The service call is not awaited to completion here, so poll until the
        # file shows up. anyio.Path keeps the existence check off the event loop
        # and the monotonic clock makes the deadline immune to wall-clock jumps.
        snapshot_file = anyio.Path(image_path)
        deadline = time.monotonic() + max_camera_wait
        while not await snapshot_file.exists():
            if time.monotonic() >= deadline:
                # Never hand callers a path to a file that was not written.
                _LOGGER.warning(
                    "SUPERNOTIFY Camera snapshot for %s not available at %s after %s seconds",
                    camera_entity_id,
                    image_path,
                    max_camera_wait,
                )
                return None
            _LOGGER.info("Image file not available yet at %s, pausing", image_path)
            await asyncio.sleep(1)

    except Exception as e:
        _LOGGER.warning("Failed to snap avail camera %s to %s: %s", camera_entity_id, image_path, e)
        image_path = None

    return image_path
def select_avail_camera(hass: HomeAssistant, cameras: dict[str, Any], camera_entity_id: str) -> str | None:
    """Choose a usable camera, preferring the requested one.

    Selection order:
      1. The requested camera, if it is unconfigured, has no device tracker, or
         its device tracker is in the ``home`` state.
      2. The first configured alternative whose device tracker is ``home``.
      3. The first alternative that is unconfigured or has no device tracker.

    Args:
        hass: Home Assistant instance, used to read device tracker states.
        cameras: Camera configuration keyed by camera entity id.
        camera_entity_id: The preferred camera.

    Returns:
        Entity id of the selected camera, or ``None`` if none is available or an
        error occurred (errors are logged as warnings, never raised).
    """
    avail_camera_entity_id: str | None = None

    try:
        preferred_cam = cameras.get(camera_entity_id)

        if not preferred_cam or not preferred_cam.get(CONF_DEVICE_TRACKER):
            # assume unconfigured camera, or configured without tracker, available
            avail_camera_entity_id = camera_entity_id
        elif hass.states.is_state(preferred_cam[CONF_DEVICE_TRACKER], STATE_HOME):
            avail_camera_entity_id = camera_entity_id
        else:
            # Tolerate an explicit None for the alternative list.
            alt_cam_ids = preferred_cam.get(CONF_ALT_CAMERA) or []

            # First choice: an alternative whose tracker reports it is home.
            alt_cams_with_tracker = [
                cameras[c] for c in alt_cam_ids if c in cameras and cameras[c].get(CONF_DEVICE_TRACKER)
            ]
            for alt_cam in alt_cams_with_tracker:
                tracker_entity_id = alt_cam.get(CONF_DEVICE_TRACKER)
                if tracker_entity_id and hass.states.is_state(tracker_entity_id, STATE_HOME):
                    avail_camera_entity_id = alt_cam[CONF_CAMERA]
                    _LOGGER.info(
                        "SUPERNOTIFY Selecting available camera %s rather than %s",
                        avail_camera_entity_id,
                        camera_entity_id,
                    )
                    break

            # Second choice: an alternative with no tracker, assumed available.
            if avail_camera_entity_id is None:
                alt_cam_ids_without_tracker = [
                    c for c in alt_cam_ids if c not in cameras or not cameras[c].get(CONF_DEVICE_TRACKER)
                ]
                if alt_cam_ids_without_tracker:
                    # Assign before logging so the message names the camera
                    # actually chosen rather than None.
                    avail_camera_entity_id = alt_cam_ids_without_tracker[0]
                    _LOGGER.info(
                        "SUPERNOTIFY Selecting untracked camera %s rather than %s",
                        avail_camera_entity_id,
                        camera_entity_id,
                    )

        if avail_camera_entity_id is None:
            _LOGGER.warning("%s not available and no alternative available", camera_entity_id)
            # Dump every tracker state to help diagnose why nothing was selectable.
            for c in cameras.values():
                if c.get(CONF_DEVICE_TRACKER):
                    _LOGGER.debug(
                        "SUPERNOTIFY Tracker %s: %s", c.get(CONF_DEVICE_TRACKER), hass.states.get(c[CONF_DEVICE_TRACKER])
                    )

    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to select available camera: %s", e)

    return avail_camera_entity_id