all repos — mycal @ b0db5da0bbc4fade76ad3c19b99dd64cbde510b1

private calendar anonymiser

mycal.py (view raw)

import asyncio
import logging
import traceback
import urllib.parse
import zoneinfo
from asyncio import Condition
from dataclasses import dataclass, field
from datetime import date, datetime, time, timedelta
from os import environ
from typing import Callable
from uuid import uuid4

import aiohttp
import icalendar
from danoan.toml_dataclass import TomlDataClassIO
from icalendar.cal import Calendar, FreeBusy
from quart import Quart

type PeriodKey = tuple[datetime, datetime]

@dataclass
class CalendarConfig(TomlDataClassIO):
    file: str | None = None
    url: str | None = None
    all_tentative: bool = False

@dataclass
class Config(TomlDataClassIO):
    name: str
    email: str
    timezone: str
    update_interval: int = 3600
    calendars: list[CalendarConfig] = field(default_factory=list)

def load_config(file_path: str):
    with open(file_path, "r") as fr:
        c = Config.read(fr)
        if c is None:
            raise ValueError("Failed to load configuration")
        return c

app = Quart(__name__)

config = load_config(environ.get("CONFIG_FILE", "config.toml"))

tz = zoneinfo.ZoneInfo(config.timezone)
utc = zoneinfo.ZoneInfo("UTC")

@dataclass
class CalendarCache:
    condition: Condition = field(default_factory=Condition)
    last_updated: datetime = field(default=datetime.fromtimestamp(0))
    store: dict[str, bytes] = field(default_factory=dict)

    def __setitem__(self, key: str, value: bytes):
        self.store[key] = value

    def __getitem__(self, key: str):
        return self.store[key]

calendar_cache = CalendarCache()
next_update_seconds = 0

async def fetch_calendar_async(calendar_url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(calendar_url) as response:
            return await response.read()

def read_calendar_file(calendar_file: str):
    with open(calendar_file, 'rb') as f:
        return f.read()

async def get_calendar_async(calendar_config: CalendarConfig):
    if calendar_config.file is not None:
        return read_calendar_file(calendar_config.file)
    elif calendar_config.url is not None:
        u = urllib.parse.urlsplit(calendar_config.url)
        if u.scheme == "webcal":
            u = u._replace(scheme="https")
        return await fetch_calendar_async(urllib.parse.urlunsplit(u))
    else:
        raise ValueError("Calendar URL or file not configured.")

def fixup_date(dt: datetime) -> datetime:
    if dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    else:
        return dt.astimezone(tz)

def looks_tentative(component: icalendar.Component) -> bool:
    summary = str(component.get('summary', ''))
    return component.get('status') == 'TENTATIVE' or 'TBD' in summary or summary.endswith("?")

def make_clean_event(domain: str, component: icalendar.Component) -> icalendar.Event:
    new_event = icalendar.cal.Event()

    new_event.add('SUMMARY', f"{config.name} Busy")
    new_event.add('DTSTART', component['DTSTART'])
    new_event.add('DTEND', component['DTEND'])

    # Ensure each event has a unique UID
    new_event.add('UID', f'{uuid4()}@{domain}')
    new_event.add('DTSTAMP', datetime.now(tz=utc))

    return new_event

def make_busy_period(
    component: icalendar.Component,
    dtstart: datetime,
    dtend: datetime,
    is_tentative_fn: Callable[[icalendar.Component], bool]
) -> icalendar.prop.vPeriod:
    vp = icalendar.prop.vPeriod((dtstart.astimezone(utc), dtend.astimezone(utc)))
    del vp.params['TZID']  # remove timezone information

    if is_tentative_fn(component):
        vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_TENTATIVE
    else:
        vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_UNAVAILABLE

    return vp

def process_calendar_events(calendar: icalendar.Component, start_date: date, end_date: date) -> dict[PeriodKey, icalendar.Component]:
    """
    Process events from a calendar and return both event components and busy periods.

    This function examines calendar components, filters them by date range,
    determines if they are tentative based on the provided function,

    Args:
        calendar: The input calendar object to process
        start_date: Start date for filtering events
        end_date: End date for filtering events

    Returns:
        Dictionary of vPeriod objects with FBTYPE set to BUSY_TENTATIVE or BUSY_UNAVAILABLE
    """
    events: dict[PeriodKey, icalendar.Component] = {}

    for component in calendar.walk():
        if component.name == "VEVENT":
            dtstart = fixup_date(component.get('dtstart').dt)
            dtend = fixup_date(component.get('dtend', component.get('dtstart')).dt)
            if dtstart >= start_date and dtend <= end_date:
                period_key: PeriodKey = (dtstart, dtend)
                events[period_key] = component

    return events

def make_calendar():
    c = Calendar()
    c.add('prodid', '-//Calendar Anonymiser//alin.ovh//')
    c.add('version', '2.0')

    return c

def always_tentative(_: icalendar.Component) -> bool:
    return True

async def update_calendar_cache():
    """
    Update the global calendar cache with fresh data
    """
    try:
        # Prepare data outside the lock

        domain = config.email.split('@')[1]

        today = date.today()
        start_of_week = today - timedelta(days=today.weekday())
        start_date = datetime.combine(start_of_week, time.min, tzinfo=tz)
        end_date = start_date + timedelta(days=30)

        busy_periods: dict[PeriodKey, icalendar.vPeriod] = {}
        events: dict[PeriodKey, icalendar.Event] = {}

        for calendar_config in config.calendars:
            try:
                if calendar_config.all_tentative:
                    tentative_fn = always_tentative
                else:
                    tentative_fn = looks_tentative

                calendar_data = await get_calendar_async(calendar_config)
                input_calendar = Calendar.from_ical(str(calendar_data))

                calendar_events = process_calendar_events(input_calendar, start_date, end_date)

                for period_key, event_component in calendar_events.items():
                    if not (period_key in events and tentative_fn(events[period_key])):
                        dtstart, dtend = period_key
                        events[period_key] = make_clean_event(domain, event_component)
                        busy_periods[period_key] = make_busy_period(event_component, dtstart, dtend, tentative_fn)

            except Exception as e:
                logging.error(f"Error processing calendar {calendar_config}: {str(e)}")
                logging.error(traceback.format_exc())

        # Generate the busy/free calendar
        organizer = icalendar.prop.vCalAddress(f"mailto:{config.email}")
        organizer.name = config.name

        busy_calendar = make_calendar()

        busy = FreeBusy(uid=f'{uuid4()}@{domain}')
        busy.add('organizer', organizer)
        busy.add('dtstamp', datetime.now(tz=utc))
        busy.add('dtstart', start_date.astimezone(tz=utc))
        busy.add('dtend', end_date.astimezone(tz=utc))

        for period in busy_periods.values():
            busy.add('freebusy', period)

        busy_calendar.add_component(busy)

        events_calendar = make_calendar()

        for event_component in events.values():
            events_calendar.add_component(event_component)

        # Update the global cache
        async with calendar_cache.condition:
            calendar_cache.last_updated = datetime.now(tz=utc)
            calendar_cache['busy_ical'] = busy_calendar.to_ical()
            calendar_cache['events_ical'] = events_calendar.to_ical()
            # Notify all waiting tasks that the update is complete
            calendar_cache.condition.notify_all()

        logging.info(f"Calendar cache updated with {len(events)} events")
    except Exception as e:
        logging.error(f"Error updating calendar cache: {str(e)}")
        logging.error(traceback.format_exc())
        # Notify waiters even on error
        async with calendar_cache.condition:
            calendar_cache.condition.notify_all()

def seconds_until_next_day():
    """
    Calculate the number of seconds until midnight
    """
    now = datetime.now(tz)
    tomorrow = datetime.combine(now.date() + timedelta(days=1), time.min, tzinfo=tz)
    return int((tomorrow - now).total_seconds())

TICK = 60
async def periodic_calendar_update():
    """
    Background task to periodically update the calendar cache
    """
    global next_update_seconds
    while True:
        await asyncio.sleep(TICK)
        next_update_seconds -= TICK
        if next_update_seconds <= 0:
            await update_calendar_cache()
            schedule_next_update()

def schedule_next_update():
    global next_update_seconds
    next_update_seconds = min(config.update_interval, seconds_until_next_day())

@app.before_serving
async def startup():
    """
    Startup tasks before the server begins serving requests
    """
    logging.basicConfig(level=logging.INFO)
    await update_calendar_cache()
    schedule_next_update()
    app.add_background_task(periodic_calendar_update)

@app.route(f'/{str.lower(config.name)}.ics', defaults = {'key': 'busy_ical'})
@app.route(f'/busy/{str.lower(config.name)}.ics', defaults = {'key': 'busy_ical'})
@app.route(f'/events/{str.lower(config.name)}.ics', defaults = {'key': 'events_ical'})
async def events(key: str):
    try:
        async with calendar_cache.condition:
            if calendar_cache[key]:
                return calendar_cache[key], {
                    "Content-Type": "text/calendar",
                    "Cache-Control": f"max-age={next_update_seconds}",
                    "Last-Modified": calendar_cache.last_updated.strftime("%a, %d %b %Y %H:%M:%S GMT")
                }
            else:
                return "Calendar data not yet available. Please try again shortly.", 503
    except Exception as e:
        logging.error(f"Error serving calendar: {str(e)}")
        logging.error(traceback.format_exc())
        return f"Error processing calendar: {str(e)}", 500

if __name__ == '__main__':
    app.run(debug=True)