refactor: move SetupIndex and indexing progress to importer
1 file changed, 117 insertions(+), 0 deletions(-)
changed files
M internal/importer/main.go → internal/importer/main.go
@@ -17,6 +17,39 @@ "gitlab.com/tozd/go/errors" ) +type Options struct { + Update bool + Replace bool + LowMemory bool + Logger *log.Logger +} + +var Job struct { + InProgress bool + StartedAt time.Time + FinishedAt time.Time + NextRun time.Time +} + +func SetNextRun(nextRun time.Time) { + Job.NextRun = nextRun +} + +func SetLastUpdated(last time.Time) { + Job.FinishedAt = last +} + +func MarkIndexingStarted() { + Job.StartedAt = time.Now() + Job.InProgress = true +} + +func MarkIndexingFinished(nextRun time.Time) { + Job.FinishedAt = time.Now() + Job.InProgress = false + Job.NextRun = nextRun +} + func createSourceImporter( parent context.Context, log *log.Logger,@@ -203,3 +236,87 @@ } return nil } + +func SetupIndex(ctx context.Context, cfg *config.Config, options *Options) errors.E { + var i uint + cfgEnabledSources := make([]string, len(cfg.Importer.Sources)) + for key := range cfg.Importer.Sources { + cfgEnabledSources[i] = key + i++ + } + slices.Sort(cfgEnabledSources) + + read, write, exists, err := index.OpenOrCreate( + cfg.DataPath, + options.Replace, + &index.Options{ + LowMemory: options.LowMemory, + Logger: options.Logger.Named("index"), + }, + ) + if err != nil { + return errors.Wrap(err, "Failed to open or create index") + } + + if !exists || options.Replace || options.Update { + options.Logger.Info( + "Starting build job", + "new", + !exists, + "replace", + options.Replace, + "update", + options.Update, + ) + imp := New(cfg, options.Logger.Named("importer"), write) + err = imp.Start( + ctx, + options.Replace || options.Update, + nil, + ) + if err != nil { + return errors.Wrap(err, "Failed to build index") + } + if options.Replace || options.Update { + return nil + } + } else { + indexedSources, err := read.GetEnabledSources() + if err != nil { + return errors.Wrap(err, "Failed to get enabled sources from index") + } + slices.Sort(indexedSources) + if !slices.Equal(cfgEnabledSources, indexedSources) { + newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { + return slices.Contains(indexedSources, s) + }) + retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { + return slices.Contains(cfgEnabledSources, s) + }) + if len(newSources) > 0 { + options.Logger.Info("adding new sources", "sources", newSources) + imp := New(cfg, options.Logger.Named("importer"), write) + err := imp.Start( + ctx, + false, + &newSources, + ) + if err != nil { + return errors.Wrap(err, "Failed to update index with new sources") + } + } + if len(retiredSources) > 0 { + options.Logger.Info("removing retired sources", "sources", retiredSources) + for _, s := range retiredSources { + err := write.DeleteBySource(s) + if err != nil { + return errors.Wrapf(err, "Failed to remove retired source %s", s) + } + } + } + } + } + SetLastUpdated(read.LastUpdated()) + + return nil +}