Coverage for custom_components/supernotify/media_grab.py: 82%
146 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-28 14:21 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-12-28 14:21 +0000
1import asyncio
2import io
3import logging
4import time
5from http import HTTPStatus
6from io import BytesIO
7from pathlib import Path
8from typing import TYPE_CHECKING
10import aiofiles
11from aiohttp import ClientTimeout
12from homeassistant.const import STATE_HOME
13from homeassistant.core import HomeAssistant
14from homeassistant.helpers.aiohttp_client import async_get_clientsession
15from PIL import Image
17from custom_components.supernotify import (
18 CONF_ALT_CAMERA,
19 CONF_CAMERA,
20 CONF_DEVICE_TRACKER,
21 PTZ_METHOD_FRIGATE,
22 PTZ_METHOD_ONVIF,
23)
24from custom_components.supernotify.configuration import SupernotificationConfiguration
26if TYPE_CHECKING:
27 from homeassistant.components.image import ImageEntity
29_LOGGER = logging.getLogger(__name__)
32async def snapshot_from_url(
33 hass: HomeAssistant,
34 snapshot_url: str,
35 notification_id: str,
36 media_path: Path,
37 hass_base_url: str | None,
38 remote_timeout: int = 15,
39 jpeg_args: dict | None = None,
40) -> Path | None:
41 hass_base_url = hass_base_url or ""
42 try:
43 media_dir: Path = Path(media_path) / "snapshot"
44 media_dir.mkdir(parents=True, exist_ok=True)
46 if snapshot_url.startswith("http"):
47 image_url = snapshot_url
48 else:
49 image_url = f"{hass_base_url}{snapshot_url}"
50 websession = async_get_clientsession(hass)
51 r = await websession.get(image_url, timeout=ClientTimeout(total=remote_timeout))
52 if r.status != HTTPStatus.OK:
53 _LOGGER.warning("SUPERNOTIFY Unable to retrieve %s: %s", image_url, r.status)
54 else:
55 if r.content_type == "image/jpeg":
56 media_ext = "jpg"
57 image_format = "JPEG"
58 elif r.content_type == "image/png":
59 media_ext = "png"
60 image_format = "PNG"
61 elif r.content_type == "image/gif":
62 media_ext = "gif"
63 image_format = "GIF"
64 else:
65 _LOGGER.info("SUPERNOTIFY Unexpected MIME type %s from snap of %s", r.content_type, image_url)
66 media_ext = "img"
67 image_format = None
69 # TODO: configure image rewrite
70 image_path: Path = Path(media_dir) / f"{notification_id}.{media_ext}"
71 image: Image.Image = Image.open(io.BytesIO(await r.content.read()))
72 # rewrite to remove metadata, incl custom CCTV comments that confusie python MIMEImage
73 clean_image: Image.Image = Image.new(image.mode, image.size)
74 clean_image.putdata(image.getdata()) # type: ignore
75 buffer = BytesIO()
76 img_args = {}
77 if image_format == "JPEG" and jpeg_args:
78 img_args.update(jpeg_args)
79 clean_image.save(buffer, image_format, **img_args)
80 async with aiofiles.open(image_path, "wb") as file:
81 await file.write(buffer.getbuffer())
82 _LOGGER.debug("SUPERNOTIFY Fetched image from %s to %s", image_url, image_path)
83 return image_path
84 except Exception as e:
85 _LOGGER.error("SUPERNOTIFY Image snap fail: %s", e)
86 return None
89async def move_camera_to_ptz_preset(
90 hass: HomeAssistant, camera_entity_id: str, preset: str | int, method: str = PTZ_METHOD_ONVIF
91) -> None:
92 try:
93 _LOGGER.info("SUPERNOTIFY Executing PTZ by %s to %s for %s", method, preset, camera_entity_id)
94 if method == PTZ_METHOD_FRIGATE:
95 await hass.services.async_call(
96 "frigate",
97 "ptz",
98 service_data={"action": "preset", "argument": preset},
99 target={
100 "entity_id": camera_entity_id,
101 },
102 )
103 elif method == PTZ_METHOD_ONVIF:
104 await hass.services.async_call(
105 "onvif",
106 "ptz",
107 service_data={"move_mode": "GotoPreset", "preset": preset},
108 target={
109 "entity_id": camera_entity_id,
110 },
111 )
112 else:
113 _LOGGER.warning("SUPERNOTIFY Unknown PTZ method %s", method)
114 except Exception as e:
115 _LOGGER.warning("SUPERNOTIFY Unable to move %s to ptz preset %s: %s", camera_entity_id, preset, e)
118async def snap_image(
119 context: SupernotificationConfiguration,
120 entity_id: str,
121 media_path: Path,
122 notification_id: str,
123 jpeg_args: dict | None = None,
124) -> Path | None:
125 """Use for any image, including MQTT Image"""
126 image_path: Path | None = None
127 try:
128 image_entity: ImageEntity | None = None
129 if context.hass:
130 image_entity = context.hass.data["image"].get_entity(entity_id)
131 if image_entity:
132 bitmap: bytes | None = await image_entity.async_image()
133 if bitmap is None:
134 _LOGGER.warning("SUPERNOTIFY Empty bitmap from image entity %s", entity_id)
135 else:
136 image: Image.Image = Image.open(io.BytesIO(bitmap))
137 media_dir: Path = media_path / "image"
138 media_dir.mkdir(parents=True, exist_ok=True)
140 media_ext: str = image.format.lower() if image.format else "img"
141 timed: str = str(time.time()).replace(".", "_")
142 image_path = Path(media_dir) / f"{notification_id}_{timed}.{media_ext}"
143 buffer = BytesIO()
144 img_args = {}
145 if media_ext in ("jpg", "jpeg") and jpeg_args:
146 img_args.update(jpeg_args)
147 image.save(buffer, image.format, **img_args)
148 async with aiofiles.open(image_path, "wb") as file:
149 await file.write(buffer.getbuffer())
150 else:
151 _LOGGER.warning("SUPERNOTIFY Unable to find image entity %s", entity_id)
152 except Exception as e:
153 _LOGGER.warning("SUPERNOTIFY Unable to snap image %s: %s", entity_id, e)
154 return None
155 return image_path
158async def snap_camera(
159 hass: HomeAssistant, camera_entity_id: str, media_path: Path, max_camera_wait: int = 20, jpeg_args: dict | None = None
160) -> Path | None:
161 image_path: Path | None = None
162 if not camera_entity_id:
163 _LOGGER.warning("SUPERNOTIFY Empty camera entity id for snap")
164 return image_path
165 if jpeg_args:
166 _LOGGER.warning("jpeg_args not yet supported by snap_camera")
168 try:
169 media_dir: Path = media_path / "camera"
170 media_dir.mkdir(parents=True, exist_ok=True)
171 timed = str(time.time()).replace(".", "_")
172 image_path = Path(media_dir) / f"{camera_entity_id}_{timed}.jpg"
173 await hass.services.async_call(
174 "camera", "snapshot", service_data={"entity_id": camera_entity_id, "filename": image_path}
175 )
177 # give async service time
178 cutoff_time = time.time() + max_camera_wait
179 while time.time() < cutoff_time and not image_path.exists():
180 _LOGGER.info("Image file not available yet at %s, pausing", image_path)
181 await asyncio.sleep(1)
183 except Exception as e:
184 _LOGGER.warning("Failed to snap avail camera %s to %s: %s", camera_entity_id, image_path, e)
185 image_path = None
187 return image_path
190async def select_avail_camera(hass: HomeAssistant, cameras: dict[str, dict], camera_entity_id: str) -> str | None:
191 avail_camera_entity_id: str | None = None
193 try:
194 preferred_cam = cameras.get(camera_entity_id)
196 if not preferred_cam or not preferred_cam.get(CONF_DEVICE_TRACKER):
197 # assume unconfigured camera, or configured without tracker, available
198 avail_camera_entity_id = camera_entity_id
199 elif hass.states.is_state(preferred_cam[CONF_DEVICE_TRACKER], STATE_HOME):
200 avail_camera_entity_id = camera_entity_id
201 else:
202 alt_cams_with_tracker = [
203 cameras[c]
204 for c in preferred_cam.get(CONF_ALT_CAMERA, [])
205 if c in cameras and cameras[c].get(CONF_DEVICE_TRACKER)
206 ]
207 for alt_cam in alt_cams_with_tracker:
208 tracker_entity_id = alt_cam.get(CONF_DEVICE_TRACKER)
209 if tracker_entity_id and hass.states.is_state(tracker_entity_id, STATE_HOME):
210 avail_camera_entity_id = alt_cam[CONF_CAMERA]
211 _LOGGER.info(
212 "SUPERNOTIFY Selecting available camera %s rather than %s", avail_camera_entity_id, camera_entity_id
213 )
214 break
215 if avail_camera_entity_id is None:
216 alt_cam_ids_without_tracker = [
217 c
218 for c in preferred_cam.get(CONF_ALT_CAMERA, [])
219 if c not in cameras or not cameras[c].get(CONF_DEVICE_TRACKER)
220 ]
221 if len(alt_cam_ids_without_tracker) > 0:
222 _LOGGER.info(
223 "SUPERNOTIFY Selecting untracked camera %s rather than %s", avail_camera_entity_id, camera_entity_id
224 )
225 avail_camera_entity_id = alt_cam_ids_without_tracker[0]
227 if avail_camera_entity_id is None:
228 _LOGGER.warning("%s not available and no alternative available", camera_entity_id)
229 for c in cameras.values():
230 if c.get(CONF_DEVICE_TRACKER):
231 _LOGGER.debug(
232 "SUPERNOTIFY Tracker %s: %s", c.get(CONF_DEVICE_TRACKER), hass.states.get(c[CONF_DEVICE_TRACKER])
233 )
235 except Exception as e:
236 _LOGGER.warning("SUPERNOTIFY Unable to select available camera: %s", e)
238 return avail_camera_entity_id