internal/importer/utils.go (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | package importer import ( "context" "fmt" "io" "strings" "sync" "alin.ovh/searchix/internal/config" "alin.ovh/searchix/internal/nix" "alin.ovh/x/log" "github.com/Southclaws/fault" "github.com/Southclaws/fault/fmsg" "github.com/bcicen/jstream" ) func ValueTypeToString(valueType jstream.ValueType) string { switch valueType { case jstream.Unknown: return "unknown" case jstream.Null: return "null" case jstream.String: return "string" case jstream.Number: return "number" case jstream.Boolean: return "boolean" case jstream.Array: return "array" case jstream.Object: return "object" } return "very strange" } func MakeChannelLink(repo config.Repository, subPath string) (*nix.Link, error) { url, err := repo.GetFileURL(subPath) if err != nil { return nil, fault.Wrap(err, fmsg.Withf("failed to generate channel link for %s", subPath)) } return &nix.Link{ Name: fmt.Sprintf("<%s/%s>", repo.Repo, subPath), URL: url, }, nil } func setRepoRevision(file io.ReadCloser, source *config.Source) error { if file != nil { defer file.Close() var str strings.Builder _, err := io.Copy(&str, file) if err != nil { return fault.Wrap( err, fmsg.Withf("unable to read revision file")) } source.Repo.Revision = strings.TrimSpace(str.String()) } return nil } func pipe( ctx context.Context, log *log.Logger, src ImportSource, dst ImportDestination, ) (bool, error) { wg := sync.WaitGroup{} wg.Add(1) objects, srcErrs := src(ctx) wg.Add(1) dstErrors := dst(ctx, objects) var hadObjectErrors bool var criticalError error go func() { for { select { case err, running := <-srcErrs: if err != nil { hadObjectErrors = true log.Warn("error processing object from source", "error", err) } if !running { wg.Done() srcErrs = nil log.Debug("processing completed") continue } case err, running := <-dstErrors: if err != nil { hadObjectErrors = true log.Warn("error writing object to target", "error", err) } if !running { wg.Done() dstErrors = nil continue } } } }() wg.Wait() return hadObjectErrors, criticalError } |