# mycal.py
"""Serve an anonymised free/busy feed built from one or more calendars.

The Flask app exposes a single ``/<name>.ics`` route.  On every request it
loads each configured calendar (local file or URL), and publishes the events
that fall in a 30-day window as VFREEBUSY periods with no event details.

NOTE: recurring events (RRULE) are not expanded here; only the VEVENT
components that literally appear in the source calendars are considered.
"""
from collections.abc import Mapping
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta, time
from os import environ
import traceback
import typing
import urllib.parse
from uuid import uuid4
import zoneinfo

from danoan.toml_dataclass import TomlDataClassIO
from flask import Flask
import icalendar
from icalendar.cal import Calendar, FreeBusy
import requests


@dataclass
class CalendarConfig(TomlDataClassIO):
    """One calendar source: a local file or a remote URL."""

    # Path to a local calendar file; checked before `url`.
    file: typing.Optional[str] = None
    # Address of a remote calendar; a `webcal` scheme is fetched over https.
    url: typing.Optional[str] = None
    # When true, every event of this calendar is published as tentative.
    all_tentative: bool = False


@dataclass
class Config(TomlDataClassIO):
    """Top-level application settings read from the TOML config file."""

    # Used for the organizer name and, lower-cased, for the route path.
    name: str
    # Organizer address; the part after '@' is used in the free/busy UID.
    email: str
    # Zone name handed to zoneinfo.ZoneInfo.
    timezone: str
    # Keep for backward compatibility
    calendar: typing.Optional[CalendarConfig] = None
    # New field for multiple calendars
    calendars: typing.List[CalendarConfig] = field(default_factory=list)


def load_config(file_path):
    """Read the TOML file at `file_path` and return it as a `Config`."""
    # TOML documents are UTF-8 by specification; do not rely on the locale.
    with open(file_path, "r", encoding="utf-8") as fr:
        return Config.read(fr)


# NOTE: the configuration is loaded at import time because the route path
# below is derived from `config.name`.
app = Flask(__name__)
config = load_config(environ.get("CONFIG_FILE", "config.toml"))
tz = zoneinfo.ZoneInfo(config.timezone)
utc = zoneinfo.ZoneInfo("UTC")


def _cfg_get(cfg, key, default=None):
    """Read `key` from a calendar config given as a mapping or as an object.

    Calendar entries may reach this module either as plain tables (mappings)
    or as `CalendarConfig` instances; both shapes are accepted so callers do
    not need to care which one the config loader produced.
    """
    if isinstance(cfg, Mapping):
        return cfg.get(key, default)
    return getattr(cfg, key, default)


def fetch_calendar(calendar_url, timeout=30):
    """Download a calendar and return the raw response body as bytes.

    Args:
        calendar_url: Address to fetch.
        timeout: Seconds to wait for the server before giving up, so a stalled
            upstream cannot hang the request forever.

    Raises:
        requests.RequestException: On connection problems, timeouts, or an
            HTTP error status (an error page must not be parsed as a calendar).
    """
    response = requests.get(calendar_url, timeout=timeout)
    response.raise_for_status()
    return response.content


def read_calendar_file(calendar_file):
    """Return the raw bytes of the calendar file at `calendar_file`."""
    with open(calendar_file, 'rb') as f:
        return f.read()


def get_calendar(calendar_config):
    """Return the raw calendar data for one configured calendar.

    A non-empty `file` entry wins over `url`.  A `webcal` URL scheme is
    rewritten to `https` before fetching.

    Raises:
        ValueError: If neither `file` nor `url` is set.
    """
    file_path = _cfg_get(calendar_config, 'file')
    if file_path:
        return read_calendar_file(file_path)

    url = _cfg_get(calendar_config, 'url')
    if url:
        parts = urllib.parse.urlsplit(url)
        if parts.scheme == "webcal":
            parts = parts._replace(scheme="https")
        return fetch_calendar(urllib.parse.urlunsplit(parts))

    raise ValueError("Calendar URL not configured.")


def fixup_date(dt):
    """Convert a date or datetime into an aware datetime in the configured zone.

    Plain dates become midnight of that day, naive datetimes are taken to be
    in the configured zone already, and aware datetimes are converted to it.
    """
    # datetime is a subclass of date, so test for "not a datetime" rather
    # than "is a date".
    if not isinstance(dt, datetime):
        return datetime.combine(dt, time.min, tzinfo=tz)
    if dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    return dt.astimezone(tz)


def looks_tentative(component):
    """Return True if the event is marked or titled as tentative.

    An event counts as tentative when its status is TENTATIVE, its summary
    contains "TBD", or its summary ends with a question mark.
    """
    summary = str(component.get('summary', ''))
    return component.get('status') == 'TENTATIVE' or 'TBD' in summary or summary.endswith("?")


def custom_tentative_detector(component):
    """
    A custom function to determine if an event is tentative.
    This can be passed to process_calendar_events to override the default logic.
    """
    summary = str(component.get('summary', ''))
    # Add your custom logic here
    return (component.get('status') == 'TENTATIVE' or
            'TBD' in summary or
            summary.endswith("?"))


def _always_tentative(component):
    """Tentative detector for calendars configured with `all_tentative`."""
    return True


def _event_bounds(component):
    """Return `(start, end)` of a VEVENT in the configured zone, or None.

    None is returned when the event has no DTSTART.  The end is taken from
    DTEND when present, otherwise from DTSTART + DURATION; an all-day event
    with neither lasts one day, and a timed event with neither is zero-length.
    """
    start_prop = component.get('dtstart')
    if start_prop is None:
        return None

    raw_start = start_prop.dt
    start = fixup_date(raw_start)

    end_prop = component.get('dtend')
    duration_prop = component.get('duration')
    if end_prop is not None:
        end = fixup_date(end_prop.dt)
    elif duration_prop is not None:
        end = start + duration_prop.dt
    elif not isinstance(raw_start, datetime):
        end = start + timedelta(days=1)
    else:
        end = start
    return start, end


def process_calendar_events(calendar, busy, start_date, end_date, is_tentative_fn=None):
    """
    Process events from a calendar and add them to the FreeBusy component.

    Events that only partly overlap the window are clipped to it rather than
    dropped, so time that is busy inside the window is always reported.

    Args:
        calendar: The input calendar to process
        busy: The FreeBusy component to add events to
        start_date: Start date for filtering events
        end_date: End date for filtering events
        is_tentative_fn: Optional function to determine if an event is tentative
    """
    if is_tentative_fn is None:
        is_tentative_fn = looks_tentative

    for component in calendar.walk():
        if component.name != "VEVENT":
            continue

        bounds = _event_bounds(component)
        if bounds is None:
            continue
        dtstart, dtend = bounds

        period_start = max(dtstart, start_date)
        period_end = min(dtend, end_date)
        if period_start > period_end:
            # Entirely outside the window (or end precedes start).
            continue
        if period_start == period_end and dtstart != dtend:
            # Only touches a window boundary; nothing inside is busy.
            continue

        vp = icalendar.prop.vPeriod(
            [period_start.astimezone(utc), period_end.astimezone(utc)]
        )
        vp.params.pop('TZID')  # remove timezone information

        if is_tentative_fn(component):
            vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_TENTATIVE
        else:
            vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_UNAVAILABLE

        busy.add('freebusy', vp)


@app.route(f'/{str.lower(config.name)}.ics')
def index():
    """Build and return the free/busy calendar for the next 30 days.

    The window starts at midnight on the Monday of the current week, in the
    configured timezone.  On success the serialised calendar is returned with
    a `text/calendar` content type; on any processing error the details are
    printed server-side and a generic 500 response is returned.
    """
    # Take "today" in the configured zone, not the server's local zone.
    today = datetime.now(tz).date()
    start_of_week = today - timedelta(days=today.weekday())
    start_date = datetime.combine(start_of_week, time.min, tzinfo=tz)
    end_date = start_date + timedelta(days=30)

    organizer = icalendar.prop.vCalAddress(f"mailto:{config.email}")
    organizer.name = config.name

    domain = config.email.split('@')[1]

    output = Calendar()
    output.add('prodid', '-//Calendar Anonymiser//alin.ovh//')
    output.add('version', '2.0')

    busy = FreeBusy(uid=f'{uuid4()}@{domain}')
    busy.add('organizer', organizer)
    busy.add('dtstamp', datetime.now(tz=utc))
    busy.add('dtstart', start_date.astimezone(tz=utc))
    busy.add('dtend', end_date.astimezone(tz=utc))

    try:
        # Process calendars from the config
        calendars_to_process = []

        # For backward compatibility: only queue the legacy entry when it
        # actually names a source, so an empty table does not fail the request.
        legacy = config.calendar
        if legacy is not None and (_cfg_get(legacy, 'file') or _cfg_get(legacy, 'url')):
            calendars_to_process.append(legacy)

        # Add calendars from the new field if available
        if config.calendars:
            calendars_to_process.extend(config.calendars)

        for calendar_config in calendars_to_process:
            calendar_data = get_calendar(calendar_config)
            input_calendar = Calendar.from_ical(calendar_data)

            if _cfg_get(calendar_config, 'all_tentative', False):
                tentative_fn = _always_tentative
            else:
                tentative_fn = looks_tentative

            process_calendar_events(input_calendar, busy, start_date, end_date,
                                    is_tentative_fn=tentative_fn)

        output.add_component(busy)
        return output.to_ical(), {"Content-Type": "text/calendar"}
    except Exception as e:
        print(f"error: {str(e)} {''.join(traceback.format_exception(e))}")
        # Do not echo exception text to the client: it can contain the
        # upstream calendar URL or local file paths.
        return "Error processing calendar", 500


if __name__ == '__main__':
    # WARNING: debug mode enables the interactive debugger; use this entry
    # point for local development only.
    app.run(debug=True)