mycal.py (view raw)
"""Serve an anonymised free/busy feed built from one or more iCalendar sources.

Configuration is read at import time from the TOML file named by the
``CONFIG_FILE`` environment variable (default ``config.toml``). The feed is
published at ``/<lower-cased config name>.ics`` as a single VFREEBUSY
component covering 30 days starting on the Monday of the current week.
"""

import traceback
import typing
import urllib.parse
import zoneinfo
from dataclasses import dataclass, field
from datetime import date, datetime, time, timedelta
from os import environ
from uuid import uuid4

import icalendar
import requests
from danoan.toml_dataclass import TomlDataClassIO
from flask import Flask
from icalendar.cal import Calendar, FreeBusy


@dataclass
class CalendarConfig(TomlDataClassIO):
    """One calendar source: either a local file or a URL."""

    file: typing.Optional[str] = None  # path to a local calendar file
    url: typing.Optional[str] = None  # remote calendar; webcal:// is fetched over https://
    all_tentative: bool = False  # report every event from this source as tentative


@dataclass
class Config(TomlDataClassIO):
    """Top-level application configuration."""

    name: str
    email: str
    timezone: str
    calendar: typing.Optional[CalendarConfig] = None  # Keep for backward compatibility
    calendars: typing.List[CalendarConfig] = field(default_factory=list)  # New field for multiple calendars


def load_config(file_path):
    """Read the TOML configuration at *file_path* and return a ``Config``."""
    # TOML documents are UTF-8 by specification; do not rely on the platform default.
    with open(file_path, "r", encoding="utf-8") as fr:
        return Config.read(fr)


app = Flask(__name__)
config = load_config(environ.get("CONFIG_FILE", "config.toml"))
tz = zoneinfo.ZoneInfo(config.timezone)
utc = zoneinfo.ZoneInfo("UTC")


def fetch_calendar(calendar_url, timeout=30):
    """Download a calendar and return its raw bytes.

    Args:
        calendar_url: http(s) URL of the calendar.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the request indefinitely.

    Raises:
        requests.RequestException: On connection failure, timeout, or an HTTP
            error status (so an error page is never parsed as a calendar).
    """
    response = requests.get(calendar_url, timeout=timeout)
    response.raise_for_status()
    return response.content


def read_calendar_file(calendar_file):
    """Return the raw bytes of the calendar file at *calendar_file*."""
    with open(calendar_file, 'rb') as f:
        return f.read()


def _config_value(calendar_config, key):
    """Return the setting *key* from a calendar config, or ``None`` if absent.

    Accepts both subscriptable configs (e.g. plain dicts) and objects that
    only expose the setting as an attribute.
    """
    try:
        return calendar_config[key]
    except (KeyError, TypeError):
        # KeyError: key missing from a mapping.
        # TypeError: object is not subscriptable; fall back to attributes.
        return getattr(calendar_config, key, None)


def get_calendar(calendar_config):
    """Return the raw calendar bytes for one configured source.

    A non-empty ``file`` setting takes precedence over ``url``. URLs using the
    ``webcal`` scheme are fetched over ``https`` instead.

    Raises:
        ValueError: If neither ``file`` nor ``url`` is set.
    """
    file_path = _config_value(calendar_config, 'file')
    if file_path:
        return read_calendar_file(file_path)

    url = _config_value(calendar_config, 'url')
    if url:
        parts = urllib.parse.urlsplit(url)
        if parts.scheme == "webcal":
            parts = parts._replace(scheme="https")
        return fetch_calendar(urllib.parse.urlunsplit(parts))

    raise ValueError("Calendar URL not configured.")


def fixup_date(dt):
    """Return *dt* as a timezone-aware ``datetime`` in the configured zone.

    Plain dates become midnight in the configured zone, naive datetimes are
    assumed to already be in the configured zone, and aware datetimes are
    converted to it.
    """
    # datetime is a subclass of date, so test for datetime rather than date.
    if not isinstance(dt, datetime):
        return datetime.combine(dt, time.min, tzinfo=tz)
    if dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    return dt.astimezone(tz)


def looks_tentative(component):
    """Return True if the event is marked or titled as tentative.

    An event counts as tentative when its STATUS is ``TENTATIVE``, its summary
    contains ``TBD``, or its summary ends with a question mark.
    """
    summary = str(component.get('summary', ''))
    return component.get('status') == 'TENTATIVE' or 'TBD' in summary or summary.endswith("?")


def _always_tentative(component):
    """Tentative predicate for sources configured with ``all_tentative``."""
    return True


def process_calendar_events(calendar, start_date, end_date, is_tentative_fn=looks_tentative):
    """
    Process events from a calendar and return a list of event periods.

    This function examines calendar components, filters them by date range,
    determines if they are tentative based on the provided function, and
    returns them as a list of vPeriod objects with FBTYPE property set.

    Only VEVENTs lying entirely inside ``[start_date, end_date]`` are kept.
    NOTE(review): events that straddle a window boundary are dropped rather
    than clipped, and recurrence rules are not expanded here - confirm that
    this is intended.

    Args:
        calendar: The input calendar object to process
        start_date: Start date for filtering events
        end_date: End date for filtering events
        is_tentative_fn: Optional function to determine if an event is tentative,
                         should accept a calendar component and return a boolean

    Returns:
        List of vPeriod objects with FBTYPE set to either BUSY_TENTATIVE or BUSY_UNAVAILABLE
    """
    events = []
    for component in calendar.walk():
        if component.name != "VEVENT":
            continue

        dtstart_prop = component.get('dtstart')
        if dtstart_prop is None:
            # A VEVENT without DTSTART cannot be placed on the timeline;
            # skip it instead of failing the whole feed.
            continue
        dtstart = fixup_date(dtstart_prop.dt)

        dtend_prop = component.get('dtend')
        duration_prop = component.get('duration')
        if dtend_prop is not None:
            dtend = fixup_date(dtend_prop.dt)
        elif duration_prop is not None:
            # An event may carry DURATION instead of DTEND.
            dtend = dtstart + duration_prop.dt
        else:
            dtend = dtstart

        if not (dtstart >= start_date and dtend <= end_date):
            continue

        vp = icalendar.prop.vPeriod([dtstart.astimezone(utc), dtend.astimezone(utc)])
        # Remove timezone information; tolerate it not having been set.
        vp.params.pop('TZID', None)
        if is_tentative_fn(component):
            vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_TENTATIVE
        else:
            vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_UNAVAILABLE
        events.append(vp)
    return events


@app.route(f'/{config.name.lower()}.ics')
def index():
    """Build and return the free/busy calendar for the configured sources.

    The window starts at midnight (configured timezone) on the Monday of the
    current week and lasts 30 days. Periods with identical start and end are
    merged across calendars, with a non-tentative entry taking precedence over
    a tentative one.

    Returns:
        The serialised calendar with a ``text/calendar`` content type, or an
        error message with status 500 if any source cannot be processed.
    """
    # Use the configured timezone, not the server's local date, to find "today".
    today = datetime.now(tz).date()
    start_of_week = today - timedelta(days=today.weekday())
    start_date = datetime.combine(start_of_week, time.min, tzinfo=tz)
    end_date = start_date + timedelta(days=30)

    organizer = icalendar.prop.vCalAddress(f"mailto:{config.email}")
    organizer.name = config.name
    # rpartition never raises, even if the address lacks an "@".
    domain = config.email.rpartition('@')[2]

    output = Calendar()
    output.add('prodid', '-//Calendar Anonymiser//alin.ovh//')
    output.add('version', '2.0')

    busy = FreeBusy(uid=f'{uuid4()}@{domain}')
    busy.add('organizer', organizer)
    busy.add('dtstamp', datetime.now(tz=utc))
    busy.add('dtstart', start_date.astimezone(tz=utc))
    busy.add('dtend', end_date.astimezone(tz=utc))

    tentative = icalendar.enums.FBTYPE.BUSY_TENTATIVE
    unavailable = icalendar.enums.FBTYPE.BUSY_UNAVAILABLE

    try:
        calendars_to_process = []
        busy_periods = {}

        # For backward compatibility: honour the single-calendar field only
        # when it actually names a source.
        legacy = config.calendar
        if legacy is not None and (_config_value(legacy, 'file') or _config_value(legacy, 'url')):
            calendars_to_process.append(legacy)

        # Add calendars from the new field if available
        if config.calendars:
            calendars_to_process.extend(config.calendars)

        for calendar_config in calendars_to_process:
            calendar_data = get_calendar(calendar_config)
            input_calendar = Calendar.from_ical(calendar_data)

            if _config_value(calendar_config, 'all_tentative'):
                tentative_fn = _always_tentative
            else:
                tentative_fn = looks_tentative

            events = process_calendar_events(
                input_calendar, start_date, end_date, is_tentative_fn=tentative_fn
            )

            # Keep one entry per (start, end); a non-tentative event replaces
            # a tentative one, otherwise the first entry seen wins.
            for event in events:
                period_key = (event.dt[0], event.dt[1])
                existing_event = busy_periods.get(period_key)
                if existing_event is None or (
                    existing_event.FBTYPE == tentative and event.FBTYPE == unavailable
                ):
                    busy_periods[period_key] = event

        for event in busy_periods.values():
            busy.add('freebusy', event)

        output.add_component(busy)
        return output.to_ical(), {"Content-Type": "text/calendar"}
    except Exception as e:
        # Top-level request boundary: report the failure rather than crash.
        print(f"error: {str(e)} {''.join(traceback.format_exception(e))}")
        return f"Error processing calendar: {str(e)}", 500


if __name__ == '__main__':
    app.run(debug=True)