| // Copyright 2023 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package cache |
| |
| import ( |
| "bufio" |
| "cmd/go/internal/base" |
| "cmd/internal/quoted" |
| "context" |
| "crypto/sha256" |
| "encoding/base64" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io" |
| "log" |
| "os" |
| "os/exec" |
| "sync" |
| "sync/atomic" |
| "time" |
| ) |
| |
| // ProgCache implements Cache via JSON messages over stdin/stdout to a child |
| // helper process which can then implement whatever caching policy/mechanism it |
| // wants. |
| // |
| // See https://github.com/golang/go/issues/59719 |
| type ProgCache struct { |
| cmd *exec.Cmd |
| stdout io.ReadCloser // from the child process |
| stdin io.WriteCloser // to the child process |
| bw *bufio.Writer // to stdin |
| jenc *json.Encoder // to bw |
| |
| // can are the commands that the child process declared that it supports. |
| // This is effectively the versioning mechanism. |
| can map[ProgCmd]bool |
| |
| // fuzzDirCache is another Cache implementation to use for the FuzzDir |
| // method. In practice this is the default GOCACHE disk-based |
| // implementation. |
| // |
| // TODO(bradfitz): maybe this isn't ideal. But we'd need to extend the Cache |
| // interface and the fuzzing callers to be less disk-y to do more here. |
| fuzzDirCache Cache |
| |
| closing atomic.Bool |
| ctx context.Context // valid until Close via ctxClose |
| ctxCancel context.CancelFunc // called on Close |
| readLoopDone chan struct{} // closed when readLoop returns |
| |
| mu sync.Mutex // guards following fields |
| nextID int64 |
| inFlight map[int64]chan<- *ProgResponse |
| outputFile map[OutputID]string // object => abs path on disk |
| |
| // writeMu serializes writing to the child process. |
| // It must never be held at the same time as mu. |
| writeMu sync.Mutex |
| } |
| |
| // ProgCmd is a command that can be issued to a child process. |
| // |
| // If the interface needs to grow, we can add new commands or new versioned |
| // commands like "get2". |
| type ProgCmd string |
| |
| const ( |
| cmdGet = ProgCmd("get") |
| cmdPut = ProgCmd("put") |
| cmdClose = ProgCmd("close") |
| ) |
| |
| // ProgRequest is the JSON-encoded message that's sent from cmd/go to |
| // the GOCACHEPROG child process over stdin. Each JSON object is on its |
| // own line. A ProgRequest of Type "put" with BodySize > 0 will be followed |
| // by a line containing a base64-encoded JSON string literal of the body. |
| type ProgRequest struct { |
| // ID is a unique number per process across all requests. |
| // It must be echoed in the ProgResponse from the child. |
| ID int64 |
| |
| // Command is the type of request. |
| // The cmd/go tool will only send commands that were declared |
| // as supported by the child. |
| Command ProgCmd |
| |
| // ActionID is non-nil for get and puts. |
| ActionID []byte `json:",omitempty"` // or nil if not used |
| |
| // ObjectID is set for Type "put" and "output-file". |
| ObjectID []byte `json:",omitempty"` // or nil if not used |
| |
| // Body is the body for "put" requests. It's sent after the JSON object |
| // as a base64-encoded JSON string when BodySize is non-zero. |
| // It's sent as a separate JSON value instead of being a struct field |
| // send in this JSON object so large values can be streamed in both directions. |
| // The base64 string body of a ProgRequest will always be written |
| // immediately after the JSON object and a newline. |
| Body io.Reader `json:"-"` |
| |
| // BodySize is the number of bytes of Body. If zero, the body isn't written. |
| BodySize int64 `json:",omitempty"` |
| } |
| |
| // ProgResponse is the JSON response from the child process to cmd/go. |
| // |
| // With the exception of the first protocol message that the child writes to its |
| // stdout with ID==0 and KnownCommands populated, these are only sent in |
| // response to a ProgRequest from cmd/go. |
| // |
| // ProgResponses can be sent in any order. The ID must match the request they're |
| // replying to. |
| type ProgResponse struct { |
| ID int64 // that corresponds to ProgRequest; they can be answered out of order |
| Err string `json:",omitempty"` // if non-empty, the error |
| |
| // KnownCommands is included in the first message that cache helper program |
| // writes to stdout on startup (with ID==0). It includes the |
| // ProgRequest.Command types that are supported by the program. |
| // |
| // This lets us extend the protocol gracefully over time (adding "get2", |
| // etc), or fail gracefully when needed. It also lets us verify the program |
| // wants to be a cache helper. |
| KnownCommands []ProgCmd `json:",omitempty"` |
| |
| // For Get requests. |
| |
| Miss bool `json:",omitempty"` // cache miss |
| OutputID []byte `json:",omitempty"` |
| Size int64 `json:",omitempty"` // in bytes |
| Time *time.Time `json:",omitempty"` // an Entry.Time; when the object was added to the docs |
| |
| // DiskPath is the absolute path on disk of the ObjectID corresponding |
| // a "get" request's ActionID (on cache hit) or a "put" request's |
| // provided ObjectID. |
| DiskPath string `json:",omitempty"` |
| } |
| |
| // startCacheProg starts the prog binary (with optional space-separated flags) |
| // and returns a Cache implementation that talks to it. |
| // |
| // It blocks a few seconds to wait for the child process to successfully start |
| // and advertise its capabilities. |
| func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache { |
| if fuzzDirCache == nil { |
| panic("missing fuzzDirCache") |
| } |
| args, err := quoted.Split(progAndArgs) |
| if err != nil { |
| base.Fatalf("GOCACHEPROG args: %v", err) |
| } |
| var prog string |
| if len(args) > 0 { |
| prog = args[0] |
| args = args[1:] |
| } |
| |
| ctx, ctxCancel := context.WithCancel(context.Background()) |
| |
| cmd := exec.CommandContext(ctx, prog, args...) |
| out, err := cmd.StdoutPipe() |
| if err != nil { |
| base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err) |
| } |
| in, err := cmd.StdinPipe() |
| if err != nil { |
| base.Fatalf("StdinPipe to GOCACHEPROG: %v", err) |
| } |
| cmd.Stderr = os.Stderr |
| cmd.Cancel = in.Close |
| |
| if err := cmd.Start(); err != nil { |
| base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err) |
| } |
| |
| pc := &ProgCache{ |
| ctx: ctx, |
| ctxCancel: ctxCancel, |
| fuzzDirCache: fuzzDirCache, |
| cmd: cmd, |
| stdout: out, |
| stdin: in, |
| bw: bufio.NewWriter(in), |
| inFlight: make(map[int64]chan<- *ProgResponse), |
| outputFile: make(map[OutputID]string), |
| readLoopDone: make(chan struct{}), |
| } |
| |
| // Register our interest in the initial protocol message from the child to |
| // us, saying what it can do. |
| capResc := make(chan *ProgResponse, 1) |
| pc.inFlight[0] = capResc |
| |
| pc.jenc = json.NewEncoder(pc.bw) |
| go pc.readLoop(pc.readLoopDone) |
| |
| // Give the child process a few seconds to report its capabilities. This |
| // should be instant and not require any slow work by the program. |
| timer := time.NewTicker(5 * time.Second) |
| defer timer.Stop() |
| for { |
| select { |
| case <-timer.C: |
| log.Printf("# still waiting for GOCACHEPROG %v ...", prog) |
| case capRes := <-capResc: |
| can := map[ProgCmd]bool{} |
| for _, cmd := range capRes.KnownCommands { |
| can[cmd] = true |
| } |
| if len(can) == 0 { |
| base.Fatalf("GOCACHEPROG %v declared no supported commands", prog) |
| } |
| pc.can = can |
| return pc |
| } |
| } |
| } |
| |
| func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) { |
| defer close(readLoopDone) |
| jd := json.NewDecoder(c.stdout) |
| for { |
| res := new(ProgResponse) |
| if err := jd.Decode(res); err != nil { |
| if c.closing.Load() { |
| return // quietly |
| } |
| if err == io.EOF { |
| c.mu.Lock() |
| inFlight := len(c.inFlight) |
| c.mu.Unlock() |
| base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight) |
| } |
| base.Fatalf("error reading JSON from GOCACHEPROG: %v", err) |
| } |
| c.mu.Lock() |
| ch, ok := c.inFlight[res.ID] |
| delete(c.inFlight, res.ID) |
| c.mu.Unlock() |
| if ok { |
| ch <- res |
| } else { |
| base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID) |
| } |
| } |
| } |
| |
| func (c *ProgCache) send(ctx context.Context, req *ProgRequest) (*ProgResponse, error) { |
| resc := make(chan *ProgResponse, 1) |
| if err := c.writeToChild(req, resc); err != nil { |
| return nil, err |
| } |
| select { |
| case res := <-resc: |
| if res.Err != "" { |
| return nil, errors.New(res.Err) |
| } |
| return res, nil |
| case <-ctx.Done(): |
| return nil, ctx.Err() |
| } |
| } |
| |
| func (c *ProgCache) writeToChild(req *ProgRequest, resc chan<- *ProgResponse) (err error) { |
| c.mu.Lock() |
| c.nextID++ |
| req.ID = c.nextID |
| c.inFlight[req.ID] = resc |
| c.mu.Unlock() |
| |
| defer func() { |
| if err != nil { |
| c.mu.Lock() |
| delete(c.inFlight, req.ID) |
| c.mu.Unlock() |
| } |
| }() |
| |
| c.writeMu.Lock() |
| defer c.writeMu.Unlock() |
| |
| if err := c.jenc.Encode(req); err != nil { |
| return err |
| } |
| if err := c.bw.WriteByte('\n'); err != nil { |
| return err |
| } |
| if req.Body != nil && req.BodySize > 0 { |
| if err := c.bw.WriteByte('"'); err != nil { |
| return err |
| } |
| e := base64.NewEncoder(base64.StdEncoding, c.bw) |
| wrote, err := io.Copy(e, req.Body) |
| if err != nil { |
| return err |
| } |
| if err := e.Close(); err != nil { |
| return nil |
| } |
| if wrote != req.BodySize { |
| return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, object %x: wrote %v; expected %v", |
| req.ActionID, req.ObjectID, wrote, req.BodySize) |
| } |
| if _, err := c.bw.WriteString("\"\n"); err != nil { |
| return err |
| } |
| } |
| if err := c.bw.Flush(); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func (c *ProgCache) Get(a ActionID) (Entry, error) { |
| if !c.can[cmdGet] { |
| // They can't do a "get". Maybe they're a write-only cache. |
| // |
| // TODO(bradfitz,bcmills): figure out the proper error type here. Maybe |
| // errors.ErrUnsupported? Is entryNotFoundError even appropriate? There |
| // might be places where we rely on the fact that a recent Put can be |
| // read through a corresponding Get. Audit callers and check, and document |
| // error types on the Cache interface. |
| return Entry{}, &entryNotFoundError{} |
| } |
| res, err := c.send(c.ctx, &ProgRequest{ |
| Command: cmdGet, |
| ActionID: a[:], |
| }) |
| if err != nil { |
| return Entry{}, err // TODO(bradfitz): or entryNotFoundError? Audit callers. |
| } |
| if res.Miss { |
| return Entry{}, &entryNotFoundError{} |
| } |
| e := Entry{ |
| Size: res.Size, |
| } |
| if res.Time != nil { |
| e.Time = *res.Time |
| } else { |
| e.Time = time.Now() |
| } |
| if res.DiskPath == "" { |
| return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")} |
| } |
| if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) { |
| return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")} |
| } |
| c.noteOutputFile(e.OutputID, res.DiskPath) |
| return e, nil |
| } |
| |
| func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| c.outputFile[o] = diskPath |
| } |
| |
| func (c *ProgCache) OutputFile(o OutputID) string { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| return c.outputFile[o] |
| } |
| |
| func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) { |
| // Compute output ID. |
| h := sha256.New() |
| if _, err := file.Seek(0, 0); err != nil { |
| return OutputID{}, 0, err |
| } |
| size, err := io.Copy(h, file) |
| if err != nil { |
| return OutputID{}, 0, err |
| } |
| var out OutputID |
| h.Sum(out[:0]) |
| |
| if _, err := file.Seek(0, 0); err != nil { |
| return OutputID{}, 0, err |
| } |
| |
| if !c.can[cmdPut] { |
| // Child is a read-only cache. Do nothing. |
| return out, size, nil |
| } |
| |
| res, err := c.send(c.ctx, &ProgRequest{ |
| Command: cmdPut, |
| ActionID: a[:], |
| ObjectID: out[:], |
| Body: file, |
| BodySize: size, |
| }) |
| if err != nil { |
| return OutputID{}, 0, err |
| } |
| if res.DiskPath == "" { |
| return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response") |
| } |
| c.noteOutputFile(out, res.DiskPath) |
| return out, size, err |
| } |
| |
| func (c *ProgCache) Close() error { |
| c.closing.Store(true) |
| var err error |
| |
| // First write a "close" message to the child so it can exit nicely |
| // and clean up if it wants. Only after that exchange do we cancel |
| // the context that kills the process. |
| if c.can[cmdClose] { |
| _, err = c.send(c.ctx, &ProgRequest{Command: cmdClose}) |
| } |
| c.ctxCancel() |
| <-c.readLoopDone |
| return err |
| } |
| |
| func (c *ProgCache) FuzzDir() string { |
| // TODO(bradfitz): figure out what to do here. For now just use the |
| // disk-based default. |
| return c.fuzzDirCache.FuzzDir() |
| } |