blob: b662657fa8b5f10fc2d9f1b4a3f4720e0a91b88d [file] [log] [blame]
// Copyright 2020 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sort"
"time"
"github.com/google/syzkaller/pkg/hash"
"github.com/google/syzkaller/pkg/image"
"google.golang.org/appengine/v2"
"google.golang.org/appengine/v2/log"
"google.golang.org/appengine/v2/memcache"
)
type Cached struct {
MissingBackports int
Total CachedBugStats
Subsystems map[string]CachedBugStats
NoSubsystem CachedBugStats
}
type CachedBugStats struct {
Open int
Fixed int
Invalid int
}
func CacheGet(c context.Context, r *http.Request, ns string) (*Cached, error) {
accessLevel := accessLevel(c, r)
v := new(Cached)
_, err := memcache.Gob.Get(c, cacheKey(ns, accessLevel), v)
if err != nil && err != memcache.ErrCacheMiss {
return nil, err
}
if err == nil {
return v, nil
}
bugs, _, err := loadNamespaceBugs(c, ns)
if err != nil {
return nil, err
}
backports, err := loadAllBackports(c)
if err != nil {
return nil, err
}
return buildAndStoreCached(c, bugs, backports, ns, accessLevel)
}
var cacheAccessLevels = []AccessLevel{AccessPublic, AccessUser, AccessAdmin}
// cacheUpdate updates memcache every hour (called by cron.yaml).
// Cache update is slow and we don't want to slow down user requests.
func cacheUpdate(w http.ResponseWriter, r *http.Request) {
c := appengine.NewContext(r)
backports, err := loadAllBackports(c)
if err != nil {
log.Errorf(c, "failed load backports: %v", err)
return
}
for ns := range getConfig(c).Namespaces {
bugs, _, err := loadNamespaceBugs(c, ns)
if err != nil {
log.Errorf(c, "failed load ns=%v bugs: %v", ns, err)
continue
}
for _, accessLevel := range cacheAccessLevels {
_, err := buildAndStoreCached(c, bugs, backports, ns, accessLevel)
if err != nil {
log.Errorf(c, "failed to build cached for ns=%v access=%v: %v", ns, accessLevel, err)
continue
}
}
}
}
func buildAndStoreCached(c context.Context, bugs []*Bug, backports []*rawBackport,
ns string, accessLevel AccessLevel) (*Cached, error) {
v := &Cached{
Subsystems: make(map[string]CachedBugStats),
}
for _, bug := range bugs {
if bug.Status == BugStatusOpen && accessLevel < bug.sanitizeAccess(c, accessLevel) {
continue
}
v.Total.Record(bug)
subsystems := bug.LabelValues(SubsystemLabel)
for _, label := range subsystems {
stats := v.Subsystems[label.Value]
stats.Record(bug)
v.Subsystems[label.Value] = stats
}
if len(subsystems) == 0 {
v.NoSubsystem.Record(bug)
}
}
for _, backport := range backports {
outgoing := stringInList(backport.FromNs, ns)
for _, bug := range backport.Bugs {
if accessLevel < bug.sanitizeAccess(c, accessLevel) {
continue
}
if bug.Namespace == ns || outgoing {
v.MissingBackports++
}
}
}
item := &memcache.Item{
Key: cacheKey(ns, accessLevel),
Object: v,
Expiration: 4 * time.Hour, // supposed to be updated by cron every hour
}
if err := memcache.Gob.Set(c, item); err != nil {
return nil, err
}
return v, nil
}
func (c *CachedBugStats) Record(bug *Bug) {
switch bug.Status {
case BugStatusOpen:
if len(bug.Commits) == 0 {
c.Open++
} else {
c.Fixed++
}
case BugStatusFixed:
c.Fixed++
case BugStatusInvalid:
c.Invalid++
}
}
func cacheKey(ns string, accessLevel AccessLevel) string {
return fmt.Sprintf("%v-%v", ns, accessLevel)
}
func CachedBugGroups(c context.Context, ns string, accessLevel AccessLevel) ([]*uiBugGroup, error) {
item, err := memcache.Get(c, cachedBugGroupsKey(ns, accessLevel))
if err == memcache.ErrCacheMiss {
return nil, nil
}
if err != nil {
return nil, err
}
jsonData, destructor := image.MustDecompress(item.Value)
defer destructor()
var ret []*uiBugGroup
err = json.Unmarshal(jsonData, &ret)
return ret, err
}
func cachedBugGroupsKey(ns string, accessLevel AccessLevel) string {
return fmt.Sprintf("%v-%v-bug-groups", ns, accessLevel)
}
// minuteCacheUpdate updates memcache every minute (called by cron.yaml).
func handleMinuteCacheUpdate(w http.ResponseWriter, r *http.Request) {
c := appengine.NewContext(r)
for ns, nsConfig := range getConfig(c).Namespaces {
if !nsConfig.CacheUIPages {
continue
}
err := minuteCacheNsUpdate(c, ns)
if err != nil {
http.Error(w, fmt.Sprintf("bug groups cache update for %s failed: %v", ns, err),
http.StatusInternalServerError)
return
}
}
}
func minuteCacheNsUpdate(c context.Context, ns string) error {
bugs, err := loadVisibleBugs(c, ns, nil)
if err != nil {
return err
}
managers, err := managerList(c, ns)
if err != nil {
return err
}
for _, accessLevel := range cacheAccessLevels {
groups, err := prepareBugGroups(c, bugs, managers, accessLevel, ns)
if err != nil {
return fmt.Errorf("failed to fetch groups: %w", err)
}
encoded, err := json.Marshal(groups)
if err != nil {
return fmt.Errorf("failed to marshal: %w", err)
}
item := &memcache.Item{
Key: cachedBugGroupsKey(ns, accessLevel),
// The resulting blob can be quite big, so let's compress.
Value: image.Compress(encoded),
Expiration: 2 * time.Minute, // supposed to be updated by cron every minute
}
if err := memcache.Set(c, item); err != nil {
return err
}
}
return nil
}
func CachedManagerList(c context.Context, ns string) ([]string, error) {
return cachedObjectList(c,
fmt.Sprintf("%s-managers-list", ns),
time.Minute,
func(c context.Context) ([]string, error) {
return managerList(c, ns)
},
)
}
func CachedUIManagers(c context.Context, accessLevel AccessLevel, ns string,
filter *userBugFilter) ([]*uiManager, error) {
return cachedObjectList(c,
fmt.Sprintf("%s-%v-%v-ui-managers", ns, accessLevel, filter.Hash()),
5*time.Minute,
func(c context.Context) ([]*uiManager, error) {
return loadManagers(c, accessLevel, ns, filter)
},
)
}
func cachedObjectList[T any](c context.Context, key string, period time.Duration,
load func(context.Context) ([]T, error)) ([]T, error) {
// Check if the object is in cache.
var obj []T
_, err := memcache.Gob.Get(c, key, &obj)
if err == nil {
return obj, nil
} else if err != memcache.ErrCacheMiss {
return nil, err
}
// Load the object.
obj, err = load(c)
if err != nil {
return nil, err
}
item := &memcache.Item{
Key: key,
Object: obj,
Expiration: period,
}
if err := memcache.Gob.Set(c, item); err != nil {
return nil, err
}
return obj, nil
}
type RequesterInfo struct {
Requests []time.Time
}
func (ri *RequesterInfo) Record(now time.Time, cfg ThrottleConfig) bool {
var newRequests []time.Time
for _, req := range ri.Requests {
if now.Sub(req) >= cfg.Window {
continue
}
newRequests = append(newRequests, req)
}
newRequests = append(newRequests, now)
sort.Slice(ri.Requests, func(i, j int) bool { return ri.Requests[i].Before(ri.Requests[j]) })
// Don't store more than needed.
if len(newRequests) > cfg.Limit+1 {
newRequests = newRequests[len(newRequests)-(cfg.Limit+1):]
}
ri.Requests = newRequests
// Check that we satisfy the conditions.
return len(newRequests) <= cfg.Limit
}
var ErrThrottleTooManyRetries = errors.New("all attempts to record request failed")
func ThrottleRequest(c context.Context, requesterID string) (bool, error) {
cfg := getConfig(c).Throttle
if cfg.Empty() || requesterID == "" {
// No sense to query memcached.
return true, nil
}
key := fmt.Sprintf("requester-%s", hash.String([]byte(requesterID)))
const attempts = 5
for i := 0; i < attempts; i++ {
var obj RequesterInfo
item, err := memcache.Gob.Get(c, key, &obj)
if err == memcache.ErrCacheMiss {
ok := obj.Record(timeNow(c), cfg)
err = memcache.Gob.Add(c, &memcache.Item{
Key: key,
Object: obj,
Expiration: cfg.Window,
})
if err == memcache.ErrNotStored {
// Conflict with another instance. Retry.
continue
}
return ok, err
} else if err != nil {
return false, err
}
// Update the existing object.
ok := obj.Record(timeNow(c), cfg)
item.Expiration = cfg.Window
item.Object = obj
err = memcache.Gob.CompareAndSwap(c, item)
if err == memcache.ErrCASConflict {
if ok {
// Only retry if we approved the query.
// If we denied and there was a concurrent write
// to the same object, it could have only denied
// the query as well.
// Our save won't change anything.
continue
}
} else if err != nil {
return false, err
}
return ok, nil
}
return false, ErrThrottleTooManyRetries
}