all repos — homestead @ 445c43ef8b981bfacc8b465b3997da413370043f

Code for my website

use redis notifications to update website

Alan Pearce
commit

445c43ef8b981bfacc8b465b3997da413370043f

parent

fd0f99dffb49f29531d1974a638c0ce883bf5db7

1 file changed, 218 insertions(+), 0 deletions(-)

changed files
A internal/fetcher/fetcher.go
@@ -0,0 +1,218 @@
+package fetcher + +import ( + "context" + "io" + "io/fs" + "net/http" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + "time" + + "github.com/google/renameio/v2" + "gitlab.com/tozd/go/errors" + "go.alanpearce.eu/homestead/internal/config" + "go.alanpearce.eu/homestead/internal/events" + "go.alanpearce.eu/x/log" +) + +var files = []string{"config.toml", "site.db"} +var numericFilename = regexp.MustCompile("[0-9]{3,}") +var timeout = 10 * time.Second + +type Fetcher struct { + options *Options + log *log.Logger + updater events.Listener +} + +type Options struct { + Root string + RedisEnabled bool + FetchURL config.URL + Listener events.Listener +} + +func New(log *log.Logger, options *Options) Fetcher { + return Fetcher{ + log: log, + options: options, + updater: options.Listener, + } +} + +func (f *Fetcher) getArtefacts(run uint64) error { + runID := strconv.FormatUint(run, 10) + f.log.Debug("getting artefacts", "run_id", runID) + + err := os.MkdirAll(filepath.Join(f.options.Root, runID), 0755) + if err != nil { + return errors.WithMessage(err, "could not create directory") + } + + for _, file := range files { + err := f.getFile(runID, file) + if err != nil { + return errors.WithMessage(err, "could not fetch file") + } + } + + err = renameio.Symlink(runID, filepath.Join(f.options.Root, "current")) + if err != nil { + return errors.WithMessage(err, "could not create/update symlink") + } + + return nil +} + +func (f *Fetcher) checkFolder() error { + contents, err := os.ReadDir(f.options.Root) + if err != nil { + return errors.WithMessage(err, "could not read root directory") + } + var badFiles []string + for _, f := range contents { + name := f.Name() + if !(name == "current" || numericFilename.MatchString(name)) { + badFiles = append(badFiles, name) + } + } + + if len(badFiles) > 0 { + return errors.Basef("unexpected files in root directory: %s", strings.Join(badFiles, ", ")) + } + + return nil +} + +func (f *Fetcher) getFile(runID, basename string) error { + filename := filepath.Join(f.options.Root, runID, basename) + url := f.options.FetchURL.JoinPath(runID, basename).String() + + f.log.Debug("getting file", "filename", filename, "url", url) + + file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return errors.WithMessage(err, "could not open file") + } + defer file.Close() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return errors.WithMessage(err, "could not create request") + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return errors.WithMessage(err, "could not issue request") + } + + _, err = io.Copy(file, res.Body) + if err != nil { + return errors.WithMessage(err, "could not write file") + } + + err = file.Sync() + if err != nil { + return errors.WithMessage(err, "could not sync file") + } + + return nil +} + +func (f *Fetcher) getCurrentVersion() (uint64, error) { + target, err := os.Readlink(filepath.Join(f.options.Root, "current")) + if err != nil && errors.Is(err, fs.ErrNotExist) { + return 0, errors.WithMessage(err, "could not stat current link") + } + runID, err := strconv.ParseUint(target, 10, 64) + if err != nil { + return 0, errors.WithMessagef(err, "unexpected symlink target (current -> %s)", target) + } + + return runID, nil +} + +func (f *Fetcher) initialiseStorage() (uint64, error) { + latest, err := f.updater.GetLatestRunID() + if err != nil { + f.log.Warn("could not get latest run ID, using fallback", "error", err) + } + + current, err := f.getCurrentVersion() + if err != nil { + f.log.Warn("could not get current version", "error", err) + } + + f.log.Debug("versions", "current", current, "latest", latest) + + if latest > current { + err = f.getArtefacts(latest) + if err != nil { + return latest, errors.WithMessage(err, "could not fetch artefacts") + } + + return latest, nil + } + + return current, nil +} + +func (f *Fetcher) Subscribe() (<-chan string, error) { + ch := make(chan string, 1) + err := f.checkFolder() + if err != nil { + return nil, err + } + + var root string + if f.options.RedisEnabled { + root = f.path("current") + } else { + runID, err := f.initialiseStorage() + if err != nil { + return nil, err + } + root = f.path(strconv.FormatUint(runID, 10)) + } + updates, err := f.updater.Subscribe() + if err != nil { + return nil, errors.WithMessage(err, "could not subscribe to updates") + } + + go func() { + ch <- root + + for update := range updates { + if update.RunID == 0 { + if !f.options.RedisEnabled { + f.log.Warn("got zero runID") + + continue + } + + ch <- f.path("current") + } else { + err := f.getArtefacts(update.RunID) + if err != nil { + f.log.Warn("could not get artefacts for version", "run_id", update.RunID, "error", err) + + continue + } + + ch <- f.path(strconv.FormatUint(update.RunID, 10)) + } + } + }() + + return ch, nil +} + +func (f *Fetcher) path(runID string) string { + return filepath.Join(f.options.Root, runID) +}