refactor: split out importer code from searchix/web
1 file changed, 131 insertions(+), 88 deletions(-)
changed files
M internal/importer/main.go → internal/importer/main.go
@@ -3,11 +3,14 @@ import ( "context" "fmt" + "maps" + "math" "os/exec" "slices" "strings" "time" + "github.com/getsentry/sentry-go" "go.alanpearce.eu/searchix/internal/config" "go.alanpearce.eu/searchix/internal/fetcher" "go.alanpearce.eu/searchix/internal/index"@@ -18,10 +21,9 @@ "gitlab.com/tozd/go/errors" ) type Options struct { - Update bool - Replace bool - LowMemory bool - Logger *log.Logger + LowMemory bool + Logger *log.Logger + WriteIndex *index.WriteIndex } var Job struct {@@ -54,7 +56,7 @@ func createSourceImporter( parent context.Context, log *log.Logger, meta *index.Meta, - indexer *index.WriteIndex, + writeIndex *index.WriteIndex, forceUpdate bool, ) func(*config.Source) errors.E { return func(source *config.Source) errors.E {@@ -157,7 +159,7 @@ if err != nil { return errors.WithMessagef(err, "failed to create processor") } - hadWarnings, err := process(ctx, indexer, processor, logger) + hadWarnings, err := process(ctx, writeIndex, processor, logger) if err != nil { return errors.WithMessagef(err, "failed to process source") }@@ -182,18 +184,6 @@ log *log.Logger indexer *index.WriteIndex } -func New( - cfg *config.Config, - log *log.Logger, - indexer *index.WriteIndex, -) *Importer { - return &Importer{ - config: cfg, - log: log, - indexer: indexer, - } -} - func (imp *Importer) Start( ctx context.Context, forceUpdate bool,@@ -233,90 +223,143 @@ err := imp.indexer.SaveMeta() if err != nil { return errors.Wrap(err, "failed to save metadata") } + + SetLastUpdated(time.Now()) return nil } -func SetupIndex(ctx context.Context, cfg *config.Config, options *Options) errors.E { - var i uint - cfgEnabledSources := make([]string, len(cfg.Importer.Sources)) - for key := range cfg.Importer.Sources { - cfgEnabledSources[i] = key - i++ - } +func New( + cfg *config.Config, + options *Options, +) (*Importer, errors.E) { + + return &Importer{ + config: cfg, + log: options.Logger, + indexer: options.WriteIndex, + }, nil +} + +func (imp *Importer) EnsureSourcesIndexed( + ctx context.Context, + read *index.ReadIndex, + write *index.WriteIndex, +) errors.E { + cfgEnabledSources := slices.Collect(maps.Keys(imp.config.Importer.Sources)) slices.Sort(cfgEnabledSources) - read, write, exists, err := index.OpenOrCreate( - cfg.DataPath, - options.Replace, - &index.Options{ - LowMemory: options.LowMemory, - Logger: options.Logger.Named("index"), - }, - ) + indexedSources, err := read.GetEnabledSources() if err != nil { - return errors.Wrap(err, "Failed to open or create index") + return errors.Wrap(err, "Failed to get enabled sources from index") } - - if !exists || options.Replace || options.Update { - options.Logger.Info( - "Starting build job", - "new", - !exists, - "replace", - options.Replace, - "update", - options.Update, - ) - imp := New(cfg, options.Logger.Named("importer"), write) - err = imp.Start( - ctx, - options.Replace || options.Update, - nil, - ) - if err != nil { - return errors.Wrap(err, "Failed to build index") + slices.Sort(indexedSources) + if !slices.Equal(cfgEnabledSources, indexedSources) { + newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { + return slices.Contains(indexedSources, s) + }) + retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { + return slices.Contains(cfgEnabledSources, s) + }) + if len(newSources) > 0 { + imp.log.Info("adding new sources", "sources", newSources) + err := imp.Start( + ctx, + false, + &newSources, + ) + if err != nil { + return errors.Wrap(err, "Failed to update index with new sources") + } } - if options.Replace || options.Update { - return nil + if len(retiredSources) > 0 { + imp.log.Info("removing retired sources", "sources", retiredSources) + for _, s := range retiredSources { + err := write.DeleteBySource(s) + if err != nil { + return errors.Wrapf(err, "Failed to remove retired source %s", s) + } + } } - } else { - indexedSources, err := read.GetEnabledSources() - if err != nil { - return errors.Wrap(err, "Failed to get enabled sources from index") + } + + return nil +} + +func (imp *Importer) StartUpdateTimer( + ctx context.Context, + localHub *sentry.Hub, +) { + const monitorSlug = "import" + localHub.WithScope(func(scope *sentry.Scope) { + var err errors.E + scope.SetContext("monitor", sentry.Context{"slug": monitorSlug}) + monitorConfig := &sentry.MonitorConfig{ + Schedule: sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay), + MaxRuntime: int64(math.Ceil(imp.config.Importer.Timeout.Minutes())), + CheckInMargin: 5, + Timezone: time.Local.String(), } - slices.Sort(indexedSources) - if !slices.Equal(cfgEnabledSources, indexedSources) { - newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { - return slices.Contains(indexedSources, s) - }) - retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { - return slices.Contains(cfgEnabledSources, s) - }) - if len(newSources) > 0 { - options.Logger.Info("adding new sources", "sources", newSources) - imp := New(cfg, options.Logger.Named("importer"), write) - err := imp.Start( - ctx, - false, - &newSources, - ) - if err != nil { - return errors.Wrap(err, "Failed to update index with new sources") - } + + nextRun := nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) + SetNextRun(nextRun) + for { + imp.log.Debug("scheduling next run", "next-run", nextRun) + select { + case <-ctx.Done(): + imp.log.Debug("stopping scheduler") + + return + case <-time.After(time.Until(nextRun)): } - if len(retiredSources) > 0 { - options.Logger.Info("removing retired sources", "sources", retiredSources) - for _, s := range retiredSources { - err := write.DeleteBySource(s) - if err != nil { - return errors.Wrapf(err, "Failed to remove retired source %s", s) - } - } + imp.log.Info("updating index") + + eventID := localHub.CaptureCheckIn(&sentry.CheckIn{ + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusInProgress, + }, monitorConfig) + MarkIndexingStarted() + + err = imp.Start(ctx, false, nil) + if err != nil { + imp.log.Warn("error updating index", "error", err) + + localHub.CaptureException(err) + localHub.CaptureCheckIn(&sentry.CheckIn{ + ID: *eventID, + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusError, + }, monitorConfig) + } else { + imp.log.Info("update complete") + + localHub.CaptureCheckIn(&sentry.CheckIn{ + ID: *eventID, + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusOK, + }, monitorConfig) } + nextRun = nextRun.AddDate(0, 0, 1) + MarkIndexingFinished(nextRun) } + }) +} + +func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time { + now := time.Now() + nextRun := time.Date( + now.Year(), + now.Month(), + now.Day(), + dayTime.Hour, + dayTime.Minute, + dayTime.Second, + 0, + time.UTC, + ) + if nextRun.Before(now) { + return nextRun.AddDate(0, 0, 1) } - SetLastUpdated(read.LastUpdated()) - return nil + return nextRun }