| /* |
| * Copyright 2018 The Kythe Authors. All rights reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package pipeline |
| |
| import ( |
| "fmt" |
| "log" |
| "reflect" |
| "sort" |
| "strconv" |
| |
| "kythe.io/kythe/go/serving/pipeline/nodes" |
| "kythe.io/kythe/go/serving/xrefs/assemble" |
| "kythe.io/kythe/go/util/compare" |
| "kythe.io/kythe/go/util/kytheuri" |
| "kythe.io/kythe/go/util/schema" |
| "kythe.io/kythe/go/util/schema/edges" |
| "kythe.io/kythe/go/util/schema/facts" |
| kinds "kythe.io/kythe/go/util/schema/nodes" |
| "kythe.io/kythe/go/util/span" |
| |
| "github.com/apache/beam/sdks/go/pkg/beam" |
| "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter" |
| "github.com/golang/protobuf/proto" |
| |
| cpb "kythe.io/kythe/proto/common_go_proto" |
| gspb "kythe.io/kythe/proto/graph_serving_go_proto" |
| ppb "kythe.io/kythe/proto/pipeline_go_proto" |
| scpb "kythe.io/kythe/proto/schema_go_proto" |
| srvpb "kythe.io/kythe/proto/serving_go_proto" |
| spb "kythe.io/kythe/proto/storage_go_proto" |
| xspb "kythe.io/kythe/proto/xref_serving_go_proto" |
| ) |
| |
| func init() { |
| beam.RegisterFunction(bareRevEdge) |
| beam.RegisterFunction(callEdge) |
| beam.RegisterFunction(combineEdgesIndex) |
| beam.RegisterFunction(completeDocument) |
| beam.RegisterFunction(constructCaller) |
| beam.RegisterFunction(defToDecorPiece) |
| beam.RegisterFunction(diagToDecor) |
| beam.RegisterFunction(edgeTargets) |
| beam.RegisterFunction(edgeToCrossRefRelation) |
| beam.RegisterFunction(emitRelatedDefs) |
| beam.RegisterFunction(fileToDecorPiece) |
| beam.RegisterFunction(fileToTags) |
| beam.RegisterFunction(filterAnchorNodes) |
| beam.RegisterFunction(groupCrossRefs) |
| beam.RegisterFunction(groupEdges) |
| beam.RegisterFunction(keyByPath) |
| beam.RegisterFunction(keyCrossRef) |
| beam.RegisterFunction(keyNode) |
| beam.RegisterFunction(keyRef) |
| beam.RegisterFunction(moveSourceToKey) |
| beam.RegisterFunction(nodeToChildren) |
| beam.RegisterFunction(nodeToDecorPiece) |
| beam.RegisterFunction(nodeToDiagnostic) |
| beam.RegisterFunction(nodeToDocs) |
| beam.RegisterFunction(nodeToEdges) |
| beam.RegisterFunction(nodeToReverseEdges) |
| beam.RegisterFunction(parseMarkedSource) |
| beam.RegisterFunction(refToCallsite) |
| beam.RegisterFunction(refToCrossRef) |
| beam.RegisterFunction(refToDecorPiece) |
| beam.RegisterFunction(refToTag) |
| beam.RegisterFunction(reverseEdge) |
| beam.RegisterFunction(splitEdge) |
| beam.RegisterFunction(targetToFile) |
| beam.RegisterFunction(toDefinition) |
| beam.RegisterFunction(toFiles) |
| beam.RegisterFunction(toRefs) |
| |
| beam.RegisterType(reflect.TypeOf((*combineDecorPieces)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*ticketKey)(nil)).Elem()) |
| |
| beam.RegisterType(reflect.TypeOf((*cpb.Diagnostic)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*cpb.MarkedSource)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*ppb.DecorationPiece)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*ppb.Reference)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*scpb.Edge)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*scpb.Node)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*spb.Entry)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*spb.VName)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.CorpusRoots)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.Document)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.EdgePage)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.ExpandedAnchor)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.File)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.FileDecorations)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.FileDirectory)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.PagedCrossReferences)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.PagedCrossReferences_Page)(nil)).Elem()) |
| beam.RegisterType(reflect.TypeOf((*srvpb.PagedEdgeSet)(nil)).Elem()) |
| } |
| |
// KytheBeam controls the lifetime and generation of PCollections in the Kythe
// pipeline.
//
// The refs, files, and edges collections are built on first use by
// References, getFiles, and edgeRelations respectively and reused afterwards.
type KytheBeam struct {
	s beam.Scope

	fileVNames beam.PCollection // *spb.VName
	nodes      beam.PCollection // *scpb.Node
	files      beam.PCollection // KV<*spb.VName, *srvpb.File> (as produced by toFiles)
	refs       beam.PCollection // *ppb.Reference
	edges      beam.PCollection // *gspb.Edges

	markedSources beam.PCollection // KV<*spb.VName, *cpb.MarkedSource>

	anchorBuildConfigs beam.PCollection // KV<*spb.VName, string>
}
| |
| // FromNodes creates a KytheBeam pipeline from an input collection of |
| // *spb.Nodes. |
| func FromNodes(s beam.Scope, nodes beam.PCollection) *KytheBeam { return &KytheBeam{s: s, nodes: nodes} } |
| |
| // FromEntries creates a KytheBeam pipeline from an input collection of |
| // *spb.Entry messages. |
| func FromEntries(s beam.Scope, entries beam.PCollection) *KytheBeam { |
| return FromNodes(s, nodes.FromEntries(s, entries)) |
| } |
| |
| func keyNode(n *scpb.Node) (*spb.VName, *scpb.Node) { return n.Source, n } |
| |
| // SplitCrossReferences returns a columnar Kythe cross-references table derived |
| // from the Kythe input graph. The beam.PCollection has elements of type |
| // KV<[]byte, []byte>. |
| func (k *KytheBeam) SplitCrossReferences() beam.PCollection { |
| s := k.s.Scope("SplitCrossReferences") |
| |
| refs := beam.ParDo(s, refToCrossRef, k.References()) |
| idx := beam.ParDo(s, nodeToCrossRef, beam.CoGroupByKey(s, |
| beam.ParDo(s, keyNode, k.Nodes()), |
| k.getMarkedSources(), |
| // TODO(schroederc): merge_with |
| )) |
| |
| callgraph := k.callGraph() |
| |
| edges := k.edgeRelations() |
| relatedDefs := beam.ParDo(s, emitRelatedDefs, beam.CoGroupByKey(s, |
| k.directDefinitions(), |
| beam.ParDo(s, splitEdge, filter.Distinct(s, beam.ParDo(s, bareRevEdge, edges))), |
| )) |
| relations := beam.ParDo(s, edgeToCrossRefRelation, edges) |
| |
| return beam.ParDo(s, encodeCrossRef, beam.Flatten(s, |
| idx, |
| refs, |
| relations, |
| relatedDefs, |
| callgraph, |
| )) |
| } |
| |
| func (k *KytheBeam) callGraph() beam.PCollection { |
| s := k.s.Scope("CallGraph") |
| callsites := beam.ParDo(s, refToCallsite, k.References()) |
| // TODO(schroederc): override callers |
| callers := beam.ParDo(s, constructCaller, beam.CoGroupByKey(s, |
| k.directDefinitions(), |
| k.getMarkedSources(), |
| beam.ParDo(s, splitEdge, filter.Distinct(s, beam.ParDo(s, callEdge, callsites))), |
| )) |
| return beam.Flatten(s, callsites, callers) |
| } |
| |
| func emitRelatedDefs(target *spb.VName, defStream func(**srvpb.ExpandedAnchor) bool, srcStream func(**spb.VName) bool, emit func(*xspb.CrossReferences)) { |
| var def *srvpb.ExpandedAnchor |
| if !defStream(&def) { |
| return // no related node definition found |
| } |
| nodeDef := &xspb.CrossReferences_NodeDefinition_{&xspb.CrossReferences_NodeDefinition{ |
| Node: target, |
| Location: def, |
| }} |
| |
| var src *spb.VName |
| for srcStream(&src) { |
| emit(&xspb.CrossReferences{Source: src, Entry: nodeDef}) |
| } |
| } |
| |
| func bareRevEdge(eg *gspb.Edges, emit func(*scpb.Edge)) error { |
| switch e := eg.Entry.(type) { |
| case *gspb.Edges_Edge_: |
| edge := e.Edge |
| emit(&scpb.Edge{Target: eg.Source, Source: edge.Target}) |
| } |
| return nil |
| } |
| |
| func constructCaller(caller *spb.VName, defStream func(**srvpb.ExpandedAnchor) bool, msStream func(**cpb.MarkedSource) bool, calleeStream func(**spb.VName) bool, emit func(*xspb.CrossReferences)) { |
| var def *srvpb.ExpandedAnchor |
| if !defStream(&def) { |
| return // no caller definition found |
| } |
| var ms *cpb.MarkedSource |
| for msStream(&ms) { |
| break |
| } |
| |
| var callee *spb.VName |
| for calleeStream(&callee) { |
| emit(&xspb.CrossReferences{ |
| Source: callee, |
| Entry: &xspb.CrossReferences_Caller_{&xspb.CrossReferences_Caller{ |
| Caller: caller, |
| Location: def, |
| MarkedSource: ms, |
| }}, |
| }) |
| } |
| } |
| |
| func refToCallsite(r *ppb.Reference, emit func(*xspb.CrossReferences)) { |
| if r.GetKytheKind() != scpb.EdgeKind_REF_CALL || r.Scope == nil { |
| return |
| } |
| emit(&xspb.CrossReferences{ |
| Source: r.Source, |
| Entry: &xspb.CrossReferences_Callsite_{&xspb.CrossReferences_Callsite{ |
| Kind: xspb.CrossReferences_Callsite_DIRECT, |
| Caller: r.Scope, |
| Location: r.Anchor, |
| }}, |
| }) |
| } |
| |
| func callEdge(x *xspb.CrossReferences) *scpb.Edge { |
| return &scpb.Edge{Source: x.GetCallsite().GetCaller(), Target: x.GetSource()} |
| } |
| |
| func edgeToCrossRefRelation(eg *gspb.Edges, emit func(*xspb.CrossReferences)) error { |
| switch e := eg.Entry.(type) { |
| case *gspb.Edges_Edge_: |
| edge := e.Edge |
| r := &xspb.CrossReferences_Relation{ |
| Ordinal: edge.Ordinal, |
| Reverse: edge.Reverse, |
| Node: edge.Target, |
| } |
| if k := edge.GetGenericKind(); k != "" { |
| r.Kind = &xspb.CrossReferences_Relation_GenericKind{k} |
| } else { |
| r.Kind = &xspb.CrossReferences_Relation_KytheKind{edge.GetKytheKind()} |
| } |
| emit(&xspb.CrossReferences{ |
| Source: eg.Source, |
| Entry: &xspb.CrossReferences_Relation_{r}, |
| }) |
| return nil |
| case *gspb.Edges_Target_: |
| target := e.Target |
| emit(&xspb.CrossReferences{ |
| Source: eg.Source, |
| Entry: &xspb.CrossReferences_RelatedNode_{&xspb.CrossReferences_RelatedNode{ |
| Node: target.Node, |
| }}, |
| }) |
| return nil |
| default: |
| return fmt.Errorf("unexpected Edges entry: %T", e) |
| } |
| } |
| |
| // CrossReferences returns a Kythe file decorations table derived from the Kythe |
| // input graph. The beam.PCollections have elements of type |
| // KV<string, *srvpb.PagedCrossReferences> and |
| // KV<string, *srvpb.PagedCrossReferences_Page>, respectively. |
| func (k *KytheBeam) CrossReferences() (sets, pages beam.PCollection) { |
| s := k.s.Scope("CrossReferences") |
| refs := beam.CoGroupByKey(s, |
| beam.ParDo(s, keyRef, k.References()), |
| beam.ParDo(s, keyCrossRef, k.callGraph()), |
| ) |
| // TODO(schroederc): related nodes |
| // TODO(schroederc): MarkedSource |
| // TODO(schroederc): source_node |
| return beam.ParDo2(s, groupCrossRefs, refs) |
| } |
| |
// callerKinds maps each callsite kind to the cross-reference group kind under
// which groupCrossRefs files callers having callsites of that kind.
var callerKinds = map[xspb.CrossReferences_Callsite_Kind]string{
	xspb.CrossReferences_Callsite_DIRECT:   "#internal/ref/call/direct",
	xspb.CrossReferences_Callsite_OVERRIDE: "#internal/ref/call/override",
}
| |
| // groupCrossRefs emits *srvpb.PagedCrossReferences and *srvpb.PagedCrossReferences_Pages for a |
| // single node's collection of *ppb.References and callsites. |
| func groupCrossRefs( |
| key *spb.VName, |
| refStream func(**ppb.Reference) bool, |
| callStream func(**xspb.CrossReferences) bool, |
| emitSet func(string, *srvpb.PagedCrossReferences), |
| emitPage func(string, *srvpb.PagedCrossReferences_Page)) { |
| set := &srvpb.PagedCrossReferences{SourceTicket: kytheuri.ToString(key)} |
| // TODO(schroederc): add paging |
| |
| // kind -> build_config -> group |
| groups := make(map[string]map[string]*srvpb.PagedCrossReferences_Group) |
| |
| var ref *ppb.Reference |
| for refStream(&ref) { |
| kind := refKind(ref) |
| configs, ok := groups[kind] |
| if !ok { |
| configs = make(map[string]*srvpb.PagedCrossReferences_Group) |
| groups[kind] = configs |
| } |
| config := ref.Anchor.BuildConfiguration |
| g, ok := configs[config] |
| if !ok { |
| g = &srvpb.PagedCrossReferences_Group{Kind: kind, BuildConfig: config} |
| configs[config] = g |
| set.Group = append(set.Group, g) |
| } |
| g.Anchor = append(g.Anchor, ref.Anchor) |
| } |
| |
| callers := make(map[string]*xspb.CrossReferences_Caller) |
| callsites := make(map[string][]*xspb.CrossReferences_Callsite) |
| var call *xspb.CrossReferences |
| for callStream(&call) { |
| switch e := call.Entry.(type) { |
| case *xspb.CrossReferences_Caller_: |
| callers[kytheuri.ToString(e.Caller.Caller)] = e.Caller |
| case *xspb.CrossReferences_Callsite_: |
| ticket := kytheuri.ToString(e.Callsite.Caller) |
| callsites[ticket] = append(callsites[ticket], e.Callsite) |
| } |
| } |
| for ticket, caller := range callers { |
| for _, site := range callsites[ticket] { |
| kind := callerKinds[site.Kind] |
| configs, ok := groups[kind] |
| if !ok { |
| configs = make(map[string]*srvpb.PagedCrossReferences_Group) |
| groups[kind] = configs |
| } |
| config := site.Location.BuildConfiguration |
| g, ok := configs[config] |
| if !ok { |
| g = &srvpb.PagedCrossReferences_Group{ |
| Kind: kind, |
| BuildConfig: config, |
| } |
| configs[config] = g |
| set.Group = append(set.Group, g) |
| } |
| |
| var groupCaller *srvpb.PagedCrossReferences_Caller |
| for _, c := range g.Caller { |
| if c.SemanticCaller == ticket { |
| groupCaller = c |
| break |
| } |
| } |
| if groupCaller == nil { |
| groupCaller = &srvpb.PagedCrossReferences_Caller{ |
| Caller: caller.Location, |
| SemanticCaller: ticket, |
| MarkedSource: caller.MarkedSource, |
| } |
| g.Caller = append(g.Caller, groupCaller) |
| } |
| groupCaller.Callsite = append(groupCaller.Callsite, site.Location) |
| } |
| } |
| |
| sort.Slice(set.Group, func(i, j int) bool { |
| return compare.Strings(set.Group[i].BuildConfig, set.Group[j].BuildConfig). |
| AndThen(set.Group[i].Kind, set.Group[j].Kind) == compare.LT |
| }) |
| for _, g := range set.Group { |
| sort.Slice(g.Anchor, func(i, j int) bool { return g.Anchor[i].Ticket < g.Anchor[j].Ticket }) |
| for _, caller := range g.Caller { |
| sort.Slice(caller.Callsite, func(i, j int) bool { return caller.Callsite[i].Ticket < caller.Callsite[j].Ticket }) |
| } |
| } |
| |
| emitSet("xrefs:"+set.SourceTicket, set) |
| } |
| |
| func keyRef(r *ppb.Reference) (*spb.VName, *ppb.Reference) { |
| return r.Source, &ppb.Reference{ |
| Kind: r.Kind, |
| Anchor: r.Anchor, |
| } |
| } |
| |
| func keyCrossRef(xr *xspb.CrossReferences) (*spb.VName, *xspb.CrossReferences) { |
| return xr.Source, &xspb.CrossReferences{Entry: xr.Entry} |
| } |
| |
| func (k *KytheBeam) decorationPieces(s beam.Scope) beam.PCollection { |
| decor := beam.ParDo(s, refToDecorPiece, k.References()) |
| |
| targets := beam.ParDo(s, targetToFile, decor) |
| bareNodes := beam.ParDo(s, &nodes.Filter{IncludeEdges: []string{}}, k.nodes) |
| |
| files := beam.ParDo(s, fileToDecorPiece, k.getFiles()) |
| targetNodes := beam.ParDo(s, nodeToDecorPiece, |
| beam.CoGroupByKey(s, beam.ParDo(s, moveSourceToKey, bareNodes), targets)) |
| defs := beam.ParDo(s, defToDecorPiece, |
| beam.CoGroupByKey(s, k.directDefinitions(), targets)) |
| // TODO(schroederc): overrides |
| decorDiagnostics := k.diagnostics() |
| |
| return beam.Flatten(s, decor, files, targetNodes, defs, decorDiagnostics) |
| } |
| |
| func (k *KytheBeam) diagnostics() beam.PCollection { |
| s := k.s.Scope("Diagnostics") |
| diagnostics := beam.Seq(s, k.Nodes(), &nodes.Filter{ |
| FilterByKind: []string{kinds.Diagnostic}, |
| IncludeFacts: []string{facts.Message, facts.Details, facts.ContextURL}, |
| }, nodeToDiagnostic) |
| refTags := beam.ParDo(s, refToTag, k.References()) |
| fileTags := beam.Seq(s, k.Nodes(), &nodes.Filter{ |
| FilterByKind: []string{kinds.File}, |
| IncludeFacts: []string{}, |
| IncludeEdges: []string{edges.Tagged}, |
| }, fileToTags) |
| return beam.ParDo(s, diagToDecor, beam.CoGroupByKey(s, diagnostics, refTags, fileTags)) |
| } |
| |
| func fileToTags(n *scpb.Node, emit func(*spb.VName, *spb.VName)) { |
| for _, e := range n.Edge { |
| emit(e.Target, n.Source) |
| } |
| } |
| |
| func diagToDecor(src *spb.VName, diagStream func(**cpb.Diagnostic) bool, refTagStream func(**srvpb.ExpandedAnchor) bool, fileTagStream func(**spb.VName) bool, emit func(*spb.VName, *ppb.DecorationPiece)) error { |
| var d *cpb.Diagnostic |
| if !diagStream(&d) { |
| return nil |
| } |
| |
| var ref *srvpb.ExpandedAnchor |
| for refTagStream(&ref) { |
| uri, err := kytheuri.Parse(ref.Ticket) |
| if err != nil { |
| return err |
| } |
| file := &spb.VName{ |
| Corpus: uri.Corpus, |
| Root: uri.Root, |
| Path: uri.Path, |
| } |
| diagWithSpan := *d |
| diagWithSpan.Span = ref.Span |
| emit(file, &ppb.DecorationPiece{ |
| Piece: &ppb.DecorationPiece_Diagnostic{ |
| Diagnostic: &diagWithSpan, |
| }, |
| }) |
| } |
| |
| var file *spb.VName |
| for fileTagStream(&file) { |
| emit(file, &ppb.DecorationPiece{ |
| Piece: &ppb.DecorationPiece_Diagnostic{Diagnostic: d}, |
| }) |
| } |
| |
| return nil |
| } |
| |
| func refToTag(r *ppb.Reference, emit func(*spb.VName, *srvpb.ExpandedAnchor)) { |
| if r.GetKytheKind() != scpb.EdgeKind_TAGGED { |
| return |
| } |
| emit(r.Source, r.Anchor) |
| } |
| |
| func nodeToDiagnostic(n *scpb.Node) (*spb.VName, *cpb.Diagnostic) { |
| d := &cpb.Diagnostic{} |
| for _, f := range n.Fact { |
| switch f.GetKytheName() { |
| case scpb.FactName_MESSAGE: |
| d.Message = string(f.Value) |
| case scpb.FactName_DETAILS: |
| d.Details = string(f.Value) |
| case scpb.FactName_CONTEXT_URL: |
| d.ContextUrl = string(f.Value) |
| } |
| } |
| return n.Source, d |
| } |
| |
| // SplitDecorations returns a columnar Kythe file decorations table derived from |
| // the Kythe input graph. The beam.PCollection has elements of type |
| // KV<[]byte, []byte>. |
| func (k *KytheBeam) SplitDecorations() beam.PCollection { |
| s := k.s.Scope("SplitDecorations") |
| return beam.ParDo(s, encodeDecorPiece, k.decorationPieces(s)) |
| } |
| |
| // Decorations returns a Kythe file decorations table derived from the Kythe |
| // input graph. The beam.PCollection has elements of type |
| // KV<string, *srvpb.FileDecorations>. |
| func (k *KytheBeam) Decorations() beam.PCollection { |
| s := k.s.Scope("Decorations") |
| pieces := k.decorationPieces(s) |
| return beam.ParDo(s, &ticketKey{"decor:"}, beam.CombinePerKey(s, &combineDecorPieces{}, pieces)) |
| } |
| |
// ticketKey is a DoFn that replaces each element's *spb.VName key with a
// string key: Prefix followed by the VName's Kythe URI.
type ticketKey struct{ Prefix string }
| |
| func (t *ticketKey) ProcessElement(key *spb.VName, val beam.T) (string, beam.T) { |
| return t.Prefix + kytheuri.ToString(key), val |
| } |
| |
| func targetToFile(file *spb.VName, p *ppb.DecorationPiece) (*spb.VName, *spb.VName, error) { |
| return p.GetReference().Source, file, nil |
| } |
| |
// combineDecorPieces combines *ppb.DecorationPieces into a single
// *srvpb.FileDecorations.  Decorations passes it to beam.CombinePerKey.
type combineDecorPieces struct{}
| |
| func (c *combineDecorPieces) CreateAccumulator() *srvpb.FileDecorations { |
| return &srvpb.FileDecorations{} |
| } |
| |
| func (c *combineDecorPieces) MergeAccumulators(accum, n *srvpb.FileDecorations) *srvpb.FileDecorations { |
| return accum |
| } |
| |
| func (c *combineDecorPieces) AddInput(accum *srvpb.FileDecorations, p *ppb.DecorationPiece) *srvpb.FileDecorations { |
| switch p := p.Piece.(type) { |
| case *ppb.DecorationPiece_Reference: |
| ref := p.Reference |
| accum.Decoration = append(accum.Decoration, &srvpb.FileDecorations_Decoration{ |
| Anchor: &srvpb.RawAnchor{ |
| StartOffset: ref.Anchor.Span.Start.ByteOffset, |
| EndOffset: ref.Anchor.Span.End.ByteOffset, |
| |
| BuildConfiguration: ref.Anchor.BuildConfiguration, |
| }, |
| Kind: refKind(ref), |
| Target: kytheuri.ToString(ref.Source), |
| }) |
| case *ppb.DecorationPiece_File: |
| accum.File = p.File |
| case *ppb.DecorationPiece_Node: |
| accum.Target = append(accum.Target, convertPipelineNode(p.Node)) |
| case *ppb.DecorationPiece_Definition_: |
| // TODO(schroederc): redesign *srvpb.FileDecorations to not need invasive |
| // changes to add a node's definition |
| def := p.Definition |
| accum.TargetDefinitions = append(accum.TargetDefinitions, def.Definition) |
| // Add a marker to associate the definition and node. ExtractOutput will |
| // later embed the definition within accum.Target/accum.TargetOverride. |
| accum.Target = append(accum.Target, &srvpb.Node{ |
| Ticket: kytheuri.ToString(def.Node), |
| DefinitionLocation: &srvpb.ExpandedAnchor{Ticket: def.Definition.Ticket}, |
| }) |
| case *ppb.DecorationPiece_Diagnostic: |
| accum.Diagnostic = append(accum.Diagnostic, p.Diagnostic) |
| default: |
| panic(fmt.Errorf("unhandled DecorationPiece: %T", p)) |
| } |
| return accum |
| } |
| |
| func convertPipelineNode(node *scpb.Node) *srvpb.Node { |
| n := &srvpb.Node{Ticket: kytheuri.ToString(node.Source)} |
| if kind := schema.GetNodeKind(node); kind != "" { |
| n.Fact = append(n.Fact, &cpb.Fact{ |
| Name: facts.NodeKind, |
| Value: []byte(kind), |
| }) |
| } |
| if subkind := schema.GetSubkind(node); subkind != "" { |
| n.Fact = append(n.Fact, &cpb.Fact{ |
| Name: facts.Subkind, |
| Value: []byte(subkind), |
| }) |
| } |
| for _, f := range node.Fact { |
| n.Fact = append(n.Fact, &cpb.Fact{ |
| Name: schema.GetFactName(f), |
| Value: f.Value, |
| }) |
| } |
| sort.Slice(n.Fact, func(i, j int) bool { return n.Fact[i].Name < n.Fact[j].Name }) |
| return n |
| } |
| |
| func (c *combineDecorPieces) ExtractOutput(fd *srvpb.FileDecorations) *srvpb.FileDecorations { |
| // Embed definitions for Decorations and Overrides |
| for i := len(fd.Target) - 1; i >= 0; i-- { |
| if fd.Target[i].DefinitionLocation == nil { |
| continue |
| } |
| node, def := fd.Target[i].Ticket, fd.Target[i].DefinitionLocation.Ticket |
| fd.Target = append(fd.Target[:i], fd.Target[i+1:]...) |
| |
| for _, d := range fd.Decoration { |
| if d.Target == node { |
| d.TargetDefinition = def |
| } |
| } |
| for _, o := range fd.TargetOverride { |
| if o.Overridden == node { |
| o.OverriddenDefinition = def |
| } |
| } |
| } |
| |
| sort.Slice(fd.Decoration, func(i, j int) bool { |
| if c := compare.Ints(int(fd.Decoration[i].Anchor.StartOffset), int(fd.Decoration[j].Anchor.StartOffset)); c != compare.EQ { |
| return c == compare.LT |
| } else if c := compare.Ints(int(fd.Decoration[i].Anchor.EndOffset), int(fd.Decoration[j].Anchor.EndOffset)); c != compare.EQ { |
| return c == compare.LT |
| } else if c := compare.Strings(fd.Decoration[i].Kind, fd.Decoration[j].Kind); c != compare.EQ { |
| return c == compare.LT |
| } |
| return fd.Decoration[i].Target < fd.Decoration[j].Target |
| }) |
| sort.Slice(fd.Target, func(i, j int) bool { return fd.Target[i].Ticket < fd.Target[j].Ticket }) |
| |
| sort.Slice(fd.Diagnostic, func(i, j int) bool { |
| a, b := fd.Diagnostic[i], fd.Diagnostic[j] |
| return compare.Compare(a.Span.GetStart().GetByteOffset(), b.Span.GetStart().GetByteOffset()). |
| AndThen(a.Span.GetEnd().GetByteOffset(), b.Span.GetEnd().GetByteOffset()). |
| AndThen(a.Message, b.Message) == compare.LT |
| }) |
| return fd |
| } |
| |
| func fileToDecorPiece(src *spb.VName, f *srvpb.File) (*spb.VName, *ppb.DecorationPiece) { |
| return src, &ppb.DecorationPiece{Piece: &ppb.DecorationPiece_File{f}} |
| } |
| |
| func refToDecorPiece(r *ppb.Reference, emit func(*spb.VName, *ppb.DecorationPiece)) error { |
| if r.GetKytheKind() == scpb.EdgeKind_TAGGED { |
| return nil |
| } |
| p := &ppb.DecorationPiece{ |
| Piece: &ppb.DecorationPiece_Reference{&ppb.Reference{ |
| Source: r.Source, |
| Kind: r.Kind, |
| Anchor: r.Anchor, |
| }}, |
| } |
| file, err := anchorToFileVName(r.Anchor.Ticket) |
| if err != nil { |
| return err |
| } |
| emit(file, p) |
| return nil |
| } |
| |
| func anchorToFileVName(anchorTicket string) (*spb.VName, error) { |
| anchor, err := kytheuri.ToVName(anchorTicket) |
| if err != nil { |
| return nil, err |
| } |
| return fileVName(anchor), nil |
| } |
| |
| func fileVName(anchor *spb.VName) *spb.VName { |
| return &spb.VName{ |
| Corpus: anchor.Corpus, |
| Root: anchor.Root, |
| Path: anchor.Path, |
| } |
| } |
| |
| func nodeToDecorPiece(key *spb.VName, node func(**scpb.Node) bool, file func(**spb.VName) bool, emit func(*spb.VName, *ppb.DecorationPiece)) { |
| var n, singleNode *scpb.Node |
| for node(&n) { |
| singleNode = n |
| } |
| if singleNode == nil { |
| return |
| } |
| |
| piece := &ppb.DecorationPiece{ |
| Piece: &ppb.DecorationPiece_Node{&scpb.Node{ |
| Source: key, |
| Kind: singleNode.Kind, |
| Subkind: singleNode.Subkind, |
| Fact: singleNode.Fact, |
| Edge: singleNode.Edge, |
| }}, |
| } |
| |
| var f *spb.VName |
| for file(&f) { |
| emit(f, piece) |
| } |
| } |
| |
| func defToDecorPiece(node *spb.VName, defs func(**srvpb.ExpandedAnchor) bool, file func(**spb.VName) bool, emit func(*spb.VName, *ppb.DecorationPiece)) { |
| var def *srvpb.ExpandedAnchor |
| for defs(&def) { |
| // TODO(schroederc): select ambiguous definition better |
| break // pick first known definition |
| } |
| if def == nil { |
| return |
| } |
| piece := &ppb.DecorationPiece{ |
| Piece: &ppb.DecorationPiece_Definition_{&ppb.DecorationPiece_Definition{ |
| Node: node, |
| Definition: def, |
| }}, |
| } |
| var f *spb.VName |
| for file(&f) { |
| emit(f, piece) |
| } |
| } |
| |
| // Nodes returns all *scpb.Nodes from the Kythe input graph. |
| func (k *KytheBeam) Nodes() beam.PCollection { return k.nodes } |
| |
| // References returns all derived *ppb.References from the Kythe input graph. |
| func (k *KytheBeam) References() beam.PCollection { |
| if k.refs.IsValid() { |
| return k.refs |
| } |
| s := k.s.Scope("References") |
| anchors := beam.ParDo(s, keyByPath, beam.ParDo(s, |
| &nodes.Filter{ |
| FilterByKind: []string{kinds.Anchor}, |
| IncludeFacts: []string{ |
| facts.AnchorStart, facts.AnchorEnd, |
| facts.SnippetStart, facts.SnippetEnd, |
| facts.BuildConfig, |
| }, |
| }, k.nodes)) |
| k.refs = beam.ParDo(s, toRefs, beam.CoGroupByKey(s, k.getFiles(), anchors)) |
| return k.refs |
| } |
| |
| func (k *KytheBeam) getFiles() beam.PCollection { |
| if !k.files.IsValid() { |
| fileNodes := beam.ParDo(k.s, |
| &nodes.Filter{ |
| FilterByKind: []string{kinds.File}, |
| IncludeFacts: []string{facts.Text, facts.TextEncoding}, |
| }, k.nodes) |
| k.files = beam.ParDo(k.s, toFiles, fileNodes) |
| } |
| return k.files |
| } |
| |
| func keyByPath(n *scpb.Node) (*spb.VName, *scpb.Node) { |
| return &spb.VName{Corpus: n.Source.Corpus, Root: n.Source.Root, Path: n.Source.Path}, n |
| } |
| |
| func toRefs(p *spb.VName, file func(**srvpb.File) bool, anchor func(**scpb.Node) bool, emit func(*ppb.Reference)) error { |
| var f *srvpb.File |
| if !file(&f) { |
| return nil |
| } |
| return normalizeAnchors(f, anchor, emit) |
| } |
| |
| func toFiles(n *scpb.Node) (*spb.VName, *srvpb.File) { |
| var f srvpb.File |
| for _, fact := range n.Fact { |
| switch fact.GetKytheName() { |
| case scpb.FactName_TEXT: |
| f.Text = fact.Value |
| case scpb.FactName_TEXT_ENCODING: |
| f.Encoding = string(fact.Value) |
| } |
| } |
| return n.Source, &f |
| } |
| |
| func normalizeAnchors(file *srvpb.File, anchor func(**scpb.Node) bool, emit func(*ppb.Reference)) error { |
| norm := span.NewNormalizer(file.Text) |
| var n *scpb.Node |
| for anchor(&n) { |
| raw, err := toRawAnchor(n) |
| if err != nil { |
| return err |
| } |
| a, err := assemble.ExpandAnchor(raw, file, norm, "") |
| if err != nil { |
| log.Printf("error expanding anchor {%+v}: %v", raw, err) |
| break |
| } |
| |
| var parent *spb.VName |
| for _, e := range n.Edge { |
| if e.GetKytheKind() == scpb.EdgeKind_CHILD_OF { |
| // There should only be a single parent for each anchor. |
| parent = e.Target |
| break |
| } |
| } |
| |
| for _, e := range n.Edge { |
| if e.GetKytheKind() == scpb.EdgeKind_CHILD_OF { |
| continue |
| } |
| ref := &ppb.Reference{ |
| Source: e.Target, |
| Anchor: a, |
| Scope: parent, |
| } |
| if k := e.GetKytheKind(); k == scpb.EdgeKind_UNKNOWN_EDGE_KIND { |
| ref.Kind = &ppb.Reference_GenericKind{e.GetGenericKind()} |
| } else { |
| ref.Kind = &ppb.Reference_KytheKind{k} |
| } |
| emit(ref) |
| } |
| } |
| return nil |
| } |
| |
| func toRawAnchor(n *scpb.Node) (*srvpb.RawAnchor, error) { |
| var a srvpb.RawAnchor |
| for _, f := range n.Fact { |
| var err error |
| switch f.GetKytheName() { |
| case scpb.FactName_BUILD_CONFIG: |
| a.BuildConfiguration = string(f.Value) |
| case scpb.FactName_LOC_START: |
| a.StartOffset, err = factValueToInt(f) |
| case scpb.FactName_LOC_END: |
| a.EndOffset, err = factValueToInt(f) |
| case scpb.FactName_SNIPPET_START: |
| a.SnippetStart, err = factValueToInt(f) |
| case scpb.FactName_SNIPPET_END: |
| a.SnippetEnd, err = factValueToInt(f) |
| default: |
| return nil, fmt.Errorf("unhandled fact: %v", f) |
| } |
| if err != nil { |
| return nil, err |
| } |
| } |
| a.Ticket = kytheuri.ToString(n.Source) |
| return &a, nil |
| } |
| |
| func factValueToInt(f *scpb.Fact) (int32, error) { |
| i, err := strconv.Atoi(string(f.Value)) |
| if err != nil { |
| return 0, fmt.Errorf("invalid integer fact value for %q: %v", schema.GetFactName(f), err) |
| } |
| return int32(i), nil |
| } |
| |
| func moveSourceToKey(n *scpb.Node) (*spb.VName, *scpb.Node) { |
| return n.Source, &scpb.Node{ |
| Kind: n.Kind, |
| Subkind: n.Subkind, |
| Fact: n.Fact, |
| Edge: n.Edge, |
| } |
| } |
| |
| func (k *KytheBeam) directDefinitions() beam.PCollection { |
| s := k.s.Scope("DirectDefinitions") |
| return beam.ParDo(s, toDefinition, k.References()) |
| } |
| |
| func toDefinition(r *ppb.Reference, emit func(*spb.VName, *srvpb.ExpandedAnchor)) error { |
| if edges.IsVariant(refKind(r), edges.Defines) { |
| emit(r.Source, r.Anchor) |
| } |
| return nil |
| } |
| |
| func refKind(r *ppb.Reference) string { |
| if k := r.GetKytheKind(); k != scpb.EdgeKind_UNKNOWN_EDGE_KIND { |
| return schema.EdgeKindString(k) |
| } |
| return r.GetGenericKind() |
| } |
| |
| // Edges returns a Kythe edges table derived from the Kythe input graph. The beam.PCollections have |
| // elements of type KV<string, *srvpb.PagedEdgeSet> and KV<string, *srvpb.EdgePage>, respectively. |
| func (k *KytheBeam) Edges() (beam.PCollection, beam.PCollection) { |
| s := k.s.Scope("Edges") |
| |
| nodes := beam.ParDo(s, moveSourceToKey, k.nodes) |
| edges := beam.ParDo(s, reverseEdge, beam.CoGroupByKey(s, nodes, beam.ParDo(s, nodeToEdges, k.nodes))) |
| rev := beam.ParDo(s, nodeToReverseEdges, k.nodes) |
| |
| return beam.ParDo2(s, groupEdges, beam.CoGroupByKey(s, nodes, edges, rev)) |
| } |
| |
| // edgeRelations returns a beam.PCollection of gspb.Edges for all Kythe graph |
| // relations. |
| func (k *KytheBeam) edgeRelations() beam.PCollection { |
| if !k.edges.IsValid() { |
| s := k.s.Scope("Relations") |
| |
| nodeEdges := beam.Seq(s, k.nodes, filterAnchorNodes, &nodes.Filter{IncludeFacts: []string{}}) |
| sourceNodes := beam.ParDo(s, moveSourceToKey, k.nodes) |
| |
| targetNodes := beam.ParDo(s, encodeEdgeTarget, beam.CoGroupByKey(s, |
| sourceNodes, |
| beam.ParDo(s, splitEdge, filter.Distinct(s, beam.ParDo(s, edgeTargets, nodeEdges))))) |
| edges := beam.ParDo(s, encodeEdges, nodeEdges) |
| |
| k.edges = beam.Flatten(s, edges, targetNodes) |
| } |
| return k.edges |
| } |
| |
| // SplitEdges returns a columnar Kythe edges table derived from the Kythe input |
| // graph. The beam.PCollection have elements of type KV<[]byte, []byte>. |
| func (k *KytheBeam) SplitEdges() beam.PCollection { |
| s := k.s.Scope("SplitEdges") |
| |
| idx := beam.ParDo(s, combineEdgesIndex, |
| // TODO(schroederc): counts; also needed for presence with only rev edges |
| beam.ParDo(s, keyNode, beam.ParDo(s, &nodes.Filter{IncludeEdges: []string{}}, k.Nodes()))) |
| |
| return beam.ParDo(s, encodeEdgesEntry, beam.Flatten(s, idx, k.edgeRelations())) |
| } |
| |
| func filterAnchorNodes(n *scpb.Node, emit func(*scpb.Node)) { |
| if n.GetKytheKind() == scpb.NodeKind_ANCHOR { |
| return |
| } |
| emit(n) |
| } |
| |
| func edgeTargets(n *scpb.Node, emit func(*scpb.Edge)) { |
| for _, e := range n.Edge { |
| emit(&scpb.Edge{Source: n.Source, Target: e.Target}) |
| emit(&scpb.Edge{Target: n.Source, Source: e.Target}) |
| } |
| } |
| |
| func splitEdge(e *scpb.Edge) (*spb.VName, *spb.VName) { return e.Source, e.Target } |
| |
| func combineEdgesIndex(src *spb.VName, node *scpb.Node) *gspb.Edges { |
| return &gspb.Edges{ |
| Source: src, |
| Entry: &gspb.Edges_Index_{&gspb.Edges_Index{ |
| Node: node, |
| }}, |
| } |
| } |
| |
| // nodeToReverseEdges emits an *scpb.Edge with its SourceNode populated for each of n's edges. The |
| // key for each *scpb.Edge is its Target VName. |
| func nodeToReverseEdges(n *scpb.Node, emit func(*spb.VName, *scpb.Edge)) { |
| node := nodeWithoutEdges(n) |
| for _, e := range n.Edge { |
| emit(e.Target, &scpb.Edge{ |
| SourceNode: node, |
| Target: e.Target, |
| Kind: e.Kind, |
| Ordinal: e.Ordinal, |
| }) |
| } |
| } |
| |
| // nodeToEdges emits an *scpb.Edge for each of n's edges. The key for each *scpb.Edge is its Target |
| // VName. |
| func nodeToEdges(n *scpb.Node, emit func(*spb.VName, *scpb.Edge)) { |
| for _, e := range n.Edge { |
| emit(e.Target, &scpb.Edge{ |
| Source: n.Source, |
| Target: e.Target, |
| Kind: e.Kind, |
| Ordinal: e.Ordinal, |
| }) |
| } |
| } |
| |
| func nodeWithoutEdges(n *scpb.Node) *scpb.Node { |
| return &scpb.Node{ |
| Source: n.Source, |
| Kind: n.Kind, |
| Subkind: n.Subkind, |
| Fact: n.Fact, |
| } |
| } |
| |
| // reverseEdge emits the reverse of each *scpb.Edge, embedding the associated TargetNode. |
| func reverseEdge(src *spb.VName, nodeStream func(**scpb.Node) bool, edgeStream func(**scpb.Edge) bool, emit func(*spb.VName, *scpb.Edge)) { |
| var node *scpb.Node |
| if !nodeStream(&node) { |
| node = &scpb.Node{} |
| } else { |
| node = nodeWithoutEdges(node) |
| } |
| node.Source = src |
| |
| var e *scpb.Edge |
| for edgeStream(&e) { |
| emit(e.Source, &scpb.Edge{ |
| Source: e.Source, |
| TargetNode: node, |
| Kind: e.Kind, |
| Ordinal: e.Ordinal, |
| }) |
| } |
| } |
| |
| // groupEdges emits *srvpb.PagedEdgeSets and *srvpb.EdgePages for a node and its forward/reverse |
| // edges. |
| func groupEdges(src *spb.VName, nodeStream func(**scpb.Node) bool, edgeStream, revStream func(**scpb.Edge) bool, emitSet func(string, *srvpb.PagedEdgeSet), emitPage func(string, *srvpb.EdgePage)) { |
| set := &srvpb.PagedEdgeSet{} |
| // TODO(schroederc): paging |
| |
| var node *scpb.Node |
| if nodeStream(&node) { |
| node.Source = src |
| set.Source = convertPipelineNode(node) |
| } else { |
| set.Source = &srvpb.Node{Ticket: kytheuri.ToString(src)} |
| } |
| |
| groups := make(map[string]*srvpb.EdgeGroup) |
| |
| var edge *scpb.Edge |
| for edgeStream(&edge) { |
| kind := schema.GetEdgeKind(edge) |
| g, ok := groups[kind] |
| if !ok { |
| g = &srvpb.EdgeGroup{Kind: kind} |
| groups[kind] = g |
| set.Group = append(set.Group, g) |
| } |
| g.Edge = append(g.Edge, &srvpb.EdgeGroup_Edge{ |
| Target: convertPipelineNode(edge.TargetNode), |
| Ordinal: edge.Ordinal, |
| }) |
| } |
| for revStream(&edge) { |
| kind := "%" + schema.GetEdgeKind(edge) // encode reverse edge kind |
| g, ok := groups[kind] |
| if !ok { |
| g = &srvpb.EdgeGroup{Kind: kind} |
| groups[kind] = g |
| set.Group = append(set.Group, g) |
| } |
| g.Edge = append(g.Edge, &srvpb.EdgeGroup_Edge{ |
| Target: convertPipelineNode(edge.SourceNode), |
| Ordinal: edge.Ordinal, |
| }) |
| } |
| |
| sort.Slice(set.Group, func(i, j int) bool { return set.Group[i].Kind < set.Group[j].Kind }) |
| for _, g := range set.Group { |
| sort.Slice(g.Edge, func(i, j int) bool { |
| return compare.Compare(g.Edge[i].Ordinal, g.Edge[j].Ordinal). |
| AndThen(g.Edge[i].Target.Ticket, g.Edge[j].Target.Ticket) == compare.LT |
| }) |
| } |
| |
| emitSet("edgeSets:"+set.Source.Ticket, set) |
| } |
| |
| func (k *KytheBeam) getMarkedSources() beam.PCollection { |
| if !k.markedSources.IsValid() { |
| s := k.s.Scope("MarkedSources") |
| k.markedSources = beam.Seq(s, k.nodes, &nodes.Filter{ |
| IncludeFacts: []string{facts.Code}, |
| IncludeEdges: []string{}, |
| }, parseMarkedSource) |
| } |
| return k.markedSources |
| } |
| |
| // Documents returns a Kythe documentation table derived from the Kythe input |
| // graph. The beam.PCollection has elements of type KV<string, |
| // *srvpb.Document>. |
| func (k *KytheBeam) Documents() beam.PCollection { |
| s := k.s.Scope("Documents") |
| |
| docs := beam.Seq(s, k.nodes, &nodes.Filter{ |
| FilterByKind: []string{kinds.Doc}, |
| IncludeFacts: []string{facts.Text}, |
| IncludeEdges: []string{edges.Documents}, |
| }, nodeToDocs) |
| markedSources := k.getMarkedSources() |
| children := beam.Seq(s, k.nodes, &nodes.Filter{ |
| IncludeFacts: []string{}, |
| IncludeEdges: []string{edges.ChildOf}, |
| }, nodeToChildren) |
| |
| return beam.ParDo(s, completeDocument, beam.CoGroupByKey(s, docs, markedSources, children)) |
| } |
| |
| // completeDocument emits a single *srvpb.Document per *spb.VName source. |
| func completeDocument(key *spb.VName, docStream func(**srvpb.Document) bool, msStream func(**cpb.MarkedSource) bool, childStream func(**spb.VName) bool, emit func(string, *srvpb.Document)) { |
| var doc *srvpb.Document |
| if !docStream(&doc) { |
| return |
| } |
| doc.Ticket = kytheuri.ToString(key) |
| |
| msStream(&doc.MarkedSource) // embed MarkedSource, if available |
| |
| var child *spb.VName |
| for childStream(&child) { |
| doc.ChildTicket = append(doc.ChildTicket, kytheuri.ToString(child)) |
| } |
| sort.Strings(doc.ChildTicket) |
| |
| // TODO(schroederc): add definition Links |
| emit("docs:"+doc.Ticket, doc) |
| } |
| |
| // nodeToDocs emits a (*spb.VName, *srvpb.Document) pair for each |
| // /kythe/edge/documents edges from the given `doc` *scpb.Node. |
| func nodeToDocs(n *scpb.Node, emit func(*spb.VName, *srvpb.Document)) { |
| d := &srvpb.Document{} |
| for _, f := range n.Fact { |
| if f.GetKytheName() == scpb.FactName_TEXT { |
| d.RawText = string(f.Value) |
| break |
| } |
| } |
| |
| for _, e := range n.Edge { |
| if e.GetKytheKind() == scpb.EdgeKind_DOCUMENTS { |
| emit(e.Target, d) |
| } |
| } |
| } |
| |
| // parseMarkedSource parses the /kythe/code fact for each *scpb.Node. |
| func parseMarkedSource(n *scpb.Node, emit func(*spb.VName, *cpb.MarkedSource)) error { |
| for _, f := range n.Fact { |
| if f.GetKytheName() == scpb.FactName_CODE { |
| var ms cpb.MarkedSource |
| if err := proto.Unmarshal(f.Value, &ms); err != nil { |
| return err |
| } |
| emit(n.Source, &ms) |
| break |
| } |
| } |
| return nil |
| } |
| |
| // nodeToChildren emits a (parent, child) pair for each /kythe/edge/childof edge |
| // per *scpb.Node. |
| func nodeToChildren(n *scpb.Node, emit func(*spb.VName, *spb.VName)) { |
| for _, e := range n.Edge { |
| if e.GetKytheKind() == scpb.EdgeKind_CHILD_OF { |
| emit(e.Target, n.Source) // parent -> child |
| } |
| } |
| } |