feat: remove entities not present in current import
4 files changed, 124 insertions(+), 10 deletions(-)
M .golangci.yaml → .golangci.yaml
@@ -22,6 +22,10 @@ - sloglint - unconvert - wrapcheck settings: + errcheck: + verbose: true + exclude-functions: + - (github.com/asdine/storm/v3.Tx).Rollback forbidigo: forbid: - pattern: ^(file)?path.Join$
M cmd/searchix-web/ingest.go → cmd/searchix-web/ingest.go
@@ -75,16 +75,32 @@ return fault.Wrap(err, fmsg.With("Failed to create importer")) } if !exists || opts.Replace || opts.Fetch { + importer.MarkImportStarted() + err = imp.Fetch(ctx, true, opts.Fetch && !opts.Replace, nil) if err != nil { return fault.Wrap(err, fmsg.With("Failed to start importer")) } + + importer.MarkImportFinished() + importer.MarkLastRun(write.Meta) + err = write.SaveMeta() + if err != nil { + return fault.Wrap(err, fmsg.With("Failed to save index metadata")) + } } if !exists || opts.Reindex { err = imp.Index(ctx) if err != nil { return fault.Wrap(err, fmsg.With("Failed to index data")) + } + } + + if opts.Replace || opts.Reindex { + err = imp.Prune(ctx) + if err != nil { + return fault.Wrap(err, fmsg.With("Failed to prune index")) } }
M internal/importer/job.go → internal/importer/job.go
@@ -27,7 +27,7 @@ Job.LastRun.FinishedAt = time.Now() Job.StartedAt = time.Time{} } -func MarkLastFetch(meta *index.Meta) { +func MarkLastRun(meta *index.Meta) { meta.LastImport = Job.LastRun }@@ -81,8 +81,19 @@ } else { imp.options.Logger.Info("indexing complete") } + err = imp.Prune(ctx) + if err != nil { + imp.options.Logger.Warn("error pruning index", "error", err) + } + cancel() MarkImportFinished() + + MarkLastRun(imp.options.WriteIndex.Meta) + err = imp.options.WriteIndex.SaveMeta() + if err != nil { + imp.options.Logger.Error("error saving metadata", "error", err) + } nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) SetNextRun(nextRun)
M internal/importer/main.go → internal/importer/main.go
@@ -10,17 +10,19 @@ "strings" "time" "alin.ovh/x/log" + "github.com/Southclaws/fault" + "github.com/Southclaws/fault/fmsg" + "github.com/asdine/storm/v3/q" + "github.com/blevesearch/bleve/v2" "alin.ovh/searchix/internal/config" "alin.ovh/searchix/internal/fetcher" "alin.ovh/searchix/internal/file" "alin.ovh/searchix/internal/index" "alin.ovh/searchix/internal/manpages" + "alin.ovh/searchix/internal/nix" "alin.ovh/searchix/internal/programs" "alin.ovh/searchix/internal/storage" - - "github.com/Southclaws/fault" - "github.com/Southclaws/fault/fmsg" ) type Options struct {@@ -86,12 +88,6 @@ imp.options.Logger.Error("import failed", "source", name, "error", err) } } - MarkLastFetch(meta) - err := imp.options.WriteIndex.SaveMeta() - if err != nil { - return fault.Wrap(err, fmsg.With("failed to save metadata")) - } - return nil }@@ -150,6 +146,93 @@ return fault.Wrap(err, fmsg.Withf("Failed to remove retired source %s", s)) } } } + } + + return nil +} + +func (imp *Importer) Prune(ctx context.Context) error { + for _, source := range imp.config.Importer.Sources { + err := imp.PruneSource(ctx, source) + if err != nil { + imp.options.Logger.Error("Failed to prune source", "source", source.Key, "error", err) + } + } + + return nil +} + +func (imp *Importer) PruneSource(ctx context.Context, source *config.Source) error { + store := imp.options.Storage + write := imp.options.WriteIndex + + tx, err := store.WithBatch(true).From(source.Key).Begin(true) + if err != nil { + return fault.Wrap(err, fmsg.With("Failed to begin transaction")) + } + defer tx.Rollback() + + cutoff := write.Meta.LastImport.StartedAt + imp.options.Logger.Debug("searching for old entities", "cutoff", cutoff.Format(time.RFC3339)) + + query := tx.Select(q.Lt("ImportedAt", cutoff)) + + var obj nix.Importable + switch source.Importer { + case config.Options: + obj = new(nix.Option) + case config.Packages: + obj = new(nix.Package) + } + count, err := query.Count(obj) + if err != nil { + return fault.Wrap(err, fmsg.With("failed to retrieve entities for pruning")) + } + + imp.options.Logger.Debug( + "pruning", + "type", + source.Importer.String(), + "source", + source.Key, + "count", + count, + ) + + if count == 0 { + return nil + } + + objs := make(chan nix.Importable, 1) + errs := write.WithBatch(ctx, objs, func(batch *bleve.Batch, obj nix.Importable) error { + batch.Delete(obj.GetName()) + + return nil + }) + + go func() { + for err := range errs { + imp.options.Logger.Error("failed to prune old entities", "error", err) + } + }() + + err = query.Each(obj, func(record any) error { + objs <- record.(nix.Importable) + + return nil + }) + if err != nil { + return fault.Wrap(err, fmsg.With("failed to prune old entities from index")) + } + + err = query.Delete(obj) + if err != nil { + return fault.Wrap(err, fmsg.With("failed to prune old entities from storage")) + } + + err = tx.Commit() + if err != nil { + return fault.Wrap(err, fmsg.With("Failed to commit transaction")) } return nil