package storage

import (
	"context"
	"errors"
	"time"

	"alin.ovh/x/log"
	"github.com/Southclaws/fault"
	"github.com/Southclaws/fault/fmsg"
	"github.com/asdine/storm/v3"
	"github.com/asdine/storm/v3/codec/gob"
	"go.etcd.io/bbolt"
	"go.uber.org/zap"

	"alin.ovh/searchix/internal/config"
	"alin.ovh/searchix/internal/file"
	"alin.ovh/searchix/internal/nix"
)

// BatchSize is the number of objects saved per transaction while importing,
// and the page size used when exporting.
var BatchSize = 10000

// Options holds the dependencies needed to open a Store.
type Options struct {
	// Root is used to resolve the path of the database file.
	Root *file.Root
	// Logger is used by the store and, under the name "bolt", by the
	// underlying database.
	Logger *log.Logger
}

// Store wraps a storm database together with a logger.
type Store struct {
	*storm.DB

	log *log.Logger
}

// New opens the "searchix.bolt" database under opts.Root using the gob codec,
// file mode 0o600 and a one-second bbolt timeout.
//
// The logger handed to bbolt has its level raised to info.
func New(opts *Options) (*Store, error) {
	//nolint:forbidigo // external package
	path := opts.Root.JoinPath("searchix.bolt")

	bb, err := storm.Open(path,
		storm.Codec(gob.Codec),
		storm.BoltOptions(0o600, &bbolt.Options{
			Timeout: 1 * time.Second,
			Logger: Logger{
				opts.Logger.Named("bolt").GetLogger().Sugar().WithOptions(
					zap.IncreaseLevel(zap.InfoLevel),
				),
			},
		}),
	)
	if err != nil {
		return nil, fault.Wrap(err, fmsg.With("failed to open database"))
	}

	return &Store{
		DB:  bb,
		log: opts.Logger,
	}, nil
}

// Close closes the underlying database.
func (s *Store) Close() error {
	err := s.DB.Close()
	if err != nil {
		return fault.Wrap(err, fmsg.With("failed to close database"))
	}

	return nil
}

// MakeSourceImporter returns a function that saves every object received on
// its input channel into the bucket named after source.Key.
//
// Objects are written inside a transaction that is committed and replaced
// every BatchSize objects, and committed once more when the input channel is
// closed or ctx is cancelled. Errors are reported on the returned channel,
// which is closed when the import has finished; the caller must drain it.
//
// If source.Importer is not a known importer, the returned channel yields a
// single error and is then closed.
func (s *Store) MakeSourceImporter(
	source *config.Source,
) func(context.Context, <-chan nix.Importable) <-chan error {
	return func(ctx context.Context, objects <-chan nix.Importable) <-chan error {
		var save func(storm.Node, nix.Importable) error
		switch source.Importer {
		case config.Packages:
			save = saveGen[nix.Package]
		case config.Options:
			save = saveGen[nix.Option]
		default:
			// The channel has not been handed to the caller yet, so nothing
			// can receive from it: buffer the error instead of blocking on
			// an unbuffered send.
			failed := make(chan error, 1)
			failed <- fault.New("invalid importer")
			close(failed)

			return failed
		}

		errs := make(chan error)
		node := s.DB.From(source.Key).WithBatch(true)

		go func() {
			defer close(errs)

			tx, err := node.Begin(true)
			if err != nil {
				errs <- fault.Wrap(err, fmsg.With("failed to begin transaction"))

				return
			}
			defer func() {
				// After a successful commit there is nothing left to roll
				// back; storm reports that as ErrNotInTransaction.
				err := tx.Rollback()
				if err != nil && !errors.Is(err, storm.ErrNotInTransaction) {
					errs <- fault.Wrap(err, fmsg.With("failed to rollback transaction"))
				}
			}()

			i := 0
		outer:
			for obj := range objects {
				i++
				select {
				case <-ctx.Done():
					s.log.Warn("import aborted")

					break outer
				default:
				}

				if err := save(tx, obj); err != nil {
					errs <- fault.Wrap(err, fmsg.With("failed to save object"))
				}

				if i%BatchSize != 0 {
					continue
				}

				s.log.Debug("imported", "count", i)
				if err := tx.Commit(); err != nil {
					errs <- fault.Wrap(err, fmsg.With("failed to commit transaction"))
				}

				// Keep the previous transaction in tx until the new one is
				// known to be valid, so that neither the loop nor the
				// deferred rollback ever touches a nil node.
				next, err := node.Begin(true)
				if err != nil {
					errs <- fault.Wrap(err, fmsg.With("failed to begin transaction"))

					return
				}
				tx = next
			}

			if err := tx.Commit(); err != nil {
				errs <- fault.Wrap(err, fmsg.With("failed to commit transaction"))
			}
		}()

		return errs
	}
}

// saveGen asserts that obj has the concrete type T and saves it to node.
// It returns an error if obj is of a different type or if saving fails.
func saveGen[T nix.Importable](node storm.Node, obj nix.Importable) error {
	doc, ok := obj.(T)
	if !ok {
		return fault.Newf("invalid type: %T", obj)
	}

	if err := node.Save(&doc); err != nil {
		return fault.Wrap(err, fmsg.With("failed to save document"))
	}

	return nil
}

// GetDocument fetches a single document from the bucket named after
// source.Key. Packages are looked up by their "Attribute" field and options
// by their "Name" field.
func (s *Store) GetDocument(
	source *config.Source,
	id string,
) (nix.Importable, error) {
	var doc nix.Importable
	var err error

	node := s.From(source.Key)
	switch source.Importer {
	case config.Packages:
		doc = &nix.Package{}
		err = node.One("Attribute", id, doc)
	case config.Options:
		doc = &nix.Option{}
		err = node.One("Name", id, doc)
	default:
		return nil, fault.New("invalid importer type")
	}

	if err != nil {
		return nil, fault.Wrap(
			err,
			fmsg.Withf("failed to get document source: %s id: %s", source.Key, id),
		)
	}

	return doc, nil
}

// MakeSourceExporter returns a function that streams every document of type T
// stored in the bucket named after source.Key.
//
// Documents are read in pages of at most BatchSize and sent on the first
// returned channel; a failure is sent on the second channel and ends the
// export. Both channels are closed when the export finishes. The export also
// stops, without reporting an error, once ctx is cancelled, so an abandoned
// consumer does not leave the goroutine blocked on a send.
func MakeSourceExporter[T nix.Importable](
	store *Store,
	source *config.Source,
) func(context.Context) (<-chan nix.Importable, <-chan error) {
	return func(ctx context.Context) (<-chan nix.Importable, <-chan error) {
		results := make(chan nix.Importable)
		errs := make(chan error)

		go func() {
			defer close(results)
			defer close(errs)

			var zero T

			objs := make([]T, 0, BatchSize)
			node := store.From(source.Key)

			count, err := node.Count(&zero)
			if err != nil {
				errs <- fault.Wrap(
					err,
					fmsg.Withf("failed to count documents source: %s", source.Key),
				)

				return
			}

			limit := min(BatchSize, count)
			for offset := 0; offset < count; offset += BatchSize {
				err := node.All(&objs, storm.Skip(offset), storm.Limit(limit))
				if err != nil {
					errs <- fault.Wrap(
						err,
						fmsg.Withf(
							"failed to export documents source: %s offset: %d limit: %d",
							source.Key,
							offset,
							limit,
						),
					)

					return
				}

				for _, doc := range objs {
					select {
					case results <- doc:
					case <-ctx.Done():
						store.log.Warn("export aborted")

						return
					}
				}
			}
		}()

		return results, errs
	}
}