refactor(importer): use interfaces, not generics
6 files changed, 22 insertions(+), 69 deletions(-)
M cmd/searchix-web/ingest.go → cmd/searchix-web/ingest.go
@@ -83,13 +83,7 @@ } if !exists || opts.Reindex { for _, source := range cfg.Importer.Sources { - hadErrors, err := importer.ImportSource( - ctx, - logger.Named("importer"), - store, - source, - write, - ) + hadErrors, err := imp.IndexSource(ctx, source) if err != nil { return fault.Wrap(err, fmsg.Withf("Failed to import source %s", source.Name)) }
M cmd/searchix-web/serve.go → cmd/searchix-web/serve.go
@@ -87,13 +87,7 @@ return fault.Wrap(err, fmsg.With("Failed to start importer")) } for _, source := range cfg.Importer.Sources { - hadErrors, err := importer.ImportSource( - ctx, - logger.Named("importer"), - store, - source, - write, - ) + hadErrors, err := imp.IndexSource(ctx, source) if err != nil { return fault.Wrap(err, fmsg.Withf("Failed to import source %s", source.Name)) }
M internal/importer/importer.go → internal/importer/importer.go
@@ -7,7 +7,6 @@ "alin.ovh/x/log" "alin.ovh/searchix/internal/config" - "alin.ovh/searchix/internal/index" "alin.ovh/searchix/internal/nix" "alin.ovh/searchix/internal/storage" )@@ -16,11 +15,16 @@ type Processor interface { Process(context.Context) (<-chan nix.Importable, <-chan error) } -func Import[I nix.Importable]( +type ( + ImportSource func(context.Context) (<-chan nix.Importable, <-chan error) + ImportDestination func(context.Context, <-chan nix.Importable) <-chan error +) + +func Import( ctx context.Context, log *log.Logger, - src func(context.Context) (<-chan I, <-chan error), - dst func(context.Context, <-chan I) <-chan error, + src ImportSource, + dst ImportDestination, ) (bool, error) { wg := sync.WaitGroup{}@@ -63,37 +67,16 @@ return hadObjectErrors, criticalError } -func ImportSource( +func (imp *Importer) IndexSource( ctx context.Context, - logger *log.Logger, - store *storage.Store, source *config.Source, - write *index.WriteIndex, -) (bool, error) { - var it func(context.Context, *log.Logger, *storage.Store, *config.Source, *index.WriteIndex) (bool, error) - switch source.Importer { - case config.Packages: - it = ImportWithType[nix.Package] - case config.Options: - it = ImportWithType[nix.Option] - } - - return it(ctx, logger, store, source, write) -} - -func ImportWithType[I nix.Importable]( - ctx context.Context, - logger *log.Logger, - store *storage.Store, - source *config.Source, - write *index.WriteIndex, ) (bool, error) { return Import( ctx, - logger.Named("importer"), - storage.MakeSourceExporter[I](store, source), - func(ctx context.Context, objects <-chan I) <-chan error { - return write.Import(ctx, nix.ToGenericChannel(objects)) + imp.options.Logger, + storage.MakeSourceExporter(imp.options.Storage, source), + func(ctx context.Context, objects <-chan nix.Importable) <-chan error { + return imp.options.WriteIndex.Import(ctx, objects) }, ) }
M internal/importer/main_test.go → internal/importer/main_test.go
@@ -72,13 +72,7 @@ if err != nil { b.Fatal(err) } - hadErrors, err := ImportSource( - b.Context(), - logger.Named("importer"), - store, - source, - write, - ) + hadErrors, err := imp.IndexSource(b.Context(), source) if err != nil { b.Fatal(err) }
M internal/nix/importable.go → internal/nix/importable.go
@@ -20,18 +20,6 @@ func MakeKey(source *config.Source, id string) string { return source.Importer.Singular() + "/" + source.Key + "/" + id } -func ToGenericChannel[I Importable](in <-chan I) <-chan Importable { - out := make(chan Importable) - go func() { - for i := range in { - out <- i - } - close(out) - }() - - return out -} - func init() { gob.Register(Option{}) gob.Register(Package{})
M internal/storage/store.go → internal/storage/store.go
@@ -181,20 +181,20 @@ return doc, nil } -func MakeSourceExporter[I nix.Importable]( +func MakeSourceExporter( store *Store, source *config.Source, -) func(context.Context) (<-chan I, <-chan error) { - return func(_ context.Context) (<-chan I, <-chan error) { - results := make(chan I) +) func(context.Context) (<-chan nix.Importable, <-chan error) { + return func(_ context.Context) (<-chan nix.Importable, <-chan error) { + results := make(chan nix.Importable) errs := make(chan error) go func() { defer close(results) defer close(errs) - var obj I - objs := make([]I, 0, BatchSize) + var obj nix.Importable + objs := make([]nix.Importable, 0, BatchSize) node := store.From(source.Key) count, err := node.Count(&obj) if err != nil {