feat: revert to using the index as storage
18 files changed, 359 insertions(+), 411 deletions(-)
changed files
- cmd/searchix-web/ingest.go
- cmd/searchix-web/serve.go
- go.mod
- internal/components/combined.go
- internal/components/detail.go
- internal/components/options.go
- internal/components/packages.go
- internal/importer/importer.go
- internal/importer/job.go
- internal/importer/main.go
- internal/importer/main_test.go
- internal/importer/utils.go
- internal/index/indexer.go
- internal/index/search.go
- internal/index/search_test.go
- internal/server/mux.go
- internal/server/server.go
- justfile
M cmd/searchix-web/ingest.go → cmd/searchix-web/ingest.go
@@ -12,14 +12,12 @@
 	"alin.ovh/searchix/internal/file"
 	"alin.ovh/searchix/internal/importer"
 	"alin.ovh/searchix/internal/index"
 	"alin.ovh/searchix/internal/manpages"
-	"alin.ovh/searchix/internal/storage"
 )

 type IngestOptions struct {
 	Fetch   bool `long:"fetch" description:"pre-fetch data"`
 	Offline bool `long:"offline" description:"offline mode"`
-	Replace bool `long:"replace" description:"replace existing storage"`
-	Reindex bool `long:"reindex" description:"reindex existing index"`
+	Replace bool `long:"replace" description:"replace existing index"`
 }

 func (opts *IngestOptions) Execute(_ []string) error {
@@ -32,25 +30,14 @@
 		return fault.Wrap(err, fmsg.With("Failed to open data root"))
 	}
 	defer root.Close()

-	store, err := storage.New(&storage.Options{
-		LowMemory: cfg.Importer.LowMemory,
-		Root:      root,
-		Logger:    logger.Named("store"),
-	})
-	if err != nil {
-		return fault.Wrap(err, fmsg.With("Failed to create store"))
-	}
-	defer store.Close()
-
-	_, write, err := index.OpenOrCreate(
+	read, write, err := index.OpenOrCreate(
 		&index.Options{
 			Config:    cfg,
-			Force:     opts.Reindex,
+			Force:     opts.Replace,
 			LowMemory: cfg.Importer.LowMemory,
 			BatchSize: cfg.Importer.BatchSize,
 			Logger:    logger.Named("index"),
 			Root:      root,
-			Store:     store,
 		},
 	)
 	if err != nil {
@@ -63,7 +50,7 @@
 		Root:   root,
 	})
 	imp, err := importer.New(cfg, &importer.Options{
-		Storage:    store,
+		ReadIndex:  read,
 		WriteIndex: write,
 		LowMemory:  cfg.Importer.LowMemory,
 		Logger:     logger.Named("importer"),
@@ -75,11 +62,10 @@
 	if err != nil {
 		return fault.Wrap(err, fmsg.With("Failed to create importer"))
 	}

-	updateStore := store.IsNew() || opts.Replace || opts.Fetch
-	if updateStore {
+	if !opts.Offline && (!write.Exists() || opts.Fetch || opts.Replace) {
 		importer.MarkImportStarted()

-		err = imp.Fetch(ctx, true, opts.Fetch && !opts.Replace, nil)
+		err = imp.Fetch(ctx, true, nil)
 		if err != nil {
 			return fault.Wrap(err, fmsg.With("Failed to start importer"))
 		}
@@ -92,17 +78,10 @@
 			return fault.Wrap(err, fmsg.With("Failed to save index metadata"))
 		}
 	}

-	if !write.Exists() || opts.Reindex || updateStore {
-		err = imp.Index(ctx)
+	if !write.Exists() || opts.Replace {
+		err = imp.Index(ctx, nil)
 		if err != nil {
 			return fault.Wrap(err, fmsg.With("Failed to index data"))
-		}
-	}
-
-	if opts.Replace || opts.Reindex {
-		err = imp.Prune(ctx)
-		if err != nil {
-			return fault.Wrap(err, fmsg.With("Failed to prune index"))
 		}
 	}
M cmd/searchix-web/serve.go → cmd/searchix-web/serve.go
@@ -20,7 +20,6 @@
 	"alin.ovh/searchix/internal/importer"
 	"alin.ovh/searchix/internal/index"
 	"alin.ovh/searchix/internal/manpages"
 	"alin.ovh/searchix/internal/server"
-	"alin.ovh/searchix/internal/storage"
 	"alin.ovh/searchix/web"
 )
@@ -38,16 +37,6 @@
 		return fault.Wrap(err, fmsg.With("Failed to open data root"))
 	}
 	defer root.Close()

-	store, err := storage.New(&storage.Options{
-		LowMemory: cfg.Importer.LowMemory,
-		Root:      root,
-		Logger:    logger.Named("store"),
-	})
-	if err != nil {
-		return fault.Wrap(err, fmsg.With("Failed to create store"))
-	}
-	defer store.Close()
-
 	read, write, err := index.OpenOrCreate(
 		&index.Options{
 			Config: cfg,
@@ -55,7 +44,6 @@
 			LowMemory: cfg.Importer.LowMemory,
 			BatchSize: cfg.Importer.BatchSize,
 			Logger:    logger.Named("index"),
 			Root:      root,
-			Store:     store,
 		},
 	)
 	if err != nil {
@@ -70,14 +58,13 @@
 	s, err := web.New(cfg, logger, &server.Options{
 		ReadIndex:      read,
 		ManpagesURLMap: mdb,
-		Store:          store,
 	})
 	if err != nil {
 		return fault.Wrap(err, fmsg.With("Failed to initialise searchix-web"))
 	}

 	imp, err := importer.New(cfg, &importer.Options{
-		Storage:    store,
+		ReadIndex:  read,
 		WriteIndex: write,
 		LowMemory:  cfg.Importer.LowMemory,
 		Logger:     logger.Named("importer"),
@@ -91,20 +78,8 @@
 	ctx, stop := signal.NotifyContext(context.Background(), signals...)
 	defer stop()

-	if store.IsNew() {
-		err = sdnotify.Status("fetching")
-		if err != nil {
-			logger.Warn("failed to update systemd status", "error", err)
-		}
-
-		err = imp.Fetch(ctx, true, false, nil)
-		if err != nil {
-			return fault.Wrap(err, fmsg.With("Failed to start importer"))
-		}
-	}
-
-	if store.IsNew() || !write.Exists() {
-		err = imp.Index(ctx)
+	if !write.Exists() {
+		err = imp.Index(ctx, nil)
 		if err != nil {
 			return fault.Wrap(err, fmsg.With("Failed to index data"))
 		}
@@ -143,7 +118,7 @@
 	}
 	if sig == syscall.SIGUSR1 {
 		logger.Info("manual fetch on SIGUSR1")
-		err := imp.Fetch(ctx, true, false, nil)
+		err := imp.Fetch(ctx, true, nil)
 		if err != nil {
 			logger.Warn("manual fetch error", "error", err)
 		}
@@ -151,7 +126,7 @@
 		logger.Info("manual fetch succeeded")
 	}
 	logger.Info("manual re-index", "signal", sig.String())
-	err = imp.Index(ctx)
+	err = imp.Index(ctx, nil)
 	if err != nil {
 		logger.Error("manual index error", "error", err)
 	}
M go.mod → go.mod
@@ -10,6 +10,7 @@
 	github.com/andybalholm/brotli v1.1.1
 	github.com/asdine/storm/v3 v3.2.1
 	github.com/bcicen/jstream v1.0.1
 	github.com/blevesearch/bleve/v2 v2.5.2
+	github.com/blevesearch/bleve_index_api v1.2.8
 	github.com/creasty/defaults v1.8.0
 	github.com/crewjam/csp v0.0.2
 	github.com/dustin/go-humanize v1.0.1
@@ -34,7 +35,6 @@ require (
 	github.com/Code-Hex/dd v1.1.0 // indirect
 	github.com/RoaringBitmap/roaring/v2 v2.5.0 // indirect
 	github.com/bits-and-blooms/bitset v1.22.0 // indirect
-	github.com/blevesearch/bleve_index_api v1.2.8 // indirect
 	github.com/blevesearch/geo v0.2.3 // indirect
 	github.com/blevesearch/go-faiss v1.0.25 // indirect
 	github.com/blevesearch/go-porterstemmer v1.0.3 // indirect
M internal/components/combined.go → internal/components/combined.go
@@ -11,9 +11,9 @@
 )

 func CombinedData(data nix.Importable) g.Node {
 	switch data := data.(type) {
-	case *nix.Option:
+	case nix.Option:
 		return firstSentence(data.Description)
-	case *nix.Package:
+	case nix.Package:
 		return g.Text(firstSentence(data.Description))
 	}
M internal/components/detail.go → internal/components/detail.go
@@ -8,10 +8,10 @@
 )

 func Detail(thing nix.Importable) g.Node {
 	switch t := thing.(type) {
-	case *nix.Option:
-		return OptionDetail(*t)
-	case *nix.Package:
-		return PackageDetail(*t)
+	case nix.Option:
+		return OptionDetail(t)
+	case nix.Package:
+		return PackageDetail(t)
 	default:
 		return nil
 	}
M internal/components/options.go → internal/components/options.go
@@ -22,8 +22,8 @@
 			),
 		),
 		TBody(
 			g.MapIter(result.Hits, func(hit index.DocumentMatch) g.Node {
-				if m, ok := hit.Data.(*nix.Option); ok {
-					return optionRow(hit, *m)
+				if m, ok := hit.Data.(nix.Option); ok {
+					return optionRow(hit, m)
 				}

 				return emptyOptionRow(hit)
M internal/components/packages.go → internal/components/packages.go
@@ -23,8 +23,8 @@
 			),
 		),
 		TBody(
 			g.MapIter(result.Hits, func(hit index.DocumentMatch) g.Node {
-				if m, ok := hit.Data.(*nix.Package); ok {
-					return packageRow(hit, *m)
+				if m, ok := hit.Data.(nix.Package); ok {
+					return packageRow(hit, m)
 				}

 				return emptyPackageRow(hit)
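Note on the component changes above: search hits are now hydrated by gob-decoding a stored blob into the nix.Importable interface, and gob hands back the concrete value type that was registered, not a pointer. That is why the type switches move from *nix.Option and *nix.Package to value cases. A minimal standalone sketch of that behaviour (Importable and Option here are illustrative stand-ins, not the searchix definitions):

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Importable and Option stand in for nix.Importable and nix.Option.
type Importable interface{ GetName() string }

type Option struct{ Name, Description string }

func (o Option) GetName() string { return o.Name }

func main() {
	// Register the concrete *value* type; gob returns exactly what was
	// registered when decoding through the interface.
	gob.Register(Option{})

	var buf bytes.Buffer
	var in Importable = Option{Name: "services.nginx.enable"}
	// Encoding via a pointer to the interface preserves the type info
	// gob needs to decode back into an Importable.
	if err := gob.NewEncoder(&buf).Encode(&in); err != nil {
		panic(err)
	}

	var out Importable
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		panic(err)
	}

	_, isValue := out.(Option) // true: a value, not *Option
	fmt.Println(isValue)
}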
M internal/importer/importer.go → internal/importer/importer.go
@@ -2,48 +2,63 @@
 package importer

 import (
 	"context"
+	"sync"

-	"alin.ovh/searchix/internal/config"
+	"alin.ovh/searchix/internal/index"
 	"alin.ovh/searchix/internal/nix"
-	"alin.ovh/searchix/internal/storage"
 )

 type Processor interface {
 	Process(context.Context) (<-chan nix.Importable, <-chan error)
 }

-type (
-	ImportSource      func(context.Context) (<-chan nix.Importable, <-chan error)
-	ImportDestination func(context.Context, <-chan nix.Importable) <-chan error
-)
-
-func (imp *Importer) indexSource(
+func (imp *Importer) process(
 	ctx context.Context,
-	source *config.Source,
+	processor Processor,
 ) (bool, error) {
-	writer := imp.options.WriteIndex
-	var exporter func(context.Context) (<-chan nix.Importable, <-chan error)
-	switch source.Importer {
-	case config.Packages:
-		exporter = storage.MakeSourceExporter[nix.Package](
-			imp.options.Storage,
-			source,
-			writer.GetBatchSize(),
-		)
-	case config.Options:
-		exporter = storage.MakeSourceExporter[nix.Option](
-			imp.options.Storage,
-			source,
-			writer.GetBatchSize(),
-		)
-	}
+	wg := sync.WaitGroup{}
+
+	wg.Add(1)
+	objects, pErrs := processor.Process(ctx)
+
+	wg.Add(1)
+	iErrs := imp.options.WriteIndex.Import(ctx, objects)
+
+	var hadObjectErrors bool
+	var criticalError error
+	go func() {
+		for {
+			select {
+			case err, running := <-iErrs:
+				if !running {
+					wg.Done()
+					iErrs = nil
+					imp.options.Logger.Debug("ingest completed")
+
+					continue
+				}
+				be, isBatchError := err.(*index.BatchError)
+				if isBatchError {
+					criticalError = be
+
+					break
+				}
+				hadObjectErrors = true
+				imp.options.Logger.Warn("error ingesting object", "error", err)
+			case err, running := <-pErrs:
+				if !running {
+					wg.Done()
+					pErrs = nil

-	return pipe(
-		ctx,
-		imp.options.Logger,
-		exporter,
-		func(ctx context.Context, objects <-chan nix.Importable) <-chan error {
-			return writer.Import(ctx, objects)
-		},
-	)
+					continue
+				}
+				hadObjectErrors = true
+				imp.options.Logger.Warn("error processing object", "error", err)
+			}
+		}
+	}()
+
+	wg.Wait()
+
+	return hadObjectErrors, criticalError
 }
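The new Importer.process replaces the generic pipe helper (removed from internal/importer/utils.go below) with an inline fan-in over the processor's and the index writer's error channels. A condensed sketch of the underlying idiom, assuming both channels are eventually closed by their producers:

package example

// drainErrors condenses the fan-in pattern used by Importer.process:
// two error channels are multiplexed through one select, and a channel
// is set to nil once closed so its case can never fire again. The loop
// exits when both producers have closed their channels.
func drainErrors(a, b <-chan error) (hadErrors bool) {
	for a != nil || b != nil {
		select {
		case err, ok := <-a:
			if !ok {
				a = nil // receives on a nil channel block forever

				continue
			}
			if err != nil {
				hadErrors = true
			}
		case err, ok := <-b:
			if !ok {
				b = nil

				continue
			}
			if err != nil {
				hadErrors = true
			}
		}
	}

	return hadErrors
}

Nil-ing a drained channel is what lets a single select serve both producers: once a channel is nil, only the still-open channel's case can be selected.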
M internal/importer/job.go → internal/importer/job.go
@@ -67,14 +67,14 @@
 	MarkImportStarted()

 	ctx, cancel := context.WithTimeout(parentCtx, imp.config.Importer.Timeout.Duration)
-	err := imp.Fetch(ctx, false, false, nil)
+	err := imp.Fetch(ctx, false, nil)
 	if err != nil {
 		imp.options.Logger.Warn("error fetching update", "error", err)
 	} else {
 		imp.options.Logger.Info("update complete")
 	}

-	err = imp.Index(ctx)
+	err = imp.Index(ctx, nil)
 	if err != nil {
 		imp.options.Logger.Warn("error indexing update", "error", err)
 	} else {
M internal/importer/main.go → internal/importer/main.go
@@ -12,7 +12,6 @@
 	"alin.ovh/x/log"
 	"github.com/Southclaws/fault"
 	"github.com/Southclaws/fault/fmsg"
-	"github.com/asdine/storm/v3/q"
 	"github.com/blevesearch/bleve/v2"

 	"alin.ovh/searchix/internal/config"
@@ -20,19 +19,17 @@
 	"alin.ovh/searchix/internal/fetcher"
 	"alin.ovh/searchix/internal/file"
 	"alin.ovh/searchix/internal/index"
 	"alin.ovh/searchix/internal/manpages"
-	"alin.ovh/searchix/internal/nix"
 	"alin.ovh/searchix/internal/programs"
-	"alin.ovh/searchix/internal/storage"
 )

 type Options struct {
 	LowMemory  bool
 	Offline    bool
 	Logger     *log.Logger
+	ReadIndex  *index.ReadIndex
 	WriteIndex *index.WriteIndex
 	Manpages   *manpages.URLMap
 	Root       *file.Root
-	Storage    *storage.Store
 }

 type Importer struct {
@@ -55,8 +52,7 @@
 func (imp *Importer) Fetch(
 	ctx context.Context,
 	forceUpdate bool,
-	fetchOnly bool,
-	onlyUpdateSources *[]string,
+	onlyUpdateSources []string,
 ) error {
 	if len(imp.config.Importer.Sources) == 0 {
 		imp.options.Logger.Info("No sources enabled")
@@ -71,14 +67,14 @@
 		imp.config.Importer.Timeout.Duration,
 	)
 	defer cancelImport()

-	forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0)
+	forceUpdate = forceUpdate || (len(onlyUpdateSources) > 0)

 	meta := imp.options.WriteIndex.Meta
-	importSource := imp.createSourceImporter(importCtx, meta, forceUpdate, fetchOnly)
+	importSource := imp.createSourceFetcher(importCtx, meta, forceUpdate)

 	for name, source := range imp.config.Importer.Sources {
-		if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 {
-			if !slices.Contains(*onlyUpdateSources, name) {
+		if len(onlyUpdateSources) > 0 {
+			if !slices.Contains(onlyUpdateSources, name) {
 				continue
 			}
 		}
@@ -91,15 +87,32 @@
 	return nil
 }

-func (imp *Importer) Index(ctx context.Context) error {
+func (imp *Importer) Index(ctx context.Context, onlyUpdateSources []string) error {
+	if len(imp.config.Importer.Sources) == 0 {
+		imp.options.Logger.Info("No sources enabled")
+
+		return nil
+	}
+
+	imp.options.Logger.Debug("starting importer", "timeout", imp.config.Importer.Timeout.Duration)
+	importCtx, cancelImport := context.WithTimeout(
+		ctx,
+		imp.config.Importer.Timeout.Duration,
+	)
+	defer cancelImport()
+
+	meta := imp.options.WriteIndex.Meta
+
+	importSource := imp.createSourceImporter(importCtx, meta)

 	for name, source := range imp.config.Importer.Sources {
-		hadErrors, err := imp.indexSource(ctx, source)
+		if len(onlyUpdateSources) > 0 {
+			if !slices.Contains(onlyUpdateSources, name) {
+				continue
+			}
+		}
+		err := importSource(source)
 		if err != nil {
-			return fault.Wrap(err, fmsg.Withf("Failed to import source %s", name))
-		}
-
-		if hadErrors {
-			imp.options.Logger.Warn("Imported source encountered errors", "source", source.Name)
+			imp.options.Logger.Error("import failed", "source", name, "error", err)
 		}
 	}
@@ -130,8 +143,7 @@
 	imp.options.Logger.Info("adding new sources", "sources", newSources)
 	err := imp.Fetch(
 		ctx,
 		false,
-		false,
-		&newSources,
+		newSources,
 	)
 	if err != nil {
 		return fault.Wrap(err, fmsg.With("Failed to update index with new sources"))
@@ -162,76 +174,45 @@
 	return nil
 }

-func (imp *Importer) PruneSource(ctx context.Context, source *config.Source) error {
-	store := imp.options.Storage
+func (imp *Importer) PruneSource(
+	_ context.Context,
+	source *config.Source,
+) error {
+	read := imp.options.ReadIndex
 	write := imp.options.WriteIndex

-	tx, err := store.WithBatch(true).From(source.Key).Begin(true)
-	if err != nil {
-		return fault.Wrap(err, fmsg.With("Failed to begin transaction"))
+	if read == nil {
+		imp.options.Logger.DPanic("read index is not available")
 	}
-	defer tx.Rollback()

 	cutoff := write.Meta.LastImport.StartedAt
 	imp.options.Logger.Debug("searching for old entities", "cutoff", cutoff.Format(time.RFC3339))

-	query := tx.Select(q.Lt("ImportedAt", cutoff))
-
-	var obj nix.Importable
-	switch source.Importer {
-	case config.Options:
-		obj = new(nix.Option)
-	case config.Packages:
-		obj = new(nix.Package)
-	}
-	count, err := query.Count(obj)
+	maxCount, err := read.Count(source)
 	if err != nil {
 		return fault.Wrap(err, fmsg.With("failed to retrieve entities for pruning"))
 	}

-	if count == 0 {
-		return nil
-	}
-
-	maxCount, err := tx.Count(obj)
+	res, err := read.ImportedBefore(cutoff, source)
 	if err != nil {
 		return fault.Wrap(err, fmsg.With("failed to retrieve entities for pruning"))
 	}

-	if float64(count) > (0.9 * float64(maxCount)) {
-		return fault.Newf("too many entities to prune: %d/%d (threshold: 90%%)", count, maxCount)
+	if res.Total == 0 {
+		return nil
 	}

-	objs := make(chan nix.Importable, 1)
-	errs := write.WithBatch(ctx, objs, func(batch *bleve.Batch, obj nix.Importable) error {
-		batch.Delete(obj.GetName())
+	if float64(res.Total) > (0.9 * float64(maxCount)) {
+		return fault.Newf("too many entities to prune: %d/%d (threshold: 90%%)", res.Total, maxCount)
+	}

-		return nil
-	})
-
-	go func() {
-		for err := range errs {
-			imp.options.Logger.Error("failed to prune old entities", "error", err)
+	err = write.WithBatch(func(batch *bleve.Batch) {
+		for _, dm := range res.Hits {
+			batch.Delete(dm.ID)
 		}
-	}()
-
-	err = query.Each(obj, func(record any) error {
-		objs <- record.(nix.Importable)
-
-		return nil
 	})
 	if err != nil {
-		return fault.Wrap(err, fmsg.With("failed to prune old entities from index"))
-	}
-
-	err = query.Delete(obj)
-	if err != nil {
-		return fault.Wrap(err, fmsg.With("failed to prune old entities from storage"))
-	}
-
-	err = tx.Commit()
-	if err != nil {
-		return fault.Wrap(err, fmsg.With("Failed to commit transaction"))
+		return fault.Wrap(err, fmsg.With("failed to prune entities"))
 	}

 	imp.options.Logger.Info(
@@ -241,21 +222,18 @@
 		source.Importer.String(),
 		"source",
 		source.Key,
 		"count",
-		count,
+		res.Total,
 	)

 	return nil
 }

-func (imp *Importer) createSourceImporter(
+func (imp *Importer) createSourceFetcher(
 	parent context.Context,
 	meta *index.Meta,
 	forceUpdate bool,
-	fetchOnly bool,
 ) func(*config.Source) error {
 	return func(source *config.Source) error {
-		var files *fetcher.FetchedFiles
-
 		logger := imp.options.Logger.With("name", source.Key)
 		pdb, err := programs.New(source, &programs.Options{
 			Logger: logger,
@@ -276,124 +254,144 @@
 		ctx, cancel := context.WithTimeout(parent, source.Timeout.Duration)
 		defer cancel()

-		if imp.options.Offline {
-			logger.Debug("skipping fetch; in offline mode")
+		logger.Debug("starting fetcher")

-			files, err = fetcher.Open(source, fopts)
-			if err != nil {
-				return fault.Wrap(err, fmsg.With("error opening fetched files"))
+		fetcher, err := fetcher.New(source, fopts)
+		if err != nil {
+			return fault.Wrap(err, fmsg.With("error creating fetcher"))
+		}
+
+		_, err = fetcher.FetchIfNeeded(ctx, sourceMeta)
+		if err != nil {
+			var exerr *exec.ExitError
+			if errors.As(err, &exerr) {
+				lines := strings.SplitSeq(strings.TrimSpace(string(exerr.Stderr)), "\n")
+				for line := range lines {
+					logger.Error(
+						"importer fetch failed",
+						"fetcher",
+						source.Fetcher.String(),
+						"stderr",
+						line,
+						"status",
+						exerr.ExitCode(),
+					)
+				}
 			}
-		} else {
-			logger.Debug("starting fetcher")
+
+			return fault.Wrap(err, fmsg.With("importer fetch failed"))
+		}
+		logger.Info(
+			"importer fetch succeeded",
+			"previous",
+			previousUpdate.Format(time.DateTime),
+			"current",
+			sourceMeta.UpdatedAt.Format(time.DateTime),
+			"is_updated",
+			sourceMeta.UpdatedAt.After(previousUpdate),
+			"update_force",
+			forceUpdate,
+		)

-			fetcher, err := fetcher.New(source, fopts)
+		if source.Programs.Enable {
+			err = pdb.Instantiate(ctx)
 			if err != nil {
-				return fault.Wrap(err, fmsg.With("error creating fetcher"))
+				logger.Warn("programs database instantiation failed", "error", err)
 			}
+		}

-			files, err = fetcher.FetchIfNeeded(ctx, sourceMeta)
+		if source.Manpages.Enable {
+			err = imp.options.Manpages.Update(ctx, source)
 			if err != nil {
-				var exerr *exec.ExitError
-				if errors.As(err, &exerr) {
-					lines := strings.SplitSeq(strings.TrimSpace(string(exerr.Stderr)), "\n")
-					for line := range lines {
-						logger.Error(
-							"importer fetch failed",
-							"fetcher",
-							source.Fetcher.String(),
-							"stderr",
-							line,
-							"status",
-							exerr.ExitCode(),
-						)
-					}
-				}
+				logger.Warn("manpages database update failed", "error", err)
+			}
+		}
+
+		return nil
+	}
+}

-				return fault.Wrap(err, fmsg.With("importer fetch failed"))
-			}
-			logger.Info(
-				"importer fetch succeeded",
-				"previous",
-				previousUpdate.Format(time.DateTime),
-				"current",
-				sourceMeta.UpdatedAt.Format(time.DateTime),
-				"is_updated",
-				sourceMeta.UpdatedAt.After(previousUpdate),
-				"update_force",
-				forceUpdate,
-				"fetch_only",
-				fetchOnly,
-			)
+func (imp *Importer) createSourceImporter(
+	parent context.Context,
+	meta *index.Meta,
+) func(*config.Source) error {
+	return func(source *config.Source) error {
+		logger := imp.options.Logger.With("name", source.Key)
+		pdb, err := programs.New(source, &programs.Options{
+			Logger: logger,
+			Root:   imp.options.Root,
+		})
+		if err != nil {
+			return fault.Wrap(err, fmsg.With("error creating program database"))
+		}

-		if source.Programs.Enable {
-			err = pdb.Instantiate(ctx)
-			if err != nil {
-				logger.Warn("programs database instantiation failed", "error", err)
-			}
-		}
+		sourceMeta := meta.GetSourceMeta(source.Key)

-		if source.Manpages.Enable {
-			err = imp.options.Manpages.Update(ctx, source)
-			if err != nil {
-				logger.Warn("manpages database update failed", "error", err)
-			}
-		}
+		fopts := &fetcher.Options{
+			Logger: logger,
+			Root:   imp.options.Root,
 		}

-		if !fetchOnly &&
-			(!sourceMeta.UpdatedAt.After(sourceMeta.StoredAt) || sourceMeta.StoredAt.IsZero() || forceUpdate) {
+		ctx, cancel := context.WithTimeout(parent, source.Timeout.Duration)
+		defer cancel()

-			if files.Revision != nil {
-				err = setRepoRevision(files.Revision, source)
-				if err != nil {
-					logger.Warn("could not set source repo revision", "error", err)
-				}
+		files, err := fetcher.Open(source, fopts)
+		if err != nil {
+			return fault.Wrap(err, fmsg.With("error opening fetched files"))
+		}
+
+		if source.Programs.Enable {
+			err = pdb.Instantiate(ctx)
+			if err != nil {
+				logger.Warn("programs database instantiation failed", "error", err)
 			}
+		}

+		if files.Revision != nil {
+			err = setRepoRevision(files.Revision, source)
+			if err != nil {
+				logger.Warn("could not set source repo revision", "error", err)
+			}
+		}

-			var processor Processor
-			logger.Debug(
-				"creating processor",
-				"importer_type",
-				source.Importer,
-				"revision",
-				source.Repo.Revision,
-			)
-			switch source.Importer {
-			case config.Options:
-				processor, err = NewOptionProcessor(
-					files.Options,
-					source,
-					logger.Named("processor"),
-				)
-			case config.Packages:
-				processor, err = NewPackageProcessor(
-					files.Packages,
-					source,
-					logger.Named("processor"),
-					pdb,
-				)
-			}
+		var processor Processor
+		logger.Debug(
+			"creating processor",
+			"importer_type",
+			source.Importer,
+			"revision",
+			source.Repo.Revision,
+		)
+		switch source.Importer {
+		case config.Options:
+			processor, err = NewOptionProcessor(
+				files.Options,
+				source,
+				logger.Named("processor"),
+			)
+		case config.Packages:
+			processor, err = NewPackageProcessor(
+				files.Packages,
+				source,
+				logger.Named("processor"),
+				pdb,
+			)
+		}
-			if err != nil {
-				return fault.Wrap(err, fmsg.Withf("failed to create processor"))
-			}
+		if err != nil {
+			return fault.Wrap(err, fmsg.Withf("failed to create processor"))
+		}

-			hadWarnings, err := pipe(
-				ctx,
-				logger.Named("importer"),
-				processor.Process,
-				imp.options.Storage.MakeSourceImporter(source),
-			)
-			if err != nil {
-				return fault.Wrap(err, fmsg.Withf("failed to process source"))
-			}
+		hadWarnings, err := imp.process(ctx, processor)
+		if err != nil {
+			return fault.Wrap(err, fmsg.Withf("failed to process source"))
+		}

-			sourceMeta.StoredAt = time.Now()
+		sourceMeta.StoredAt = time.Now()

-			if hadWarnings {
-				logger.Warn("importer succeeded, but with warnings/errors")
-			} else {
-				logger.Info("importer succeeded")
-			}
+		if hadWarnings {
+			logger.Warn("importer succeeded, but with warnings/errors")
+		} else {
+			logger.Info("importer succeeded")
 		}

 		sourceMeta.Rev = source.Repo.Revision
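PruneSource now runs entirely against the index: ImportedBefore (added in internal/index/search.go below) finds documents whose ImportedAt predates the last import's start time, and a guard refuses to delete more than 90% of a source in one pass. A compressed sketch of the date-range delete at its core, without the threshold guard; the "ImportedAt" field and 10_000 cap mirror the diff, but the helper itself is illustrative, not searchix code:

package example

import (
	"time"

	"github.com/blevesearch/bleve/v2"
)

// pruneOlderThan finds every document imported before the cutoff and
// deletes the lot in a single batch.
func pruneOlderThan(idx bleve.Index, cutoff time.Time) error {
	q := bleve.NewDateRangeQuery(time.UnixMilli(0), cutoff)
	q.SetField("ImportedAt")

	req := bleve.NewSearchRequest(q)
	req.Size = 10_000

	res, err := idx.Search(req)
	if err != nil {
		return err
	}

	batch := idx.NewBatch()
	for _, hit := range res.Hits {
		batch.Delete(hit.ID)
	}

	return idx.Batch(batch)
}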
M internal/importer/main_test.go → internal/importer/main_test.go
@@ -9,7 +9,6 @@
 	"alin.ovh/searchix/internal/config"
 	"alin.ovh/searchix/internal/file"
 	"alin.ovh/searchix/internal/index"
 	"alin.ovh/searchix/internal/manpages"
-	"alin.ovh/searchix/internal/storage"
 )

 var cfg = config.DefaultConfig
@@ -35,15 +34,6 @@
 	if err != nil {
 		b.Fatal(err)
 	}

-	store, err := storage.New(&storage.Options{
-		LowMemory: true,
-		Root:      tmp,
-		Logger:    logger.Named("storage"),
-	})
-	if err != nil {
-		b.Fatal(err)
-	}
-
 	imp, err := New(&cfg, &Options{
 		Logger:    logger.Named("importer"),
 		LowMemory: true,
@@ -54,7 +44,6 @@
 			Root:   tmp,
 		}),
 		Offline: false,
 		Root:    tmp,
-		Storage: store,
 	})
 	if err != nil {
 		b.Fatal(err)
@@ -66,18 +55,9 @@
 	err = imp.Fetch(
 		b.Context(),
 		false,
-		false,
-		&[]string{source.Key},
+		[]string{source.Key},
 	)
 	if err != nil {
 		b.Fatal(err)
-	}
-
-	hadErrors, err := imp.indexSource(b.Context(), source)
-	if err != nil {
-		b.Fatal(err)
-	}
-	if hadErrors {
-		b.Fatal("had errors")
 	}
 }
M internal/importer/utils.go → internal/importer/utils.go
@@ -1,16 +1,13 @@
 package importer

 import (
-	"context"
 	"fmt"
 	"io"
 	"strings"
-	"sync"

 	"alin.ovh/searchix/internal/config"
 	"alin.ovh/searchix/internal/nix"
-	"alin.ovh/x/log"
 	"github.com/Southclaws/fault"
 	"github.com/Southclaws/fault/fmsg"
 	"github.com/bcicen/jstream"
@@ -64,55 +61,3 @@
 	}

 	return nil
 }
-
-func pipe(
-	ctx context.Context,
-	log *log.Logger,
-	src ImportSource,
-	dst ImportDestination,
-) (bool, error) {
-	wg := sync.WaitGroup{}
-
-	wg.Add(1)
-	objects, srcErrs := src(ctx)
-
-	wg.Add(1)
-	dstErrors := dst(ctx, objects)
-
-	var hadObjectErrors bool
-	var criticalError error
-	go func() {
-		for {
-			select {
-			case err, running := <-srcErrs:
-				if err != nil {
-					hadObjectErrors = true
-					log.Warn("error processing object from source", "error", err)
-				}
-				if !running {
-					wg.Done()
-					srcErrs = nil
-					log.Debug("processing completed")
-
-					continue
-				}
-			case err, running := <-dstErrors:
-				if err != nil {
-					hadObjectErrors = true
-					log.Warn("error writing object to target", "error", err)
-				}
-				if !running {
-					wg.Done()
-					dstErrors = nil
-
-					continue
-				}
-			}
-		}
-	}()
-
-	wg.Wait()
-
-	return hadObjectErrors, criticalError
-}
M internal/index/indexer.go → internal/index/indexer.go
@@ -1,19 +1,21 @@
 package index

 import (
+	"bytes"
 	"context"
+	"encoding/gob"
 	"math"

 	"alin.ovh/searchix/internal/config"
 	"alin.ovh/searchix/internal/file"
 	"alin.ovh/searchix/internal/index/nixattr"
 	"alin.ovh/searchix/internal/nix"
-	"alin.ovh/searchix/internal/storage"
 	"alin.ovh/x/log"
 	"github.com/Southclaws/fault"
 	"github.com/Southclaws/fault/fmsg"
 	"github.com/blevesearch/bleve/v2"
+	"github.com/blevesearch/bleve/v2/analysis"
 	"github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
 	"github.com/blevesearch/bleve/v2/analysis/analyzer/keyword"
 	"github.com/blevesearch/bleve/v2/analysis/analyzer/simple"
@@ -23,7 +25,9 @@
 	"github.com/blevesearch/bleve/v2/analysis/token/ngram"
 	"github.com/blevesearch/bleve/v2/analysis/token/porter"
 	"github.com/blevesearch/bleve/v2/analysis/tokenizer/letter"
 	"github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode"
+	"github.com/blevesearch/bleve/v2/document"
 	"github.com/blevesearch/bleve/v2/mapping"
+	index "github.com/blevesearch/bleve_index_api"
 	"go.uber.org/zap"
 )
@@ -31,7 +35,6 @@
 type Options struct {
 	Force     bool
 	LowMemory bool
 	BatchSize int
-	Store     *storage.Store
 	Logger    *log.Logger
 	Root      *file.Root
 	Config    *config.Config
@@ -41,7 +44,6 @@
 type WriteIndex struct {
 	batchSize int
 	index     bleve.Index
 	log       *log.Logger
-	store     *storage.Store
 	exists    bool
 	Meta      *Meta
 }
@@ -49,6 +51,8 @@
 type BatchError struct {
 	error
 }
+
+var idAnalyzer analysis.Analyzer

 func createIndexMapping() (mapping.IndexMapping, error) {
 	indexMapping := bleve.NewIndexMapping()
@@ -284,7 +288,6 @@
 		&ReadIndex{
 			config: options.Config,
 			log:    options.Logger,
-			store:  options.Store,
 			exists: exists,
 			index:  idx,
 			meta:   meta,
@@ -294,7 +297,6 @@
 			exists:    exists,
 			index:     idx,
 			batchSize: options.BatchSize,
 			log:       options.Logger,
-			store:     options.Store,
 			Meta:      meta,
 		}, nil
@@ -312,8 +314,30 @@
 func (i *WriteIndex) Import(
 	ctx context.Context,
 	objects <-chan nix.Importable,
 ) <-chan error {
-	return i.WithBatch(ctx, objects, func(batch *bleve.Batch, obj nix.Importable) error {
-		if err := batch.Index(nix.GetKey(obj), obj); err != nil {
+	indexMapping := i.index.Mapping()
+
+	return i.WithBatchObjects(ctx, objects, func(batch *bleve.Batch, obj nix.Importable) error {
+		doc := document.NewDocument(nix.GetKey(obj))
+		if err := indexMapping.MapDocument(doc, obj); err != nil {
+			return fault.Wrap(err, fmsg.Withf("could not map document for object: %s", obj.GetName()))
+		}
+
+		var data bytes.Buffer
+		enc := gob.NewEncoder(&data)
+		if err := enc.Encode(&obj); err != nil {
+			return fault.Wrap(err, fmsg.With("could not store object in search index"))
+		}
+		field := document.NewTextFieldWithIndexingOptions("_data", nil, data.Bytes(), index.StoreField)
+		doc.AddField(field)
+		idField := document.NewTextFieldCustom(
+			"_id", nil, []byte(doc.ID()),
+			index.IndexField|index.StoreField|index.IncludeTermVectors,
+			idAnalyzer,
+		)
+		doc.AddField(idField)
+
+		// log.Debug("adding object to index", "name", opt.Name)
+		if err := batch.IndexAdvanced(doc); err != nil {
 			return fault.Wrap(err, fmsg.Withf("could not index object %s", obj.GetName()))
 		}
@@ -325,7 +349,19 @@
 func (i *WriteIndex) GetBatchSize() int {
 	return i.batchSize
 }

-func (i *WriteIndex) WithBatch(
+func (i *WriteIndex) WithBatch(fn func(batch *bleve.Batch)) error {
+	batch := i.index.NewBatch()
+	fn(batch)
+
+	err := i.Flush(batch)
+	if err != nil {
+		return fault.Wrap(err, fmsg.With("could not flush batch"))
+	}
+
+	return nil
+}
+
+func (i *WriteIndex) WithBatchObjects(
 	ctx context.Context,
 	objects <-chan nix.Importable,
 	processor func(batch *bleve.Batch, obj nix.Importable) error,
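This is the heart of the revert: instead of writing documents to a separate storm/bbolt store, the writer gob-encodes each object and attaches it to the bleve document as a store-only "_data" field, so the search index doubles as the document store. A self-contained sketch of the same trick; the helper name and the Importable stand-in are illustrative, not searchix code:

package example

import (
	"bytes"
	"encoding/gob"

	"github.com/blevesearch/bleve/v2"
	"github.com/blevesearch/bleve/v2/document"
	index "github.com/blevesearch/bleve_index_api"
)

// Importable stands in for nix.Importable; concrete implementations
// must be registered with gob.Register for the interface round-trip.
type Importable interface{ GetName() string }

// addWithPayload maps obj through the index mapping as usual, then
// attaches obj's gob encoding as a store-only "_data" field on the
// same document before indexing it.
func addWithPayload(idx bleve.Index, id string, obj Importable) error {
	doc := document.NewDocument(id)
	if err := idx.Mapping().MapDocument(doc, obj); err != nil {
		return err
	}

	var buf bytes.Buffer
	// Encoding via a pointer to the interface keeps the type information
	// gob needs to decode back into an Importable later.
	if err := gob.NewEncoder(&buf).Encode(&obj); err != nil {
		return err
	}

	// index.StoreField alone (without index.IndexField) makes the bytes
	// retrievable from search results but neither analysed nor searchable.
	doc.AddField(document.NewTextFieldWithIndexingOptions("_data", nil, buf.Bytes(), index.StoreField))

	batch := idx.NewBatch()
	if err := batch.IndexAdvanced(doc); err != nil {
		return err
	}

	return idx.Batch(batch)
}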
M internal/index/search.go → internal/index/search.go
@@ -1,8 +1,9 @@
 package index

 import (
+	"bytes"
 	"context"
-	"errors"
+	"encoding/gob"
 	"iter"
 	"strings"
 	"time"
@@ -11,12 +12,10 @@
 	"alin.ovh/x/log"

 	"alin.ovh/searchix/internal/config"
 	"alin.ovh/searchix/internal/nix"
-	"alin.ovh/searchix/internal/storage"
 	"github.com/Southclaws/fault"
 	"github.com/Southclaws/fault/fctx"
 	"github.com/Southclaws/fault/fmsg"
-	"github.com/asdine/storm/v3"
 	"github.com/blevesearch/bleve/v2"
 	"github.com/blevesearch/bleve/v2/search"
 	"github.com/blevesearch/bleve/v2/search/query"
@@ -37,7 +36,6 @@
 type ReadIndex struct {
 	index  bleve.Index
 	config *config.Config
-	store  *storage.Store
 	log    *log.Logger
 	exists bool
 	meta   *Meta
@@ -87,10 +85,9 @@
 }

 func (index *ReadIndex) search(
 	ctx context.Context,
-	source *config.Source,
 	request *bleve.SearchRequest,
 ) (*Result, error) {
-	request.Fields = []string{"Source"}
+	request.Fields = []string{"_data", "Source"}

 	bleveResult, err := index.index.SearchInContext(ctx, request)

 	select {
@@ -105,38 +102,21 @@
 		)
 	}

 	hits := func(yield func(DocumentMatch) bool) {
+		var buf bytes.Buffer
 		for _, match := range bleveResult.Hits {
 			hit := DocumentMatch{
 				DocumentMatch: match,
 				Data:          nil,
 			}
-
-			parts := strings.SplitN(match.ID, "/", 3)
-			sourceName := parts[1]
-			id := parts[2]
-
-			src := source
-			if src == nil {
-				var ok bool
-				src, ok = index.config.Importer.Sources[sourceName]
-				if !ok {
-					continue
-				}
+			_, err := buf.WriteString(match.Fields["_data"].(string))
+			if err != nil {
+				index.log.Warn("error fetching result data", "error", err)
 			}
-
-			doc, err := index.store.GetDocument(src, id)
+			err = gob.NewDecoder(&buf).Decode(&hit.Data)
 			if err != nil {
-				if errors.Is(err, storm.ErrNotFound) {
-					index.log.Warn("document not found", "source", sourceName, "id", id)
-				} else {
-					index.log.Error("error getting document", "error", err)
-				}
-
-				continue
+				index.log.Warn("error decoding gob data", "error", err, "data", buf.String())
 			}
-
-			hit.Data = doc
-
+			buf.Reset()
 			if !yield(hit) {
 				return
 			}
@@ -252,7 +232,35 @@
 	if from != 0 {
 		search.From = from
 	}

-	return index.search(ctx, source, search)
+	return index.search(ctx, search)
+}
+
+func (index *ReadIndex) ImportedBefore(
+	cutoff time.Time,
+	source *config.Source,
+) (*bleve.SearchResult, error) {
+	cutoffQuery := bleve.NewDateRangeQuery(time.UnixMilli(0), cutoff)
+	cutoffQuery.SetField("ImportedAt")
+
+	all := bleve.NewConjunctionQuery(cutoffQuery)
+
+	if source != nil {
+		sourceQuery := bleve.NewTermQuery(source.Key)
+		sourceQuery.SetField("Source")
+
+		all.AddQuery(sourceQuery)
+	}
+
+	req := bleve.NewSearchRequest(all)
+	req.Size = 10_000
+	req.SortBy([]string{"_id"})
+
+	res, err := index.index.Search(req)
+	if err != nil {
+		return nil, fault.Wrap(err, fmsg.With("could not query old documents"))
+	}
+
+	return res, nil
 }

 func (index *ReadIndex) Count(source *config.Source) (uint64, error) {
@@ -283,3 +291,31 @@
 	}

 	return nil
 }
+
+func (index *ReadIndex) GetDocument(
+	ctx context.Context,
+	source *config.Source,
+	id string,
+) (nix.Importable, error) {
+	key := nix.MakeKey(source, id)
+	query := bleve.NewDocIDQuery([]string{key})
+	search := bleve.NewSearchRequest(query)
+	search.Size = 1
+
+	result, err := index.search(ctx, search)
+	if err != nil {
+		return nil, err
+	}
+
+	if result.Total == 0 {
+		return nil, nil
+	}
+
+	for hit := range result.Hits {
+		if hit.ID == key {
+			return hit.Data, err
+		}
+	}
+
+	return nil, err
+}
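On the read side, search requests now ask for the stored "_data" field and gob-decode it straight into hit.Data, replacing the per-hit storage lookup. A sketch of that retrieval path, reusing the Importable stand-in from the earlier sketches; note that bleve returns stored text field values as strings:

package example

import (
	"encoding/gob"
	"strings"

	"github.com/blevesearch/bleve/v2"
)

// Importable is the same illustrative stand-in as above.
type Importable interface{ GetName() string }

// decodeHits requests the stored "_data" field on each hit and
// gob-decodes it back through the interface. Concrete types must have
// been registered with gob.Register at encode time.
func decodeHits(idx bleve.Index, req *bleve.SearchRequest) ([]Importable, error) {
	req.Fields = []string{"_data"}

	res, err := idx.Search(req)
	if err != nil {
		return nil, err
	}

	out := make([]Importable, 0, len(res.Hits))
	for _, hit := range res.Hits {
		raw, ok := hit.Fields["_data"].(string) // stored field values arrive as strings
		if !ok {
			continue
		}

		var obj Importable
		if err := gob.NewDecoder(strings.NewReader(raw)).Decode(&obj); err != nil {
			return nil, err
		}
		out = append(out, obj)
	}

	return out, nil
}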
M internal/index/search_test.go → internal/index/search_test.go
@@ -14,7 +14,6 @@
 	"alin.ovh/searchix/internal/config"
 	"alin.ovh/searchix/internal/file"
 	"alin.ovh/searchix/internal/index"
 	"alin.ovh/searchix/internal/nix"
-	"alin.ovh/searchix/internal/storage"
 )

 const rootPath = "../../data"
@@ -28,15 +27,6 @@
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer root.Close()

-	store, err := storage.New(&storage.Options{
-		Root:   root,
-		Logger: log.Named("storage"),
-	})
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer store.Close()

 	read, _, err := index.OpenOrCreate(&index.Options{
 		Logger: log.Named("index"),
@@ -44,7 +34,6 @@
 		Root:      root,
 		BatchSize: cfg.Importer.BatchSize,
 		Force:     false,
 		LowMemory: false,
-		Store:     store,
 		Config:    &cfg,
 	})
 	if err != nil {
@@ -96,9 +85,9 @@
 		"gitSVN": 0,
 	}
 	var i int
 	for hit := range result.Hits {
-		data, ok := hit.Data.(*nix.Package)
+		data, ok := hit.Data.(nix.Package)
 		if !ok {
-			t.Fatalf("Expected hit.Data to be *nix.Package, got %T", hit.Data)
+			t.Fatalf("Expected hit.Data to be nix.Package, got %T", hit.Data)
 		}

 		if _, found := important[data.Attribute]; found {
@@ -143,9 +132,9 @@
 	unwanted := "javacc"
 	unwantedIndex := math.MaxInt
 	var i int
 	for hit := range result.Hits {
-		data, ok := hit.Data.(*nix.Package)
+		data, ok := hit.Data.(nix.Package)
 		if !ok {
-			t.Fatalf("Expected hit.Data to be *nix.Package, got %T", hit.Data)
+			t.Fatalf("Expected hit.Data to be nix.Package, got %T", hit.Data)
 		}

 		if _, found := important[data.Attribute]; found {
M internal/server/mux.go → internal/server/mux.go
@@ -71,7 +71,6 @@
 	if options.ReadIndex == nil {
 		return nil, fault.New("read index is nil")
 	}
 	index := options.ReadIndex
-	store := options.Store
 	sortSources(cfg.Importer.Sources)
 	assets, err := frontend.New()
 	if err != nil {
@@ -233,7 +232,7 @@
 			return
 		}
 		importerSingular := importerType.Singular()

-		doc, err := store.GetDocument(source, r.PathValue("id"))
+		doc, err := index.GetDocument(r.Context(), source, r.PathValue("id"))
 		if err != nil {
 			if ftag.Get(err) == ftag.NotFound {
 				log.Warn("document not found", "source", source.Key, "id", r.PathValue("id"))
M internal/server/server.go → internal/server/server.go
@@ -10,7 +10,6 @@
 	"alin.ovh/searchix/internal/config"
 	"alin.ovh/searchix/internal/index"
 	"alin.ovh/searchix/internal/manpages"
-	"alin.ovh/searchix/internal/storage"
 	"alin.ovh/x/log"

 	"github.com/Southclaws/fault"
@@ -30,7 +29,6 @@
 }

 type Options struct {
 	ReadIndex      *index.ReadIndex
-	Store          *storage.Store
 	ManpagesURLMap *manpages.URLMap
 }