// Copyright 2018 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package main

import (
	"time"

	"github.com/google/syzkaller/pkg/hash"
	"github.com/google/syzkaller/pkg/log"
	"github.com/google/syzkaller/pkg/mgrconfig"
	"github.com/google/syzkaller/pkg/report"
	"github.com/google/syzkaller/pkg/rpctype"
	"github.com/google/syzkaller/prog"
)

func (mgr *Manager) hubSyncLoop() {
	hc := &HubConnector{
		mgr:           mgr,
		cfg:           mgr.cfg,
		target:        mgr.target,
		stats:         mgr.stats,
		enabledCalls:  mgr.checkResult.EnabledCalls[mgr.cfg.Sandbox],
		fresh:         mgr.fresh,
		hubReproQueue: mgr.hubReproQueue,
	}
	if mgr.cfg.Reproduce && mgr.dash != nil {
		hc.needMoreRepros = mgr.needMoreRepros
	}
	hc.loop()
}

type HubConnector struct {
	mgr            HubManagerView
	cfg            *mgrconfig.Config
	target         *prog.Target
	stats          *Stats
	enabledCalls   []int
	fresh          bool
	hubCorpus      map[hash.Sig]bool
	newRepros      [][]byte
	hubReproQueue  chan *Crash
	needMoreRepros chan chan bool
}

// HubManagerView restricts interface between HubConnector and Manager.
type HubManagerView interface {
	getMinimizedCorpus() (corpus, repros [][]byte)
	addNewCandidates(progs [][]byte)
}

func (hc *HubConnector) loop() {
	var hub *rpctype.RPCClient
	for {
		time.Sleep(time.Minute)
		corpus, repros := hc.mgr.getMinimizedCorpus()
		hc.newRepros = append(hc.newRepros, repros...)
		if hub == nil {
			var err error
			if hub, err = hc.connect(corpus); err != nil {
				log.Logf(0, "failed to connect to hub at %v: %v", hc.cfg.HubAddr, err)
				continue
			}
			log.Logf(0, "connected to hub at %v, corpus %v", hc.cfg.HubAddr, len(corpus))
		}
		if err := hc.sync(hub, corpus); err != nil {
			log.Logf(0, "hub sync failed: %v", err)
			hub.Close()
			hub = nil
		}
	}
}

func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
	a := &rpctype.HubConnectArgs{
		Client:  hc.cfg.HubClient,
		Key:     hc.cfg.HubKey,
		Manager: hc.cfg.Name,
		Fresh:   hc.fresh,
	}
	for _, id := range hc.enabledCalls {
		a.Calls = append(a.Calls, hc.target.Syscalls[id].Name)
	}
	hubCorpus := make(map[hash.Sig]bool)
	for _, inp := range corpus {
		hubCorpus[hash.Hash(inp)] = true
		a.Corpus = append(a.Corpus, inp)
	}
	// Hub.Connect request can be very large, so do it on a transient connection
	// (rpc connection buffers never shrink).
	if err := rpctype.RPCCall(hc.cfg.HubAddr, "Hub.Connect", a, nil); err != nil {
		return nil, err
	}
	hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr)
	if err != nil {
		return nil, err
	}
	hc.hubCorpus = hubCorpus
	hc.fresh = false
	return hub, nil
}

func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus [][]byte) error {
	a := &rpctype.HubSyncArgs{
		Client:  hc.cfg.HubClient,
		Key:     hc.cfg.HubKey,
		Manager: hc.cfg.Name,
	}
	sigs := make(map[hash.Sig]bool)
	for _, inp := range corpus {
		sig := hash.Hash(inp)
		sigs[sig] = true
		if hc.hubCorpus[sig] {
			continue
		}
		hc.hubCorpus[sig] = true
		a.Add = append(a.Add, inp)
	}
	for sig := range hc.hubCorpus {
		if sigs[sig] {
			continue
		}
		delete(hc.hubCorpus, sig)
		a.Del = append(a.Del, sig.String())
	}
	if hc.needMoreRepros != nil {
		needReproReply := make(chan bool)
		hc.needMoreRepros <- needReproReply
		a.NeedRepros = <-needReproReply
	}
	a.Repros = hc.newRepros
	for {
		r := new(rpctype.HubSyncRes)
		if err := hub.Call("Hub.Sync", a, r); err != nil {
			return err
		}
		progDropped := hc.processProgs(r.Progs)
		reproDropped := hc.processRepros(r.Repros)
		hc.stats.hubSendProgAdd.add(len(a.Add))
		hc.stats.hubSendProgDel.add(len(a.Del))
		hc.stats.hubSendRepro.add(len(a.Repros))
		hc.stats.hubRecvProg.add(len(r.Progs) - progDropped)
		hc.stats.hubRecvProgDrop.add(progDropped)
		hc.stats.hubRecvRepro.add(len(r.Repros) - reproDropped)
		hc.stats.hubRecvReproDrop.add(reproDropped)
		log.Logf(0, "hub sync: send: add %v, del %v, repros %v;"+
			" recv: progs %v, repros %v; more %v",
			len(a.Add), len(a.Del), len(a.Repros),
			len(r.Progs)-progDropped, len(r.Repros)-reproDropped, r.More)
		a.Add = nil
		a.Del = nil
		a.Repros = nil
		a.NeedRepros = false
		hc.newRepros = nil
		if len(r.Progs)+r.More == 0 {
			return nil
		}
	}
}

func (hc *HubConnector) processProgs(progs [][]byte) int {
	dropped := 0
	candidates := make([][]byte, 0, len(progs))
	for _, inp := range progs {
		if _, err := hc.target.Deserialize(inp); err != nil {
			dropped++
			continue
		}
		candidates = append(candidates, inp)
	}
	hc.mgr.addNewCandidates(candidates)
	return dropped
}

func (hc *HubConnector) processRepros(repros [][]byte) int {
	dropped := 0
	for _, repro := range repros {
		if _, err := hc.target.Deserialize(repro); err != nil {
			dropped++
			continue
		}
		hc.hubReproQueue <- &Crash{
			vmIndex: -1,
			hub:     true,
			Report: &report.Report{
				Title:  "external repro",
				Output: repro,
			},
		}
	}
	return dropped
}
