internal/importer/importer.go (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | package importer import ( "context" "sync" "alin.ovh/searchix/internal/config" "alin.ovh/searchix/internal/index" "alin.ovh/searchix/internal/nix" ) type Processor interface { Process(context.Context) (<-chan nix.Importable, <-chan error) } func (imp *Importer) process( ctx context.Context, processor Processor, source *config.Source, ) (bool, error) { wg := sync.WaitGroup{} wg.Add(1) objects, pErrs := processor.Process(ctx) d1, d2 := duplicate(objects) wg.Add(1) iErrs := imp.options.WriteIndex.Import(ctx, d1) wg.Add(1) wErrs := imp.options.Storage.Import(ctx, source, d2) var hadObjectErrors bool var criticalError error go func() { for { select { case err, running := <-iErrs: if !running { wg.Done() iErrs = nil imp.options.Logger.Debug("ingest completed") continue } be, isBatchError := err.(*index.BatchError) if isBatchError { criticalError = be break } hadObjectErrors = true imp.options.Logger.Warn("error ingesting object", "error", err) case err, running := <-pErrs: if !running { wg.Done() pErrs = nil imp.options.Logger.Debug("processing completed") continue } hadObjectErrors = true imp.options.Logger.Warn("error processing object", "error", err) case err, running := <-wErrs: if !running { wg.Done() wErrs = nil continue } hadObjectErrors = true imp.options.Logger.Warn("error writing to storage", "error", err) } } }() wg.Wait() return hadObjectErrors, criticalError } func duplicate[T any](v <-chan T) (<-chan T, <-chan T) { if v == nil { return nil, nil } dup1 := make(chan T, 1) dup2 := make(chan T, 1) go func() { for v := range v { dup1 <- v dup2 <- v } close(dup1) close(dup2) }() return dup1, dup2 } |