internal/importer/importer.go (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | package importer import ( "context" "sync" "alin.ovh/x/log" "alin.ovh/searchix/internal/config" "alin.ovh/searchix/internal/nix" "alin.ovh/searchix/internal/storage" ) type Processor interface { Process(context.Context) (<-chan nix.Importable, <-chan error) } type ( ImportSource func(context.Context) (<-chan nix.Importable, <-chan error) ImportDestination func(context.Context, <-chan nix.Importable) <-chan error ) func Import( ctx context.Context, log *log.Logger, src ImportSource, dst ImportDestination, ) (bool, error) { wg := sync.WaitGroup{} wg.Add(1) objects, srcErrs := src(ctx) wg.Add(1) dstErrors := dst(ctx, objects) var hadObjectErrors bool var criticalError error go func() { for { select { case err, running := <-srcErrs: if !running { wg.Done() srcErrs = nil log.Debug("processing completed") continue } hadObjectErrors = true log.Warn("error processing object from source", "error", err) case err, running := <-dstErrors: if !running { wg.Done() dstErrors = nil continue } hadObjectErrors = true log.Warn("error writing object to target", "error", err) } } }() wg.Wait() return hadObjectErrors, criticalError } func (imp *Importer) IndexSource( ctx context.Context, source *config.Source, ) (bool, error) { return Import( ctx, imp.options.Logger, storage.MakeSourceExporter(imp.options.Storage, source), func(ctx context.Context, objects <-chan nix.Importable) <-chan error { return imp.options.WriteIndex.Import(ctx, objects) }, ) } |