all repos — mycal @ 10527ff359e804c9706b0c16f6176ad1f1a9e858

private calendar anonymiser

mycal.py (view raw)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
from uuid import uuid4
from icalendar.cal import Calendar, FreeBusy
import icalendar

import urllib.parse
import requests
from flask import Flask
from datetime import date, datetime, timedelta, time
import zoneinfo
from dataclasses import dataclass, field
from danoan.toml_dataclass import TomlDataClassIO
from os import environ
import typing
import traceback

@dataclass
class CalendarConfig(TomlDataClassIO):
    file: typing.Optional[str] = None
    url: typing.Optional[str] = None
    all_tentative: bool = False

@dataclass
class Config(TomlDataClassIO):
    name: str
    email: str
    timezone: str
    calendar: typing.Optional[CalendarConfig] = None  # Keep for backward compatibility
    calendars: typing.List[CalendarConfig] = field(default_factory=list)  # New field for multiple calendars

def load_config(file_path):
    with open(file_path, "r") as fr:
        return Config.read(fr)

app = Flask(__name__)

config = load_config(environ.get("CONFIG_FILE", "config.toml"))
tz = zoneinfo.ZoneInfo(config.timezone)
utc = zoneinfo.ZoneInfo("UTC")

def fetch_calendar(calendar_url):
    return requests.get(calendar_url).content

def read_calendar_file(calendar_file):
    with open(calendar_file, 'rb') as f:
        return f.read()

def get_calendar(calendar_config):
    if 'file' in calendar_config and calendar_config['file']:
        return read_calendar_file(calendar_config['file'])
    else:
        if 'url' in calendar_config and calendar_config['url']:
            u = urllib.parse.urlsplit(calendar_config['url'])
            if u.scheme == "webcal":
                u = u._replace(scheme="https")
            return fetch_calendar(urllib.parse.urlunsplit(u))
        raise ValueError("Calendar URL not configured.")

def fixup_date(dt):
    if type(dt) == date:
        return datetime.combine(dt, time.min, tzinfo=tz)
    elif dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    else:
        return dt.astimezone(tz)

def looks_tentative(component):
    summary = str(component.get('summary', ''))
    return component.get('status') == 'TENTATIVE' or 'TBD' in summary or summary.endswith("?")

def process_calendar_events(calendar, start_date, end_date, is_tentative_fn=looks_tentative):
    """
    Process events from a calendar and return a list of event periods.

    This function examines calendar components, filters them by date range,
    determines if they are tentative based on the provided function,
    and returns them as a list of vPeriod objects with FBTYPE property set.

    Args:
        calendar: The input calendar object to process
        start_date: Start date for filtering events
        end_date: End date for filtering events
        is_tentative_fn: Optional function to determine if an event is tentative,
                        should accept a calendar component and return a boolean

    Returns:
        List of vPeriod objects with FBTYPE set to either
        BUSY_TENTATIVE or BUSY_UNAVAILABLE
    """
    events = []
    for component in calendar.walk():
        if component.name == "VEVENT":
            dtstart = fixup_date(component.get('dtstart').dt)
            dtend = fixup_date(component.get('dtend', component.get('dtstart')).dt)
            if dtstart >= start_date and dtend <= end_date:
                vp = icalendar.prop.vPeriod([t.astimezone(utc) for t in [dtstart, dtend]])
                vp.params.pop('TZID')  # remove timezone information

                if is_tentative_fn(component):
                    vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_TENTATIVE
                else:
                    vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_UNAVAILABLE

                events.append(vp)

    return events

@app.route(f'/{str.lower(config.name)}.ics')
def index():
    today = date.today()
    start_of_week = today - timedelta(days=today.weekday())
    start_date = datetime.combine(start_of_week, time.min, tzinfo=tz)
    end_date = start_date + timedelta(days=30)
    organizer = icalendar.prop.vCalAddress(f"mailto:{config.email}")
    organizer.name = config.name
    domain = config.email.split('@')[1]

    output = Calendar()
    output.add('prodid', '-//Calendar Anonymiser//alin.ovh//')
    output.add('version', '2.0')

    busy = FreeBusy(uid=f'{uuid4()}@{domain}')
    busy.add('organizer', organizer)
    busy.add('dtstamp', datetime.now(tz=utc))
    busy.add('dtstart', start_date.astimezone(tz=utc))
    busy.add('dtend', end_date.astimezone(tz=utc))

    try:
        calendars_to_process = []
        busy_periods = {}

        # For backward compatibility
        if config.calendar is not None and (config.calendar['file'] != "" or config.calendar['url'] != ""):
            calendars_to_process.append(config.calendar)

        # Add calendars from the new field if available
        if config.calendars:
            calendars_to_process.extend(config.calendars)

        for calendar_config in calendars_to_process:
            calendar_data = get_calendar(calendar_config)
            input_calendar = Calendar.from_ical(calendar_data)

            if 'all_tentative' in calendar_config and calendar_config['all_tentative']:
                tentative_fn = lambda component: True
            else:
                tentative_fn = looks_tentative

            events = process_calendar_events(input_calendar, start_date, end_date, is_tentative_fn=tentative_fn)

            # Process events considering tentative status
            for event in events:
                period_key = (event.dt[0], event.dt[1])

                if period_key in busy_periods:
                    existing_event = busy_periods[period_key]

                    # If the existing event is tentative and the new one is not,
                    # replace the existing event with the non-tentative one
                    if (existing_event.FBTYPE == icalendar.enums.FBTYPE.BUSY_TENTATIVE and
                        event.FBTYPE == icalendar.enums.FBTYPE.BUSY_UNAVAILABLE):
                        busy_periods[period_key] = event
                else:
                    busy_periods[period_key] = event

        for event in busy_periods.values():
            busy.add('freebusy', event)

        output.add_component(busy)
        return output.to_ical(), {"Content-Type": "text/calendar"}
    except Exception as e:
        print(f"error: {str(e)} {''.join(traceback.format_exception(e))}")
        return f"Error processing calendar: {str(e)}", 500

if __name__ == '__main__':
    app.run(debug=True)