refactor: re-arrange importer package
4 files changed, 277 insertions(+), 268 deletions(-)
M internal/importer/importer.go → internal/importer/importer.go
@@ -2,9 +2,6 @@ package importer import ( "context" - "sync" - - "alin.ovh/x/log" "alin.ovh/searchix/internal/config" "alin.ovh/searchix/internal/nix"@@ -19,53 +16,6 @@ type ( ImportSource func(context.Context) (<-chan nix.Importable, <-chan error) ImportDestination func(context.Context, <-chan nix.Importable) <-chan error ) - -func pipe( - ctx context.Context, - log *log.Logger, - src ImportSource, - dst ImportDestination, -) (bool, error) { - wg := sync.WaitGroup{} - - wg.Add(1) - objects, srcErrs := src(ctx) - - wg.Add(1) - dstErrors := dst(ctx, objects) - - var hadObjectErrors bool - var criticalError error - go func() { - for { - select { - case err, running := <-srcErrs: - if !running { - wg.Done() - srcErrs = nil - log.Debug("processing completed") - - continue - } - hadObjectErrors = true - log.Warn("error processing object from source", "error", err) - case err, running := <-dstErrors: - if !running { - wg.Done() - dstErrors = nil - - continue - } - hadObjectErrors = true - log.Warn("error writing object to target", "error", err) - } - } - }() - - wg.Wait() - - return hadObjectErrors, criticalError -} func (imp *Importer) indexSource( ctx context.Context,
A internal/importer/job.go
@@ -0,0 +1,115 @@ +package importer + +import ( + "context" + "time" + + "alin.ovh/searchix/internal/config" + "alin.ovh/searchix/internal/index" +) + +var Job struct { + StartedAt time.Time + LastRun struct { + StartedAt time.Time + FinishedAt time.Time + } + NextRun time.Time +} + +func MarkImportStarted() { + Job.StartedAt = time.Now() +} + +func MarkImportFinished() { + Job.LastRun.StartedAt = Job.StartedAt + Job.LastRun.FinishedAt = time.Now() + Job.StartedAt = time.Time{} +} + +func MarkLastFetch(meta *index.Meta) { + meta.LastImport = Job.LastRun +} + +func SetNextRun(nextRun time.Time) { + Job.NextRun = nextRun +} + +func (imp *Importer) StartUpdateTimer( + parentCtx context.Context, +) { + var nextRun time.Time + switch { + case Job.LastRun.FinishedAt.Before(time.Now().Add(-24 * time.Hour)): + imp.options.Logger.Info( + "indexing last ran more than 24 hours ago, scheduling immediate update", + ) + nextRun = time.Now() + case imp.options.WriteIndex.Meta.IsSchemaOutdated(): + imp.options.Logger.Info( + "indexing schema version is out of date, scheduling immediate update", + ) + nextRun = time.Now() + default: + nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) + } + SetNextRun(nextRun) + for { + select { + case <-parentCtx.Done(): + imp.options.Logger.Debug("stopping scheduler") + + return + case <-time.After(time.Until(nextRun)): + } + imp.options.Logger.Info("updating index") + + MarkImportStarted() + ctx, cancel := context.WithTimeout(parentCtx, imp.config.Importer.Timeout.Duration) + + err := imp.Fetch(ctx, false, false, nil) + if err != nil { + imp.options.Logger.Warn("error fetching update", "error", err) + } else { + imp.options.Logger.Info("update complete") + } + + err = imp.Index(ctx) + if err != nil { + imp.options.Logger.Warn("error indexing update", "error", err) + } else { + imp.options.Logger.Info("indexing complete") + } + + cancel() + MarkImportFinished() + + nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) + SetNextRun(nextRun) + + imp.options.Logger.Info( + "scheduling next run", + "next-run", + nextRun.Format(time.DateTime), + ) + } +} + +func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time { + now := time.Now() + nextRun := time.Date( + now.Year(), + now.Month(), + now.Day(), + dayTime.Hour, + dayTime.Minute, + dayTime.Second, + 0, + time.UTC, + ) + if nextRun.Before(now) { + return nextRun.AddDate(0, 0, 1) + } + + return nextRun +}
M internal/importer/main.go → internal/importer/main.go
@@ -33,31 +33,126 @@ Root *file.Root Storage *storage.Store } -var Job struct { - StartedAt time.Time - LastRun struct { - StartedAt time.Time - FinishedAt time.Time - } - NextRun time.Time +type Importer struct { + config *config.Config + options *Options } -func MarkImportStarted() { - Job.StartedAt = time.Now() +func New( + cfg *config.Config, + options *Options, +) (*Importer, error) { + Job.LastRun = options.WriteIndex.Meta.LastImport + + return &Importer{ + config: cfg, + options: options, + }, nil } -func MarkImportFinished() { - Job.LastRun.StartedAt = Job.StartedAt - Job.LastRun.FinishedAt = time.Now() - Job.StartedAt = time.Time{} +func (imp *Importer) Fetch( + ctx context.Context, + forceUpdate bool, + fetchOnly bool, + onlyUpdateSources *[]string, +) error { + if len(imp.config.Importer.Sources) == 0 { + imp.options.Logger.Info("No sources enabled") + + return nil + } + + imp.options.Logger.Debug("starting importer", "timeout", imp.config.Importer.Timeout.Duration) + importCtx, cancelImport := context.WithTimeout( + ctx, + imp.config.Importer.Timeout.Duration, + ) + defer cancelImport() + + forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0) + + meta := imp.options.WriteIndex.Meta + + importSource := imp.createSourceImporter(importCtx, meta, forceUpdate, fetchOnly) + for name, source := range imp.config.Importer.Sources { + if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 { + if !slices.Contains(*onlyUpdateSources, name) { + continue + } + } + err := importSource(source) + if err != nil { + imp.options.Logger.Error("import failed", "source", name, "error", err) + } + } + + MarkLastFetch(meta) + err := imp.options.WriteIndex.SaveMeta() + if err != nil { + return fault.Wrap(err, fmsg.With("failed to save metadata")) + } + + return nil } -func MarkLastFetch(meta *index.Meta) { - meta.LastImport = Job.LastRun +func (imp *Importer) Index(ctx context.Context) error { + for name, source := range imp.config.Importer.Sources { + hadErrors, err := imp.indexSource(ctx, source) + if err != nil { + return fault.Wrap(err, fmsg.Withf("Failed to import source %s", name)) + } + + if hadErrors { + imp.options.Logger.Warn("Imported source encountered errors", "source", source.Name) + } + } + + return nil } -func SetNextRun(nextRun time.Time) { - Job.NextRun = nextRun +func (imp *Importer) EnsureSourcesIndexed( + ctx context.Context, + read *index.ReadIndex, +) error { + cfgEnabledSources := slices.Collect(maps.Keys(imp.config.Importer.Sources)) + slices.Sort(cfgEnabledSources) + + indexedSources, err := read.GetEnabledSources() + if err != nil { + return fault.Wrap(err, fmsg.With("Failed to get enabled sources from index")) + } + slices.Sort(indexedSources) + if !slices.Equal(cfgEnabledSources, indexedSources) { + newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { + return slices.Contains(indexedSources, s) + }) + retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { + return slices.Contains(cfgEnabledSources, s) + }) + if len(newSources) > 0 { + imp.options.Logger.Info("adding new sources", "sources", newSources) + err := imp.Fetch( + ctx, + false, + false, + &newSources, + ) + if err != nil { + return fault.Wrap(err, fmsg.With("Failed to update index with new sources")) + } + } + if len(retiredSources) > 0 { + imp.options.Logger.Info("removing retired sources", "sources", retiredSources) + for _, s := range retiredSources { + err := imp.options.WriteIndex.DeleteBySource(s) + if err != nil { + return fault.Wrap(err, fmsg.Withf("Failed to remove retired source %s", s)) + } + } + } + } + + return nil } func (imp *Importer) createSourceImporter(@@ -215,204 +310,3 @@ return nil } } - -type Importer struct { - config *config.Config - options *Options -} - -func (imp *Importer) Fetch( - ctx context.Context, - forceUpdate bool, - fetchOnly bool, - onlyUpdateSources *[]string, -) error { - if len(imp.config.Importer.Sources) == 0 { - imp.options.Logger.Info("No sources enabled") - - return nil - } - - imp.options.Logger.Debug("starting importer", "timeout", imp.config.Importer.Timeout.Duration) - importCtx, cancelImport := context.WithTimeout( - ctx, - imp.config.Importer.Timeout.Duration, - ) - defer cancelImport() - - forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0) - - meta := imp.options.WriteIndex.Meta - - importSource := imp.createSourceImporter(importCtx, meta, forceUpdate, fetchOnly) - for name, source := range imp.config.Importer.Sources { - if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 { - if !slices.Contains(*onlyUpdateSources, name) { - continue - } - } - err := importSource(source) - if err != nil { - imp.options.Logger.Error("import failed", "source", name, "error", err) - } - } - - MarkLastFetch(meta) - err := imp.options.WriteIndex.SaveMeta() - if err != nil { - return fault.Wrap(err, fmsg.With("failed to save metadata")) - } - - return nil -} - -func (imp *Importer) Index(ctx context.Context) error { - for name, source := range imp.config.Importer.Sources { - hadErrors, err := imp.indexSource(ctx, source) - if err != nil { - return fault.Wrap(err, fmsg.Withf("Failed to import source %s", name)) - } - - if hadErrors { - imp.options.Logger.Warn("Imported source encountered errors", "source", source.Name) - } - } - - return nil -} - -func New( - cfg *config.Config, - options *Options, -) (*Importer, error) { - Job.LastRun = options.WriteIndex.Meta.LastImport - - return &Importer{ - config: cfg, - options: options, - }, nil -} - -func (imp *Importer) EnsureSourcesIndexed( - ctx context.Context, - read *index.ReadIndex, -) error { - cfgEnabledSources := slices.Collect(maps.Keys(imp.config.Importer.Sources)) - slices.Sort(cfgEnabledSources) - - indexedSources, err := read.GetEnabledSources() - if err != nil { - return fault.Wrap(err, fmsg.With("Failed to get enabled sources from index")) - } - slices.Sort(indexedSources) - if !slices.Equal(cfgEnabledSources, indexedSources) { - newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { - return slices.Contains(indexedSources, s) - }) - retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { - return slices.Contains(cfgEnabledSources, s) - }) - if len(newSources) > 0 { - imp.options.Logger.Info("adding new sources", "sources", newSources) - err := imp.Fetch( - ctx, - false, - false, - &newSources, - ) - if err != nil { - return fault.Wrap(err, fmsg.With("Failed to update index with new sources")) - } - } - if len(retiredSources) > 0 { - imp.options.Logger.Info("removing retired sources", "sources", retiredSources) - for _, s := range retiredSources { - err := imp.options.WriteIndex.DeleteBySource(s) - if err != nil { - return fault.Wrap(err, fmsg.Withf("Failed to remove retired source %s", s)) - } - } - } - } - - return nil -} - -func (imp *Importer) StartUpdateTimer( - parentCtx context.Context, -) { - var nextRun time.Time - switch { - case Job.LastRun.FinishedAt.Before(time.Now().Add(-24 * time.Hour)): - imp.options.Logger.Info( - "indexing last ran more than 24 hours ago, scheduling immediate update", - ) - nextRun = time.Now() - case imp.options.WriteIndex.Meta.IsSchemaOutdated(): - imp.options.Logger.Info( - "indexing schema version is out of date, scheduling immediate update", - ) - nextRun = time.Now() - default: - nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) - } - SetNextRun(nextRun) - for { - select { - case <-parentCtx.Done(): - imp.options.Logger.Debug("stopping scheduler") - - return - case <-time.After(time.Until(nextRun)): - } - imp.options.Logger.Info("updating index") - - MarkImportStarted() - ctx, cancel := context.WithTimeout(parentCtx, imp.config.Importer.Timeout.Duration) - - err := imp.Fetch(ctx, false, false, nil) - if err != nil { - imp.options.Logger.Warn("error fetching update", "error", err) - } else { - imp.options.Logger.Info("update complete") - } - - err = imp.Index(ctx) - if err != nil { - imp.options.Logger.Warn("error indexing update", "error", err) - } else { - imp.options.Logger.Info("indexing complete") - } - - cancel() - MarkImportFinished() - - nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) - SetNextRun(nextRun) - - imp.options.Logger.Info( - "scheduling next run", - "next-run", - nextRun.Format(time.DateTime), - ) - } -} - -func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time { - now := time.Now() - nextRun := time.Date( - now.Year(), - now.Month(), - now.Day(), - dayTime.Hour, - dayTime.Minute, - dayTime.Second, - 0, - time.UTC, - ) - if nextRun.Before(now) { - return nextRun.AddDate(0, 0, 1) - } - - return nextRun -}
M internal/importer/utils.go → internal/importer/utils.go
@@ -1,13 +1,16 @@ package importer import ( + "context" "fmt" "io" "strings" + "sync" "alin.ovh/searchix/internal/config" "alin.ovh/searchix/internal/nix" + "alin.ovh/x/log" "github.com/Southclaws/fault" "github.com/Southclaws/fault/fmsg" "github.com/bcicen/jstream"@@ -61,3 +64,50 @@ } return nil } + +func pipe( + ctx context.Context, + log *log.Logger, + src ImportSource, + dst ImportDestination, +) (bool, error) { + wg := sync.WaitGroup{} + + wg.Add(1) + objects, srcErrs := src(ctx) + + wg.Add(1) + dstErrors := dst(ctx, objects) + + var hadObjectErrors bool + var criticalError error + go func() { + for { + select { + case err, running := <-srcErrs: + if !running { + wg.Done() + srcErrs = nil + log.Debug("processing completed") + + continue + } + hadObjectErrors = true + log.Warn("error processing object from source", "error", err) + case err, running := <-dstErrors: + if !running { + wg.Done() + dstErrors = nil + + continue + } + hadObjectErrors = true + log.Warn("error writing object to target", "error", err) + } + } + }() + + wg.Wait() + + return hadObjectErrors, criticalError +}