feat: split storage import and indexing
8 files changed, 259 insertions(+), 132 deletions(-)
M cmd/searchix-web/main.go → cmd/searchix-web/main.go
@@ -34,8 +34,9 @@ "print default configuration and exit", ) generateErrorPage = flag.Bool("generate-error-page", false, "generate error page and exit") dev = flag.Bool("dev", false, "enable live reloading and nicer logging") - replace = flag.Bool("replace", false, "replace existing index and exit") prefetch = flag.Bool("fetch", false, "pre-fetch data and exit") + replace = flag.Bool("replace", false, "replace existing storage and exit") + reindex = flag.Bool("reindex", false, "reindex existing index and exit") offline = flag.Bool("offline", false, "run in offline mode") version = flag.Bool("version", false, "print version information") cpuprofile = flag.String("cpuprofile", "", "enable CPU profiling and save to `file`")@@ -121,7 +122,7 @@ read, write, exists, err := index.OpenOrCreate( &index.Options{ Config: cfg, - Force: *replace, + Force: *reindex, LowMemory: cfg.Importer.LowMemory, BatchSize: cfg.Importer.BatchSize, Logger: logger.Named("index"),@@ -167,6 +168,29 @@ logger.Fatal("Failed to start importer", "error", err) } if *replace || *prefetch { + return + } + } + + if !exists || *reindex { + for _, source := range cfg.Importer.Sources { + hadErrors, err := importer.ImportSource( + ctx, + logger.Named("importer"), + store, + source, + write, + ) + if err != nil { + logger.Fatal("Failed to import source", "source", source.Name, "error", err) + } + + if hadErrors { + logger.Warn("Imported source encountered errors", "source", source.Name) + } + } + + if *reindex { return } }
M internal/importer/importer.go → internal/importer/importer.go
@@ -7,70 +7,52 @@ "alin.ovh/searchix/internal/config" "alin.ovh/searchix/internal/index" "alin.ovh/searchix/internal/nix" + "alin.ovh/searchix/internal/storage" + "alin.ovh/x/log" ) type Processor interface { Process(context.Context) (<-chan nix.Importable, <-chan error) } -func (imp *Importer) process( +func Import[I nix.Importable]( ctx context.Context, - processor Processor, - source *config.Source, + log *log.Logger, + src func(context.Context) (<-chan I, <-chan error), + dst func(context.Context, <-chan I) <-chan error, ) (bool, error) { wg := sync.WaitGroup{} wg.Add(1) - objects, pErrs := processor.Process(ctx) - - d1, d2 := duplicate(objects) - - wg.Add(1) - iErrs := imp.options.WriteIndex.Import(ctx, d1) + objects, srcErrs := src(ctx) wg.Add(1) - wErrs := imp.options.Storage.Import(ctx, source, d2) + dstErrors := dst(ctx, objects) var hadObjectErrors bool var criticalError error go func() { for { select { - case err, running := <-iErrs: + case err, running := <-srcErrs: if !running { wg.Done() - iErrs = nil - imp.options.Logger.Debug("ingest completed") + srcErrs = nil + log.Debug("processing completed") continue } - be, isBatchError := err.(*index.BatchError) - if isBatchError { - criticalError = be - - break - } hadObjectErrors = true - imp.options.Logger.Warn("error ingesting object", "error", err) - case err, running := <-pErrs: + log.Warn("error processing object from source", "error", err) + case err, running := <-dstErrors: if !running { wg.Done() - pErrs = nil - imp.options.Logger.Debug("processing completed") + dstErrors = nil continue } hadObjectErrors = true - imp.options.Logger.Warn("error processing object", "error", err) - case err, running := <-wErrs: - if !running { - wg.Done() - wErrs = nil - - continue - } - hadObjectErrors = true - imp.options.Logger.Warn("error writing to storage", "error", err) + log.Warn("error writing object to target", "error", err) } } }()@@ -80,22 +62,37 @@ return hadObjectErrors, criticalError } -func duplicate[T any](v <-chan T) (<-chan T, <-chan T) { - if v == nil { - return nil, nil +func ImportSource( + ctx context.Context, + logger *log.Logger, + store *storage.Store, + source *config.Source, + write *index.WriteIndex, +) (bool, error) { + var it func(context.Context, *log.Logger, *storage.Store, *config.Source, *index.WriteIndex) (bool, error) + switch source.Importer { + case config.Packages: + it = ImportWithType[nix.Package] + case config.Options: + it = ImportWithType[nix.Option] } - dup1 := make(chan T, 1) - dup2 := make(chan T, 1) - - go func() { - for v := range v { - dup1 <- v - dup2 <- v - } - close(dup1) - close(dup2) - }() + return it(ctx, logger, store, source, write) +} - return dup1, dup2 +func ImportWithType[I nix.Importable]( + ctx context.Context, + logger *log.Logger, + store *storage.Store, + source *config.Source, + write *index.WriteIndex, +) (bool, error) { + return Import( + ctx, + logger.Named("importer"), + storage.MakeSourceExporter[I](store, source), + func(ctx context.Context, objects <-chan I) <-chan error { + return write.Import(ctx, nix.ToGenericChannel(objects)) + }, + ) }
M internal/importer/main.go → internal/importer/main.go
@@ -41,14 +41,17 @@ } NextRun time.Time } -func MarkIndexingStarted() { +func MarkImportStarted() { Job.StartedAt = time.Now() } -func MarkIndexingFinished(meta *index.Meta) { +func MarkImportFinished() { Job.LastRun.StartedAt = Job.StartedAt Job.LastRun.FinishedAt = time.Now() Job.StartedAt = time.Time{} +} + +func MarkLastFetch(meta *index.Meta) { meta.LastImport = Job.LastRun }@@ -150,7 +153,7 @@ } } if !fetchOnly && - (!sourceMeta.UpdatedAt.After(sourceMeta.ImportedAt) || sourceMeta.ImportedAt.IsZero() || forceUpdate) { + (!sourceMeta.UpdatedAt.After(sourceMeta.StoredAt) || sourceMeta.StoredAt.IsZero() || forceUpdate) { if files.Revision != nil { err = setRepoRevision(files.Revision, source)@@ -186,12 +189,17 @@ if err != nil { return fault.Wrap(err, fmsg.Withf("failed to create processor")) } - hadWarnings, err := imp.process(ctx, processor, source) + hadWarnings, err := Import( + ctx, + logger.Named("importer"), + processor.Process, + imp.options.Storage.MakeSourceImporter(source), + ) if err != nil { return fault.Wrap(err, fmsg.Withf("failed to process source")) } - sourceMeta.ImportedAt = time.Now() + sourceMeta.StoredAt = time.Now() if hadWarnings { logger.Warn("importer succeeded, but with warnings/errors")@@ -234,7 +242,6 @@ forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0) meta := imp.options.WriteIndex.Meta - MarkIndexingStarted() importSource := imp.createSourceImporter(importCtx, meta, forceUpdate, fetchOnly) for name, source := range imp.config.Importer.Sources {@@ -249,8 +256,7 @@ imp.options.Logger.Error("import failed", "source", name, "error", err) } } - MarkIndexingFinished(meta) - + MarkLastFetch(meta) err := imp.options.WriteIndex.SaveMeta() if err != nil { return fault.Wrap(err, fmsg.With("failed to save metadata"))@@ -345,15 +351,19 @@ case <-time.After(time.Until(nextRun)): } imp.options.Logger.Info("updating index") + MarkImportStarted() ctx, cancel := context.WithTimeout(parentCtx, imp.config.Importer.Timeout.Duration) + err := imp.Start(ctx, false, false, nil) - cancel() - if err != nil { - imp.options.Logger.Warn("error updating index", "error", err) + imp.options.Logger.Warn("error fetching update", "error", err) } else { imp.options.Logger.Info("update complete") } + + cancel() + MarkImportFinished() + nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) SetNextRun(nextRun)
M internal/importer/main_test.go → internal/importer/main_test.go
@@ -1,13 +1,13 @@ package importer import ( - "context" "testing" "alin.ovh/searchix/internal/config" "alin.ovh/searchix/internal/file" "alin.ovh/searchix/internal/index" "alin.ovh/searchix/internal/manpages" + "alin.ovh/searchix/internal/storage" "alin.ovh/x/log" )@@ -21,6 +21,7 @@ } logger := log.Configure(false) _, write, _, err := index.OpenOrCreate(&index.Options{ + Config: &cfg, Force: false, LowMemory: true, BatchSize: cfg.Importer.BatchSize,@@ -31,6 +32,14 @@ if err != nil { b.Fatal(err) } + store, err := storage.New(&storage.Options{ + Root: tmp, + Logger: logger.Named("storage"), + }) + if err != nil { + b.Fatal(err) + } + imp, err := New(&cfg, &Options{ Logger: logger.Named("importer"), LowMemory: true,@@ -47,13 +56,30 @@ if err != nil { b.Fatal(err) } + source := cfg.Importer.Sources["nixpkgs"] + source.Programs.Enable = false + err = imp.Start( - context.Background(), + b.Context(), false, false, - &[]string{"nixpkgs"}, + &[]string{source.Key}, ) if err != nil { b.Fatal(err) + } + + hadErrors, err := ImportSource( + b.Context(), + logger.Named("importer"), + store, + source, + write, + ) + if err != nil { + b.Fatal(err) + } + if hadErrors { + b.Fatal("had errors") } }
M internal/index/index_meta.go → internal/index/index_meta.go
@@ -5,19 +5,20 @@ "encoding/json" "time" "alin.ovh/searchix/internal/file" - "alin.ovh/x/log" + "alin.ovh/x/log" "github.com/Southclaws/fault" "github.com/Southclaws/fault/fmsg" ) -const CurrentSchemaVersion = 7 +const CurrentSchemaVersion = 8 type SourceMeta struct { - ImportedAt time.Time - UpdatedAt time.Time - Path string - Rev string + IndexedAt time.Time + StoredAt time.Time + UpdatedAt time.Time + Path string + Rev string } type data struct {@@ -114,8 +115,8 @@ func (i *Meta) LastImported() time.Time { var last time.Time for _, sourceMeta := range i.Sources { - if sourceMeta.ImportedAt.After(last) { - last = sourceMeta.ImportedAt + if sourceMeta.StoredAt.After(last) { + last = sourceMeta.StoredAt } }
M internal/nix/importable.go → internal/nix/importable.go
@@ -20,6 +20,18 @@ func MakeKey(source *config.Source, id string) string { return source.Importer.Singular() + "/" + source.Key + "/" + id } +func ToGenericChannel[I Importable](in <-chan I) <-chan Importable { + out := make(chan Importable) + go func() { + for i := range in { + out <- i + } + close(out) + }() + + return out +} + func init() { gob.Register(Option{}) gob.Register(Package{})
M internal/storage/store.go → internal/storage/store.go
@@ -5,17 +5,17 @@ "context" "errors" "time" - "alin.ovh/searchix/internal/config" - "alin.ovh/searchix/internal/file" - "alin.ovh/searchix/internal/nix" "alin.ovh/x/log" "github.com/Southclaws/fault" "github.com/Southclaws/fault/fmsg" "github.com/asdine/storm/v3" "github.com/asdine/storm/v3/codec/gob" + "go.etcd.io/bbolt" "go.uber.org/zap" - "go.etcd.io/bbolt" + "alin.ovh/searchix/internal/config" + "alin.ovh/searchix/internal/file" + "alin.ovh/searchix/internal/nix" ) var BatchSize = 10000@@ -26,7 +26,7 @@ Logger *log.Logger } type Store struct { - db *storm.DB + *storm.DB log *log.Logger }@@ -49,13 +49,13 @@ return nil, fault.Wrap(err, fmsg.With("failed to open database")) } return &Store{ - db: bb, + DB: bb, log: opts.Logger, }, nil } func (s *Store) Close() error { - err := s.db.Close() + err := s.DB.Close() if err != nil { return fault.Wrap(err, fmsg.With("failed to close database")) }@@ -63,71 +63,79 @@ return nil } -func (s *Store) Import( - ctx context.Context, +func (s *Store) MakeSourceImporter( source *config.Source, - objects <-chan nix.Importable, -) <-chan error { - errs := make(chan error) - node := s.db.From(source.Key).WithBatch(true) +) func(context.Context, <-chan nix.Importable) <-chan error { + return func(ctx context.Context, objects <-chan nix.Importable) <-chan error { + errs := make(chan error) + node := s.DB.From(source.Key).WithBatch(true) - i := 0 + i := 0 - var save func(storm.Node, nix.Importable) error - switch source.Importer { - case config.Packages: - save = saveGen[nix.Package] - case config.Options: - save = saveGen[nix.Option] - default: - errs <- fault.New("invalid importer") + var save func(storm.Node, nix.Importable) error + switch source.Importer { + case config.Packages: + save = saveGen[nix.Package] + case config.Options: + save = saveGen[nix.Option] + default: + errs <- fault.New("invalid importer") - return errs - } + return errs + } - go func() { - defer close(errs) - tx, err := node.Begin(true) - if err != nil { - errs <- fault.Wrap(err, fmsg.With("failed to begin transaction")) + go func() { + defer close(errs) + tx, err := node.Begin(true) + if err != nil { + errs <- fault.Wrap(err, fmsg.With("failed to begin transaction")) - return - } - defer func() { - if err := tx.Rollback(); err != nil { - if !errors.Is(err, storm.ErrNotInTransaction) { - errs <- fault.Wrap(err, fmsg.With("failed to rollback transaction")) + return + } + defer func() { + if err := tx.Rollback(); err != nil { + if !errors.Is(err, storm.ErrNotInTransaction) { + errs <- fault.Wrap(err, fmsg.With("failed to rollback transaction")) + } } - } - }() + }() - outer: - for obj := range objects { - i++ - select { - case <-ctx.Done(): - s.log.Warn("import aborted") + outer: + for obj := range objects { + i++ + select { + case <-ctx.Done(): + s.log.Warn("import aborted") - break outer - default: - } + break outer + default: + } + + err := save(tx, obj) + if err != nil { + errs <- fault.Wrap(err, fmsg.With("failed to save object")) + } - err := save(tx, obj) - if err != nil { - errs <- fault.Wrap(err, fmsg.With("failed to save object")) + if i%BatchSize == 0 { + s.log.Info("imported", "count", i) + err := tx.Commit() + if err != nil { + errs <- fault.Wrap(err, fmsg.With("failed to commit transaction")) + } + tx, err = node.Begin(true) + if err != nil { + errs <- fault.Wrap(err, fmsg.With("failed to begin transaction")) + } + } } - if i%BatchSize == 0 { - s.log.Info("imported", "count", i) + if err := tx.Commit(); err != nil { + errs <- fault.Wrap(err, fmsg.With("failed to commit transaction")) } - } + }() - if err := tx.Commit(); err != nil { - errs <- fault.Wrap(err, fmsg.With("failed to commit transaction")) - } - }() - - return errs + return errs + } } func saveGen[T nix.Importable](node storm.Node, obj nix.Importable) error {@@ -150,7 +158,7 @@ ) (nix.Importable, error) { var doc nix.Importable var err error - node := s.db.From(source.Key) + node := s.From(source.Key) switch source.Importer { case config.Packages:@@ -172,3 +180,49 @@ } return doc, nil } + +func MakeSourceExporter[I nix.Importable]( + store *Store, + source *config.Source, +) func(context.Context) (<-chan I, <-chan error) { + return func(_ context.Context) (<-chan I, <-chan error) { + results := make(chan I) + errs := make(chan error) + + go func() { + defer close(results) + defer close(errs) + + var obj I + objs := make([]I, 0, BatchSize) + node := store.From(source.Key) + count, err := node.Count(&obj) + if err != nil { + errs <- fault.Wrap( + err, + fmsg.Withf("failed to count documents source: %s", source.Key), + ) + + return + } + + limit := min(BatchSize, count) + for offset := 0; offset < count; offset += BatchSize { + err := node.All(&objs, storm.Skip(offset), storm.Limit(limit)) + if err != nil { + errs <- fault.Wrap( + err, + fmsg.Withf("failed to export documents source: %s offset: %d limit: %d", source.Key, offset, limit), + ) + + return + } + for _, obj := range objs { + results <- obj + } + } + }() + + return results, errs + } +}
M justfile → justfile
@@ -70,5 +70,8 @@ fetch *flags: wgo run --exit ./cmd/searchix-web --config config.toml --fetch --dev {{ flags }} +replace *flags: + wgo run --exit ./cmd/searchix-web --config config.toml --replace --dev {{ flags }} + reindex *flags: - wgo run --exit ./cmd/searchix-web --config config.toml --replace --dev {{ flags }} + wgo run --exit ./cmd/searchix-web --config config.toml --reindex --dev {{ flags }}