"""mycal.py - serve anonymised free/busy and "busy" event calendars.

The configured source calendars (local files or URLs) are read, every event
inside a 30-day window starting on the Monday of the current week is reduced
to its time span, and two iCalendar documents are cached and served over HTTP:

* a VFREEBUSY calendar (``/<name>.ics`` and ``/busy/<name>.ics``)
* a calendar of anonymised VEVENTs (``/events/<name>.ics``)
"""

import asyncio
import logging
import traceback
import typing
import urllib.parse
import zoneinfo
from asyncio import Condition
from collections.abc import Mapping
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta, time
from os import environ
from uuid import uuid4

import aiohttp
import icalendar
from danoan.toml_dataclass import TomlDataClassIO
from icalendar.cal import Calendar, FreeBusy
from quart import Quart


@dataclass
class CalendarConfig(TomlDataClassIO):
    """One source calendar: either a local ``file`` or a remote ``url``."""

    file: typing.Optional[str] = None
    url: typing.Optional[str] = None
    # When true, every event of this calendar is reported as tentative.
    all_tentative: bool = False


@dataclass
class Config(TomlDataClassIO):
    """Top-level application configuration read from the TOML file."""

    name: str
    email: str
    timezone: str
    # Upper bound, in seconds, between two cache refreshes.
    update_interval: int = 3600
    calendars: typing.List[CalendarConfig] = field(default_factory=list)


def load_config(file_path):
    """Read and return the ``Config`` stored in the TOML file at *file_path*."""
    with open(file_path, "r") as fr:
        return Config.read(fr)


app = Quart(__name__)
config = load_config(environ.get("CONFIG_FILE", "config.toml"))
tz = zoneinfo.ZoneInfo(config.timezone)
utc = zoneinfo.ZoneInfo("UTC")

# Rendered calendars plus the time they were last rebuilt.
calendar_cache = {
    'last_updated': None,
    'busy_ical': "",
    'events_ical': ""
}
# Countdown (seconds) until the background task refreshes the cache.
next_update_seconds = 0
# Guards reads and writes of ``calendar_cache``.
cache_condition = Condition()


def _calendar_option(calendar_config, key, default=None):
    """Return option *key* of a calendar entry, or *default* when unset.

    NOTE(review): the rest of this module historically indexed calendar
    entries like mappings, so presumably the TOML loader leaves list items as
    plain dicts - confirm. Both mappings and ``CalendarConfig`` instances are
    accepted here so either representation works.
    """
    if isinstance(calendar_config, Mapping):
        return calendar_config.get(key, default)
    return getattr(calendar_config, key, default)


async def fetch_calendar_async(calendar_url):
    """Download *calendar_url* and return the response body as bytes.

    Raises:
        aiohttp.ClientResponseError: if the server answers with an HTTP error
            status, so an error page is never mistaken for calendar data.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(calendar_url) as response:
            response.raise_for_status()
            return await response.read()


def read_calendar_file(calendar_file):
    """Return the raw bytes of the calendar stored at *calendar_file*."""
    with open(calendar_file, 'rb') as f:
        return f.read()


async def get_calendar_async(calendar_config):
    """Return the raw iCalendar bytes for one configured calendar.

    A ``file`` entry takes precedence over a ``url`` entry. ``webcal://``
    URLs are fetched over ``https://``.

    Raises:
        ValueError: if the entry configures neither a file nor a URL.
    """
    calendar_file = _calendar_option(calendar_config, 'file')
    if calendar_file:
        return read_calendar_file(calendar_file)

    calendar_url = _calendar_option(calendar_config, 'url')
    if calendar_url:
        u = urllib.parse.urlsplit(calendar_url)
        if u.scheme == "webcal":
            u = u._replace(scheme="https")
        return await fetch_calendar_async(urllib.parse.urlunsplit(u))

    raise ValueError("Calendar URL or file not configured.")


def fixup_date(dt):
    """Return *dt* as an aware ``datetime`` in the configured time zone.

    Plain dates become midnight of that day, naive datetimes are taken to be
    in the configured zone, and aware datetimes are converted to it.
    """
    # ``datetime`` is a subclass of ``date``, so test for the subclass.
    if not isinstance(dt, datetime):
        return datetime.combine(dt, time.min, tzinfo=tz)
    if dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    return dt.astimezone(tz)


def looks_tentative(component):
    """Return True if *component* is marked or titled as tentative.

    An event counts as tentative when its STATUS is ``TENTATIVE``, its summary
    contains ``TBD``, or its summary ends with a question mark.
    """
    summary = str(component.get('summary', ''))
    return (
        component.get('status') == 'TENTATIVE'
        or 'TBD' in summary
        or summary.endswith("?")
    )


def _always_tentative(component):
    """Tentative predicate for calendars configured with ``all_tentative``."""
    return True


def make_clean_event(domain, component):
    """Return a new VEVENT carrying only the time span of *component*.

    The summary is replaced by ``"<name> Busy"`` and a fresh UID is generated.
    When the source event has no DTEND, DTSTART is used as the end, matching
    the fallback applied in ``process_calendar_events``.
    """
    dtstart = component['DTSTART']
    new_event = icalendar.cal.Event()
    new_event.add('SUMMARY', f"{config.name} Busy")
    new_event.add('DTSTART', dtstart)
    new_event.add('DTEND', component.get('DTEND', dtstart))
    # Ensure each event has a unique UID
    new_event.add('UID', f'{uuid4()}@{domain}')
    new_event.add('DTSTAMP', datetime.now(tz=utc))
    return new_event


def make_busy_period(component, dtstart, dtend, is_tentative_fn):
    """Return a UTC ``vPeriod`` for *dtstart*..*dtend* with its FBTYPE set.

    FBTYPE is BUSY-TENTATIVE when ``is_tentative_fn(component)`` is true and
    BUSY-UNAVAILABLE otherwise.
    """
    vp = icalendar.prop.vPeriod([t.astimezone(utc) for t in [dtstart, dtend]])
    vp.params.pop('TZID', None)  # remove timezone information
    if is_tentative_fn(component):
        vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_TENTATIVE
    else:
        vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_UNAVAILABLE
    return vp


def process_calendar_events(calendar, start_date, end_date):
    """Collect the VEVENTs of *calendar* that lie inside a date window.

    Args:
        calendar: The input calendar object to process
        start_date: Start of the window (aware datetime)
        end_date: End of the window (aware datetime)

    Returns:
        Dictionary mapping ``(dtstart, dtend)`` tuples, normalised with
        ``fixup_date``, to the source VEVENT component. When several events
        share the same period, the last one walked is kept. Events without a
        DTSTART are skipped; events without a DTEND use DTSTART as their end.
    """
    events = {}
    for component in calendar.walk():
        if component.name != "VEVENT":
            continue
        start_prop = component.get('dtstart')
        if start_prop is None:
            # Nothing to place on the timeline; skip instead of crashing.
            continue
        end_prop = component.get('dtend', start_prop)
        dtstart = fixup_date(start_prop.dt)
        dtend = fixup_date(end_prop.dt)
        if dtstart >= start_date and dtend <= end_date:
            events[(dtstart, dtend)] = component
    return events


def make_calendar():
    """Return an empty ``Calendar`` with PRODID and VERSION set."""
    c = Calendar()
    c.add('prodid', '-//Calendar Anonymiser//alin.ovh//')
    c.add('version', '2.0')
    return c


async def update_calendar_cache():
    """
    Update the global calendar cache with fresh data

    All configured calendars are read and merged by period. A failure in one
    calendar is logged and does not prevent the others from being processed.
    Tasks waiting on ``cache_condition`` are notified on success and failure.
    """
    try:
        # Prepare data outside the lock
        domain = config.email.split('@')[1]
        # Use the configured zone (not the host's) to decide what "today" is.
        today = datetime.now(tz).date()
        start_of_week = today - timedelta(days=today.weekday())
        start_date = datetime.combine(start_of_week, time.min, tzinfo=tz)
        end_date = start_date + timedelta(days=30)

        busy_periods = {}
        events = {}

        for calendar_config in config.calendars:
            try:
                if _calendar_option(calendar_config, 'all_tentative', False):
                    tentative_fn = _always_tentative
                else:
                    tentative_fn = looks_tentative

                calendar_data = await get_calendar_async(calendar_config)
                input_calendar = Calendar.from_ical(calendar_data)
                calendar_events = process_calendar_events(
                    input_calendar, start_date, end_date
                )

                for period_key, event_component in calendar_events.items():
                    # A tentative event must not replace an entry already
                    # recorded for the same period. The check has to look at
                    # the incoming source event: the stored event has been
                    # anonymised and no longer carries summary or status.
                    if period_key in events and tentative_fn(event_component):
                        continue
                    dtstart, dtend = period_key
                    # Build both values first so the two dicts stay in step
                    # even if one of the builders raises.
                    clean_event = make_clean_event(domain, event_component)
                    busy_period = make_busy_period(
                        event_component, dtstart, dtend, tentative_fn
                    )
                    events[period_key] = clean_event
                    busy_periods[period_key] = busy_period
            except Exception as e:
                logging.error(f"Error processing calendar {calendar_config}: {str(e)}")
                logging.error(traceback.format_exc())

        # Generate the busy/free calendar
        organizer = icalendar.prop.vCalAddress(f"mailto:{config.email}")
        organizer.name = config.name

        busy_calendar = make_calendar()
        busy = FreeBusy(uid=f'{uuid4()}@{domain}')
        busy.add('organizer', organizer)
        busy.add('dtstamp', datetime.now(tz=utc))
        busy.add('dtstart', start_date.astimezone(tz=utc))
        busy.add('dtend', end_date.astimezone(tz=utc))
        for period in busy_periods.values():
            busy.add('freebusy', period)
        busy_calendar.add_component(busy)

        events_calendar = make_calendar()
        for event_component in events.values():
            events_calendar.add_component(event_component)

        # Update the global cache
        async with cache_condition:
            calendar_cache['last_updated'] = datetime.now(tz=utc)
            calendar_cache['busy_ical'] = busy_calendar.to_ical()
            calendar_cache['events_ical'] = events_calendar.to_ical()
            # Notify all waiting tasks that the update is complete
            cache_condition.notify_all()

        logging.info(f"Calendar cache updated with {len(events)} events")
    except Exception as e:
        logging.error(f"Error updating calendar cache: {str(e)}")
        logging.error(traceback.format_exc())
        # Notify waiters even on error
        async with cache_condition:
            cache_condition.notify_all()


def seconds_until_next_day():
    """
    Calculate the number of seconds until midnight

    Midnight is taken in the configured time zone. Both instants are converted
    to UTC before subtracting: subtracting two aware datetimes that share the
    same ``tzinfo`` ignores UTC offset changes, which would make the result
    wrong by the size of the shift on DST transition days.
    """
    now = datetime.now(tz)
    tomorrow = datetime.combine(
        now.date() + timedelta(days=1), time.min, tzinfo=tz
    )
    elapsed = tomorrow.astimezone(utc) - now.astimezone(utc)
    return int(elapsed.total_seconds())


async def periodic_calendar_update():
    """
    Background task to periodically update the calendar cache

    Counts ``next_update_seconds`` down once per second and refreshes the
    cache when it reaches zero.
    """
    global next_update_seconds
    while True:
        await asyncio.sleep(1)
        next_update_seconds -= 1
        if next_update_seconds <= 0:
            await update_calendar_cache()
            schedule_next_update()


def schedule_next_update():
    """Reset the countdown to the update interval or midnight, whichever is sooner."""
    global next_update_seconds
    next_update_seconds = min(config.update_interval, seconds_until_next_day())


@app.before_serving
async def startup():
    """
    Startup tasks before the server begins serving requests

    Configures logging, fills the cache once, and starts the periodic
    refresh task.
    """
    logging.basicConfig(level=logging.INFO)
    await update_calendar_cache()
    schedule_next_update()
    app.add_background_task(periodic_calendar_update)


@app.route(f'/{str.lower(config.name)}.ics', defaults={'key': 'busy_ical'})
@app.route(f'/busy/{str.lower(config.name)}.ics', defaults={'key': 'busy_ical'})
@app.route(f'/events/{str.lower(config.name)}.ics', defaults={'key': 'events_ical'})
async def events(key):
    """Serve the cached calendar stored under *key* in ``calendar_cache``.

    Responds 503 while the cache is still empty and 500 if serving fails.
    """
    try:
        async with cache_condition:
            if calendar_cache[key]:
                # The countdown is <= 0 while a refresh is running; a negative
                # max-age is not a valid Cache-Control value, so clamp it.
                max_age = max(0, next_update_seconds)
                return calendar_cache[key], {
                    "Content-Type": "text/calendar",
                    "Cache-Control": f"max-age={max_age}",
                    "Last-Modified": calendar_cache['last_updated'].strftime("%a, %d %b %Y %H:%M:%S GMT")
                }
            else:
                return "Calendar data not yet available. Please try again shortly.", 503
    except Exception as e:
        logging.error(f"Error serving calendar: {str(e)}")
        logging.error(traceback.format_exc())
        return f"Error processing calendar: {str(e)}", 500


if __name__ == '__main__':
    app.run(debug=True)