all repos — searchix @ eeac61c274b3ee615aedf3318c97715d70915a72

Search engine for NixOS, nix-darwin, home-manager and NUR users

internal/storage/store.go (view raw)

package storage

import (
	"context"
	"errors"
	"time"

	"alin.ovh/x/log"
	"github.com/Southclaws/fault"
	"github.com/Southclaws/fault/fmsg"
	"github.com/Southclaws/fault/ftag"
	"github.com/asdine/storm/v3"
	"github.com/asdine/storm/v3/codec/gob"
	"go.etcd.io/bbolt"

	"alin.ovh/searchix/internal/config"
	"alin.ovh/searchix/internal/file"
	"alin.ovh/searchix/internal/nix"
)

var BatchSize = 50000

type Options struct {
	Replace   bool
	LowMemory bool
	Root      *file.Root
	Logger    *log.Logger
}

type Store struct {
	*storm.DB
	new bool
	log *log.Logger
}

const filename = "searchix.bolt"

func New(opts *Options) (*Store, error) {
	exists, err := opts.Root.Exists(filename)
	if err != nil {
		return nil, fault.Wrap(err, fmsg.With("failed to check if file exists"))
	}

	if opts.Replace && exists {
		err = opts.Root.Remove(filename)
		if err != nil {
			return nil, fault.Wrap(err, fmsg.With("failed to remove existing file"))
		}

		exists = false
	}

	//nolint:forbidigo // external package
	path := opts.Root.JoinPath(filename)
	bb, err := storm.Open(path,
		storm.Codec(gob.Codec),
		storm.BoltOptions(0o600, &bbolt.Options{
			FreelistType:   bbolt.FreelistMapType,
			NoFreelistSync: true,
			NoGrowSync:     true,
			Timeout:        1 * time.Second,
		}),
	)
	if err != nil {
		return nil, fault.Wrap(err, fmsg.With("failed to open database"))
	}

	if !opts.LowMemory {
		bb.Bolt.AllocSize = 256 * 1024 * 1024
	}

	return &Store{
		DB:  bb,
		new: !exists,
		log: opts.Logger,
	}, nil
}

func (s *Store) IsNew() bool {
	return s.new
}

func (s *Store) Close() error {
	err := s.DB.Close()
	if err != nil {
		return fault.Wrap(err, fmsg.With("failed to close database"))
	}

	return nil
}

func (s *Store) MakeSourceImporter(
	source *config.Source,
) func(context.Context, <-chan nix.Importable) <-chan error {
	return func(ctx context.Context, objects <-chan nix.Importable) <-chan error {
		var imp nix.Importable

		errs := make(chan error)
		node := s.DB.From(source.Key).WithBatch(true)

		i := 0

		var save func(storm.Node, nix.Importable) error
		switch source.Importer {
		case config.Packages:
			imp = &nix.Package{}
			save = saveGen[nix.Package]
		case config.Options:
			imp = &nix.Option{}
			save = saveGen[nix.Option]
		default:
			errs <- fault.New("invalid importer")

			return errs
		}

		go func() {
			defer close(errs)
			tx, err := node.Begin(true)
			if err != nil {
				errs <- fault.Wrap(err, fmsg.With("failed to begin transaction"))

				return
			}
			defer func() {
				if err := tx.Rollback(); err != nil {
					if !errors.Is(err, storm.ErrNotInTransaction) {
						errs <- fault.Wrap(err, fmsg.With("failed to rollback transaction"))
					}
				}
			}()

		outer:
			for obj := range objects {
				i++
				select {
				case <-ctx.Done():
					s.log.Warn("import aborted")

					break outer
				default:
				}

				err := save(tx, obj)
				if err != nil {
					errs <- fault.Wrap(err, fmsg.With("failed to save object"))
				}

				if i%BatchSize == 0 {
					s.log.Debug("imported", "count", i)
					err := tx.Commit()
					if err != nil {
						errs <- fault.Wrap(err, fmsg.With("failed to commit transaction"))
					}
					tx, err = node.Begin(true)
					if err != nil {
						errs <- fault.Wrap(err, fmsg.With("failed to begin transaction"))
					}
				}
			}

			if err := tx.Commit(); err != nil {
				errs <- fault.Wrap(err, fmsg.With("failed to commit transaction"))
			}

			if err := node.ReIndex(imp); err != nil {
				errs <- fault.Wrap(err, fmsg.With("failed to reindex storm db"))
			}
		}()

		return errs
	}
}

func saveGen[T nix.Importable](node storm.Node, obj nix.Importable) error {
	doc, ok := obj.(T)
	if !ok {
		return fault.Newf("invalid type: %T", obj)
	}

	if err := node.Save(&doc); err != nil {
		return fault.Wrap(err, fmsg.With("failed to save document"))
	}

	return nil
}

func (s *Store) GetDocument(
	source *config.Source,
	id string,
) (nix.Importable, error) {
	var doc nix.Importable
	var err error

	node := s.From(source.Key)

	switch source.Importer {
	case config.Packages:
		doc = &nix.Package{}
		err = node.One("Attribute", id, doc)
	case config.Options:
		doc = &nix.Option{}
		err = node.One("Name", id, doc)
	default:
		return nil, fault.New("invalid importer type")
	}

	if err != nil {
		if errors.Is(err, storm.ErrNotFound) {
			return nil,
				fault.Wrap(
					fault.Newf("document not found source: %s id: %s", source.Key, id),
					ftag.With(ftag.NotFound),
				)
		}

		return nil, fault.Wrap(
			err,
			fmsg.Withf("failed to get document source: %s id: %s", source.Key, id),
		)
	}

	return doc, nil
}

func MakeSourceExporter[T nix.Importable](
	store *Store,
	source *config.Source,
	batchSize int,
) func(context.Context) (<-chan nix.Importable, <-chan error) {
	return func(_ context.Context) (<-chan nix.Importable, <-chan error) {
		results := make(chan nix.Importable, 1)
		errs := make(chan error)

		go func() {
			defer close(results)
			defer close(errs)

			var obj T
			objs := make([]T, 0, batchSize)
			node := store.From(source.Key)
			count, err := node.Count(&obj)
			if err != nil {
				errs <- fault.Wrap(
					err,
					fmsg.Withf("failed to count documents source: %s", source.Key),
				)

				return
			}

			limit := min(batchSize, count)
			for offset := 0; offset < count; offset += batchSize {
				err := node.All(&objs, storm.Skip(offset), storm.Limit(limit))
				if err != nil {
					errs <- fault.Wrap(
						err,
						fmsg.Withf("failed to export documents source: %s offset: %d limit: %d", source.Key, offset, limit),
					)

					return
				}
				for _, obj := range objs {
					results <- obj
				}
			}
		}()

		return results, errs
	}
}