Coverage for custom_components/supernotify/archive.py: 74%
109 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-18 09:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-10-18 09:29 +0000
1import datetime as dt
2import logging
3from abc import abstractmethod
4from pathlib import Path
5from typing import Any
7import aiofiles.os
8import anyio
9import homeassistant.util.dt as dt_util
10from homeassistant.components import mqtt
11from homeassistant.core import HomeAssistant
12from homeassistant.helpers.json import save_json
14_LOGGER = logging.getLogger(__name__)
16ARCHIVE_PURGE_MIN_INTERVAL = 3 * 60
17ARCHIVE_DEFAULT_DAYS = 1
18WRITE_TEST = ".startup"
21class ArchivableObject:
22 @abstractmethod
23 def base_filename(self) -> str:
24 pass
26 def contents(self, minimal: bool = False) -> Any:
27 pass
30class ArchiveTopic:
31 def __init__(self, hass: HomeAssistant, topic: str, qos: int = 0, retain: bool = True) -> None:
32 self._hass = hass
33 self.topic = topic
34 self.qos = qos
35 self.retain = retain
37 async def publish(self, archive_object: ArchivableObject) -> None:
38 payload = archive_object.contents(minimal=True)
39 _LOGGER.debug("SUPERNOTIFY Publishing notification to %s", self.topic)
40 await mqtt.async_publish(self._hass, self.topic, payload, qos=self.qos, retain=self.retain)
43class NotificationArchive:
44 def __init__(
45 self, enabled: bool, archive_path: str | None, archive_days: str | None, purge_minute_interval: str | None = None
46 ) -> None:
47 self.enabled = enabled
48 self.last_purge: dt.datetime | None = None
49 self.configured_archive_path: str | None = archive_path
50 self.archive_path: Path | None = None
51 self.archive_days: int = int(archive_days) if archive_days else ARCHIVE_DEFAULT_DAYS
52 self.purge_minute_interval = int(purge_minute_interval) if purge_minute_interval else ARCHIVE_PURGE_MIN_INTERVAL
54 def initialize(self) -> None:
55 if not self.enabled:
56 _LOGGER.info("SUPERNOTIFY Archive disabled")
57 return
58 if not self.configured_archive_path:
59 _LOGGER.warning("SUPERNOTIFY archive path not configured")
60 return
61 verify_archive_path: Path = Path(self.configured_archive_path)
62 if verify_archive_path and not verify_archive_path.exists():
63 _LOGGER.info("SUPERNOTIFY archive path not found at %s", verify_archive_path)
64 try:
65 verify_archive_path.mkdir(parents=True, exist_ok=True)
66 except Exception as e:
67 _LOGGER.warning("SUPERNOTIFY archive path %s cannot be created: %s", verify_archive_path, e)
68 if verify_archive_path and verify_archive_path.exists() and verify_archive_path.is_dir():
69 try:
70 verify_archive_path.joinpath(WRITE_TEST).touch(exist_ok=True)
71 self.archive_path = verify_archive_path
72 except Exception as e:
73 _LOGGER.warning("SUPERNOTIFY archive path %s cannot be written: %s", verify_archive_path, e)
74 self.enabled = False
75 else:
76 _LOGGER.warning("SUPERNOTIFY archive path %s is not a directory or does not exist", verify_archive_path)
77 self.enabled = False
79 async def size(self) -> int:
80 path = self.archive_path
81 if path and await anyio.Path(path).exists():
82 return sum(1 for p in await aiofiles.os.listdir(path) if p != WRITE_TEST)
83 return 0
85 async def cleanup(self, days: int | None = None, force: bool = False) -> int:
86 if (
87 not force
88 and self.last_purge is not None
89 and self.last_purge > dt.datetime.now(dt.UTC) - dt.timedelta(minutes=self.purge_minute_interval)
90 ):
91 return 0
92 days = days or self.archive_days
94 cutoff = dt.datetime.now(dt.UTC) - dt.timedelta(days=self.archive_days)
95 cutoff = cutoff.astimezone(dt.UTC)
96 purged = 0
97 if self.archive_path and await anyio.Path(self.archive_path).exists():
98 try:
99 archive = await aiofiles.os.scandir(self.archive_path)
100 for entry in archive:
101 if entry.name == ".startup":
102 continue
103 if dt_util.utc_from_timestamp(entry.stat().st_ctime) <= cutoff:
104 _LOGGER.debug("SUPERNOTIFY Purging %s", entry.path)
105 await aiofiles.os.unlink(Path(entry.path))
106 purged += 1
107 except Exception as e:
108 _LOGGER.warning("SUPERNOTIFY Unable to clean up archive at %s: %s", self.archive_path, e, exc_info=True)
109 _LOGGER.info("SUPERNOTIFY Purged %s archived notifications for cutoff %s", purged, cutoff)
110 self.last_purge = dt.datetime.now(dt.UTC)
111 else:
112 _LOGGER.debug("SUPERNOTIFY Skipping archive purge for unknown path %s", self.archive_path)
113 return purged
115 def archive(self, archive_object: ArchivableObject) -> bool:
116 if not self.enabled or not self.archive_path:
117 return False
118 archive_path: str = ""
119 try:
120 filename = f"{archive_object.base_filename()}.json"
121 archive_path = str(self.archive_path.joinpath(filename))
122 save_json(archive_path, archive_object.contents())
123 _LOGGER.debug("SUPERNOTIFY Archived notification %s", archive_path)
124 return True
125 except Exception as e:
126 _LOGGER.warning("SUPERNOTIFY Unable to archive notification: %s", e)
127 try:
128 save_json(archive_path, archive_object.contents(minimal=True))
129 _LOGGER.debug("SUPERNOTIFY Archived minimal notification %s", archive_path)
130 return True
131 except Exception as e2:
132 _LOGGER.warning("SUPERNOTIFY Unable to archive minimal notification: %s", e2)
133 return False