all repos — mycal @ 3b2417dc2d64bc4b5c2f1fadf39b43bd832b0da6

private calendar anonymiser

mycal.py (view raw)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
from uuid import uuid4
from icalendar.cal import Calendar, FreeBusy
import icalendar

import urllib.parse
import aiohttp
import asyncio
from quart import Quart
from datetime import date, datetime, timedelta, time
import zoneinfo
from dataclasses import dataclass, field
from danoan.toml_dataclass import TomlDataClassIO
from os import environ
import typing
import traceback
import logging
from asyncio import Condition

@dataclass
class CalendarConfig(TomlDataClassIO):
    file: typing.Optional[str] = None
    url: typing.Optional[str] = None
    all_tentative: bool = False

@dataclass
class Config(TomlDataClassIO):
    name: str
    email: str
    timezone: str
    update_interval: int = 3600
    calendars: typing.List[CalendarConfig] = field(default_factory=list)

def load_config(file_path):
    with open(file_path, "r") as fr:
        return Config.read(fr)

app = Quart(__name__)

config = load_config(environ.get("CONFIG_FILE", "config.toml"))
tz = zoneinfo.ZoneInfo(config.timezone)
utc = zoneinfo.ZoneInfo("UTC")

calendar_cache = {
    'last_updated': None,
    'busy_ical': "",
    'events_ical': ""
}

next_update_seconds = 0
cache_condition = Condition()

async def fetch_calendar_async(calendar_url):
    async with aiohttp.ClientSession() as session:
        async with session.get(calendar_url) as response:
            return await response.read()

def read_calendar_file(calendar_file):
    with open(calendar_file, 'rb') as f:
        return f.read()

async def get_calendar_async(calendar_config):
    if 'file' in calendar_config and calendar_config['file']:
        return read_calendar_file(calendar_config['file'])
    elif 'url' in calendar_config and calendar_config['url']:
        u = urllib.parse.urlsplit(calendar_config['url'])
        if u.scheme == "webcal":
            u = u._replace(scheme="https")
        return await fetch_calendar_async(urllib.parse.urlunsplit(u))
    else:
        raise ValueError("Calendar URL or file not configured.")

def fixup_date(dt):
    if type(dt) == date:
        return datetime.combine(dt, time.min, tzinfo=tz)
    elif dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    else:
        return dt.astimezone(tz)

def looks_tentative(component):
    summary = str(component.get('summary', ''))
    return component.get('status') == 'TENTATIVE' or 'TBD' in summary or summary.endswith("?")

def make_clean_event(domain, component):
    new_event = icalendar.cal.Event()

    new_event.add('SUMMARY', f"{config.name} Busy")
    new_event.add('DTSTART', component['DTSTART'])
    new_event.add('DTEND', component['DTEND'])

    # Ensure each event has a unique UID
    new_event.add('UID', f'{uuid4()}@{domain}')
    new_event.add('DTSTAMP', datetime.now(tz=utc))

    return new_event

def make_busy_period(component, dtstart, dtend, is_tentative_fn):
    vp = icalendar.prop.vPeriod([t.astimezone(utc) for t in [dtstart, dtend]])
    vp.params.pop('TZID')  # remove timezone information

    if is_tentative_fn(component):
        vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_TENTATIVE
    else:
        vp.FBTYPE = icalendar.enums.FBTYPE.BUSY_UNAVAILABLE

    return vp

def process_calendar_events(calendar, start_date, end_date):
    """
    Process events from a calendar and return both event components and busy periods.

    This function examines calendar components, filters them by date range,
    determines if they are tentative based on the provided function,

    Args:
        calendar: The input calendar object to process
        start_date: Start date for filtering events
        end_date: End date for filtering events

    Returns:
        Dictionary of vPeriod objects with FBTYPE set to BUSY_TENTATIVE or BUSY_UNAVAILABLE
    """
    events = {}

    for component in calendar.walk():
        if component.name == "VEVENT":
            dtstart = fixup_date(component.get('dtstart').dt)
            dtend = fixup_date(component.get('dtend', component.get('dtstart')).dt)
            if dtstart >= start_date and dtend <= end_date:
                period_key = (dtstart, dtend)
                events[period_key] = component

    return events

def make_calendar():
    c = Calendar()
    c.add('prodid', '-//Calendar Anonymiser//alin.ovh//')
    c.add('version', '2.0')

    return c

async def update_calendar_cache():
    """
    Update the global calendar cache with fresh data
    """
    try:
        # Prepare data outside the lock

        domain = config.email.split('@')[1]

        today = date.today()
        start_of_week = today - timedelta(days=today.weekday())
        start_date = datetime.combine(start_of_week, time.min, tzinfo=tz)
        end_date = start_date + timedelta(days=30)

        busy_periods = {}
        events = {}

        for calendar_config in config.calendars:
            try:
                if 'all_tentative' in calendar_config and calendar_config['all_tentative']:
                    tentative_fn = lambda component: True
                else:
                    tentative_fn = looks_tentative

                calendar_data = await get_calendar_async(calendar_config)
                input_calendar = Calendar.from_ical(calendar_data)

                calendar_events = process_calendar_events(input_calendar, start_date, end_date)

                for period_key, event_component in calendar_events.items():
                    if not (period_key in events and tentative_fn(events[period_key])):
                        dtstart, dtend = period_key
                        events[period_key] = make_clean_event(domain, event_component)
                        busy_periods[period_key] = make_busy_period(event_component, dtstart, dtend, tentative_fn)

            except Exception as e:
                logging.error(f"Error processing calendar {calendar_config}: {str(e)}")
                logging.error(traceback.format_exc())

        # Generate the busy/free calendar
        organizer = icalendar.prop.vCalAddress(f"mailto:{config.email}")
        organizer.name = config.name

        busy_calendar = make_calendar()

        busy = FreeBusy(uid=f'{uuid4()}@{domain}')
        busy.add('organizer', organizer)
        busy.add('dtstamp', datetime.now(tz=utc))
        busy.add('dtstart', start_date.astimezone(tz=utc))
        busy.add('dtend', end_date.astimezone(tz=utc))

        for period in busy_periods.values():
            busy.add('freebusy', period)

        busy_calendar.add_component(busy)

        events_calendar = make_calendar()

        for event_component in events.values():
            events_calendar.add_component(event_component)

        # Update the global cache
        async with cache_condition:
            calendar_cache['last_updated'] = datetime.now(tz=utc)
            calendar_cache['busy_ical'] = busy_calendar.to_ical()
            calendar_cache['events_ical'] = events_calendar.to_ical()
            # Notify all waiting tasks that the update is complete
            cache_condition.notify_all()

        logging.info(f"Calendar cache updated with {len(events)} events")
    except Exception as e:
        logging.error(f"Error updating calendar cache: {str(e)}")
        logging.error(traceback.format_exc())
        # Notify waiters even on error
        async with cache_condition:
            cache_condition.notify_all()

def seconds_until_next_day():
    """
    Calculate the number of seconds until midnight
    """
    now = datetime.now(tz)
    tomorrow = datetime.combine(now.date() + timedelta(days=1), time.min, tzinfo=tz)
    return int((tomorrow - now).total_seconds())

async def periodic_calendar_update():
    """
    Background task to periodically update the calendar cache
    """
    global next_update_seconds
    while True:
        await asyncio.sleep(1)
        next_update_seconds -= 1
        if next_update_seconds <= 0:
            await update_calendar_cache()
            schedule_next_update()

def schedule_next_update():
    global next_update_seconds
    next_update_seconds = min(config.update_interval, seconds_until_next_day())

@app.before_serving
async def startup():
    """
    Startup tasks before the server begins serving requests
    """
    logging.basicConfig(level=logging.INFO)
    await update_calendar_cache()
    schedule_next_update()
    app.add_background_task(periodic_calendar_update)

@app.route(f'/{str.lower(config.name)}.ics', defaults = {'key': 'busy_ical'})
@app.route(f'/busy/{str.lower(config.name)}.ics', defaults = {'key': 'busy_ical'})
@app.route(f'/events/{str.lower(config.name)}.ics', defaults = {'key': 'events_ical'})
async def events(key):
    try:
        async with cache_condition:
            if calendar_cache[key]:
                return calendar_cache[key], {
                    "Content-Type": "text/calendar",
                    "Cache-Control": f"max-age={next_update_seconds}",
                    "Last-Modified": calendar_cache['last_updated'].strftime("%a, %d %b %Y %H:%M:%S GMT")
                }
            else:
                return "Calendar data not yet available. Please try again shortly.", 503
    except Exception as e:
        logging.error(f"Error serving calendar: {str(e)}")
        logging.error(traceback.format_exc())
        return f"Error processing calendar: {str(e)}", 500

if __name__ == '__main__':
    app.run(debug=True)