blob: 7b8d723bd6c0128d3e87bc2bad9033d5694b0400 [file] [log] [blame]
package main
import (
"container/heap"
"fmt"
"os"
"os/exec"
"strings"
"syscall"
"time"
)
type Job struct {
n *DepNode
ex *Executor
parents []*Job
outputTs int64
numDeps int
depsTs int64
id int
runners []runner
}
type runner struct {
output string
cmd string
echo bool
ignoreError bool
shell string
}
type JobResult struct {
j *Job
w *Worker
}
type NewDep struct {
j *Job
neededBy *Job
}
type Worker struct {
wm *WorkerManager
jobChan chan *Job
waitChan chan bool
doneChan chan bool
}
type JobQueue []*Job
func (jq JobQueue) Len() int { return len(jq) }
func (jq JobQueue) Less(i, j int) bool {
// First come, first serve, for GNU make compatibility.
return jq[i].id < jq[j].id
}
func (jq JobQueue) Swap(i, j int) {
jq[i], jq[j] = jq[j], jq[i]
}
func (jq *JobQueue) Push(x interface{}) {
item := x.(*Job)
*jq = append(*jq, item)
}
func (jq *JobQueue) Pop() interface{} {
old := *jq
n := len(old)
item := old[n-1]
*jq = old[0 : n-1]
return item
}
func NewWorker(wm *WorkerManager) *Worker {
w := &Worker{
wm: wm,
jobChan: make(chan *Job),
waitChan: make(chan bool),
doneChan: make(chan bool),
}
return w
}
func (w *Worker) Run() {
done := false
for !done {
select {
case j := <-w.jobChan:
j.build()
w.wm.ReportResult(w, j)
case done = <-w.waitChan:
}
}
w.doneChan <- true
}
func (w *Worker) PostJob(j *Job) {
w.jobChan <- j
}
func (w *Worker) Wait() {
w.waitChan <- true
<-w.doneChan
}
func evalCmd(ev *Evaluator, r runner, s string) []runner {
r = newRunner(r, s)
if strings.IndexByte(r.cmd, '$') < 0 {
// fast path
return []runner{r}
}
// TODO(ukai): parse once more earlier?
expr, _, err := parseExpr([]byte(r.cmd), nil)
if err != nil {
panic(fmt.Errorf("parse cmd %q: %v", r.cmd, err))
}
buf := newBuf()
expr.Eval(buf, ev)
cmds := buf.String()
freeBuf(buf)
var runners []runner
for _, cmd := range strings.Split(cmds, "\n") {
if len(runners) > 0 && strings.HasSuffix(runners[len(runners)-1].cmd, "\\") {
runners[len(runners)-1].cmd += "\n"
runners[len(runners)-1].cmd += cmd
} else {
runners = append(runners, newRunner(r, cmd))
}
}
return runners
}
func newRunner(r runner, s string) runner {
for {
s = trimLeftSpace(s)
if s == "" {
return runner{}
}
switch s[0] {
case '@':
if !dryRunFlag {
r.echo = false
}
s = s[1:]
continue
case '-':
r.ignoreError = true
s = s[1:]
continue
}
break
}
r.cmd = s
return r
}
func (r runner) run(output string) error {
if r.echo || dryRunFlag {
fmt.Printf("%s\n", r.cmd)
}
if dryRunFlag {
return nil
}
args := []string{r.shell, "-c", r.cmd}
cmd := exec.Cmd{
Path: args[0],
Args: args,
}
out, err := cmd.CombinedOutput()
fmt.Printf("%s", out)
exit := exitStatus(err)
if r.ignoreError && exit != 0 {
fmt.Printf("[%s] Error %d (ignored)\n", output, exit)
err = nil
}
return err
}
func (j Job) createRunners() []runner {
runners, _ := j.ex.createRunners(j.n, false)
return runners
}
// TODO(ukai): use time.Time?
func getTimestamp(filename string) int64 {
st, err := os.Stat(filename)
if err != nil {
return -2
}
return st.ModTime().Unix()
}
func (j Job) build() {
if j.n.IsPhony {
j.outputTs = -2 // trigger cmd even if all inputs don't exist.
} else {
j.outputTs = getTimestamp(j.n.Output)
}
if !j.n.HasRule {
if j.outputTs >= 0 || j.n.IsPhony {
return
}
if len(j.parents) == 0 {
ErrorNoLocation("*** No rule to make target %q.", j.n.Output)
} else {
ErrorNoLocation("*** No rule to make target %q, needed by %q.", j.n.Output, j.parents[0].n.Output)
}
ErrorNoLocation("no rule to make target %q", j.n.Output)
}
if j.outputTs >= j.depsTs {
// TODO: stats.
return
}
for _, r := range j.createRunners() {
err := r.run(j.n.Output)
if err != nil {
exit := exitStatus(err)
ErrorNoLocation("[%s] Error %d: %v", j.n.Output, exit, err)
}
}
if j.n.IsPhony {
j.outputTs = time.Now().Unix()
} else {
j.outputTs = getTimestamp(j.n.Output)
if j.outputTs < 0 {
j.outputTs = time.Now().Unix()
}
}
}
func (wm *WorkerManager) handleJobs() {
for {
if !useParaFlag && len(wm.freeWorkers) == 0 {
return
}
if wm.readyQueue.Len() == 0 {
return
}
j := heap.Pop(&wm.readyQueue).(*Job)
Log("run: %s", j.n.Output)
if useParaFlag {
j.runners = j.createRunners()
if len(j.runners) == 0 {
wm.updateParents(j)
wm.finishCnt++
} else {
wm.runnings[j.n.Output] = j
wm.para.RunCommand(j.runners)
}
} else {
j.numDeps = -1 // Do not let other workers pick this.
w := wm.freeWorkers[0]
wm.freeWorkers = wm.freeWorkers[1:]
wm.busyWorkers[w] = true
w.jobChan <- j
}
}
}
func (wm *WorkerManager) updateParents(j *Job) {
for _, p := range j.parents {
p.numDeps--
Log("child: %s (%d)", p.n.Output, p.numDeps)
if p.depsTs < j.outputTs {
p.depsTs = j.outputTs
}
wm.maybePushToReadyQueue(p)
}
}
type WorkerManager struct {
jobs []*Job
readyQueue JobQueue
jobChan chan *Job
resultChan chan JobResult
newDepChan chan NewDep
waitChan chan bool
doneChan chan bool
freeWorkers []*Worker
busyWorkers map[*Worker]bool
ex *Executor
para *ParaWorker
paraChan chan *ParaResult
runnings map[string]*Job
finishCnt int
}
func NewWorkerManager() *WorkerManager {
wm := &WorkerManager{
jobChan: make(chan *Job),
resultChan: make(chan JobResult),
newDepChan: make(chan NewDep),
waitChan: make(chan bool),
doneChan: make(chan bool),
busyWorkers: make(map[*Worker]bool),
}
if useParaFlag {
wm.runnings = make(map[string]*Job)
wm.paraChan = make(chan *ParaResult)
wm.para = NewParaWorker(wm.paraChan)
go wm.para.Run()
} else {
wm.busyWorkers = make(map[*Worker]bool)
for i := 0; i < jobsFlag; i++ {
w := NewWorker(wm)
wm.freeWorkers = append(wm.freeWorkers, w)
go w.Run()
}
}
heap.Init(&wm.readyQueue)
go wm.Run()
return wm
}
func exitStatus(err error) int {
if err == nil {
return 0
}
exit := 1
if err, ok := err.(*exec.ExitError); ok {
if w, ok := err.ProcessState.Sys().(syscall.WaitStatus); ok {
return w.ExitStatus()
}
}
return exit
}
func (wm *WorkerManager) hasTodo() bool {
return wm.finishCnt != len(wm.jobs)
}
func (wm *WorkerManager) maybePushToReadyQueue(j *Job) {
if j.numDeps != 0 {
return
}
heap.Push(&wm.readyQueue, j)
Log("ready: %s", j.n.Output)
}
func (wm *WorkerManager) handleNewDep(j *Job, neededBy *Job) {
if j.numDeps < 0 {
neededBy.numDeps--
if neededBy.id > 0 {
panic("already in WM... can this happen?")
wm.maybePushToReadyQueue(neededBy)
}
} else {
j.parents = append(j.parents, neededBy)
}
}
func (wm *WorkerManager) Run() {
done := false
for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done {
select {
case j := <-wm.jobChan:
Log("wait: %s (%d)", j.n.Output, j.numDeps)
j.id = len(wm.jobs) + 1
wm.jobs = append(wm.jobs, j)
wm.maybePushToReadyQueue(j)
case jr := <-wm.resultChan:
Log("done: %s", jr.j.n.Output)
delete(wm.busyWorkers, jr.w)
wm.freeWorkers = append(wm.freeWorkers, jr.w)
wm.updateParents(jr.j)
wm.finishCnt++
case af := <-wm.newDepChan:
wm.handleNewDep(af.j, af.neededBy)
Log("dep: %s (%d) %s", af.neededBy.n.Output, af.neededBy.numDeps, af.j.n.Output)
case pr := <-wm.paraChan:
if pr.status < 0 && pr.signal < 0 {
j := wm.runnings[pr.output]
for _, r := range j.runners {
if r.echo || dryRunFlag {
fmt.Printf("%s\n", r.cmd)
}
}
} else {
os.Stdout.Write([]byte(pr.stdout))
os.Stderr.Write([]byte(pr.stderr))
j := wm.runnings[pr.output]
wm.updateParents(j)
delete(wm.runnings, pr.output)
wm.finishCnt++
}
case done = <-wm.waitChan:
}
wm.handleJobs()
if useParaFlag {
numBusy := len(wm.runnings)
if numBusy > jobsFlag {
numBusy = jobsFlag
}
Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), jobsFlag-numBusy, numBusy)
} else {
Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
}
}
if useParaFlag {
Log("Wait for para to finish")
wm.para.Wait()
} else {
for _, w := range wm.freeWorkers {
w.Wait()
}
for w := range wm.busyWorkers {
w.Wait()
}
}
wm.doneChan <- true
}
func (wm *WorkerManager) PostJob(j *Job) {
wm.jobChan <- j
}
func (wm *WorkerManager) ReportResult(w *Worker, j *Job) {
wm.resultChan <- JobResult{w: w, j: j}
}
func (wm *WorkerManager) ReportNewDep(j *Job, neededBy *Job) {
wm.newDepChan <- NewDep{j: j, neededBy: neededBy}
}
func (wm *WorkerManager) Wait() {
wm.waitChan <- true
<-wm.doneChan
}