blob: 62b5235c9774e9e81d497e035b97530f77a2e7d0 [file] [log] [blame]
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for V2 summary ops from summary_ops_v2."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import unittest
import six
from tensorflow.core.framework import graph_pb2
from tensorflow.core.framework import node_def_pb2
from tensorflow.core.framework import step_stats_pb2
from tensorflow.core.framework import summary_pb2
from tensorflow.core.protobuf import config_pb2
from tensorflow.core.util import event_pb2
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_spec
from tensorflow.python.framework import tensor_util
from tensorflow.python.framework import test_util
from tensorflow.python.lib.io import tf_record
from tensorflow.python.module import module
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import summary_ops_v2 as summary_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import gfile
from tensorflow.python.platform import test
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.saved_model import load as saved_model_load
from tensorflow.python.saved_model import loader as saved_model_loader
from tensorflow.python.saved_model import save as saved_model_save
from tensorflow.python.saved_model import tag_constants
class SummaryOpsCoreTest(test_util.TensorFlowTestCase):
def testWrite(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
output = summary_ops.write('tag', 42, step=12)
self.assertTrue(output.numpy())
events = events_from_logdir(logdir)
self.assertEqual(2, len(events))
self.assertEqual(12, events[1].step)
value = events[1].summary.value[0]
self.assertEqual('tag', value.tag)
self.assertEqual(42, to_numpy(value))
def testWrite_fromFunction(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
@def_function.function
def f():
with writer.as_default():
return summary_ops.write('tag', 42, step=12)
output = f()
self.assertTrue(output.numpy())
events = events_from_logdir(logdir)
self.assertEqual(2, len(events))
self.assertEqual(12, events[1].step)
value = events[1].summary.value[0]
self.assertEqual('tag', value.tag)
self.assertEqual(42, to_numpy(value))
def testWrite_metadata(self):
logdir = self.get_temp_dir()
metadata = summary_pb2.SummaryMetadata()
metadata.plugin_data.plugin_name = 'foo'
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
summary_ops.write('obj', 0, 0, metadata=metadata)
summary_ops.write('bytes', 0, 0, metadata=metadata.SerializeToString())
m = constant_op.constant(metadata.SerializeToString())
summary_ops.write('string_tensor', 0, 0, metadata=m)
events = events_from_logdir(logdir)
self.assertEqual(4, len(events))
self.assertEqual(metadata, events[1].summary.value[0].metadata)
self.assertEqual(metadata, events[2].summary.value[0].metadata)
self.assertEqual(metadata, events[3].summary.value[0].metadata)
def testWrite_name(self):
@def_function.function
def f():
output = summary_ops.write('tag', 42, step=12, name='anonymous')
self.assertTrue(output.name.startswith('anonymous'))
f()
def testWrite_ndarray(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
summary_ops.write('tag', [[1, 2], [3, 4]], step=12)
events = events_from_logdir(logdir)
value = events[1].summary.value[0]
self.assertAllEqual([[1, 2], [3, 4]], to_numpy(value))
def testWrite_tensor(self):
logdir = self.get_temp_dir()
with context.eager_mode():
t = constant_op.constant([[1, 2], [3, 4]])
with summary_ops.create_file_writer_v2(logdir).as_default():
summary_ops.write('tag', t, step=12)
expected = t.numpy()
events = events_from_logdir(logdir)
value = events[1].summary.value[0]
self.assertAllEqual(expected, to_numpy(value))
def testWrite_tensor_fromFunction(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
@def_function.function
def f(t):
with writer.as_default():
summary_ops.write('tag', t, step=12)
t = constant_op.constant([[1, 2], [3, 4]])
f(t)
expected = t.numpy()
events = events_from_logdir(logdir)
value = events[1].summary.value[0]
self.assertAllEqual(expected, to_numpy(value))
def testWrite_stringTensor(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
summary_ops.write('tag', [b'foo', b'bar'], step=12)
events = events_from_logdir(logdir)
value = events[1].summary.value[0]
self.assertAllEqual([b'foo', b'bar'], to_numpy(value))
@test_util.run_gpu_only
def testWrite_gpuDeviceContext(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
with ops.device('/GPU:0'):
value = constant_op.constant(42.0)
step = constant_op.constant(12, dtype=dtypes.int64)
summary_ops.write('tag', value, step=step).numpy()
empty_metadata = summary_pb2.SummaryMetadata()
events = events_from_logdir(logdir)
self.assertEqual(2, len(events))
self.assertEqual(12, events[1].step)
self.assertEqual(42, to_numpy(events[1].summary.value[0]))
self.assertEqual(empty_metadata, events[1].summary.value[0].metadata)
@test_util.also_run_as_tf_function
def testWrite_noDefaultWriter(self):
# Use assertAllEqual instead of assertFalse since it works in a defun.
self.assertAllEqual(False, summary_ops.write('tag', 42, step=0))
@test_util.also_run_as_tf_function
def testWrite_noStep_okayIfAlsoNoDefaultWriter(self):
# Use assertAllEqual instead of assertFalse since it works in a defun.
self.assertAllEqual(False, summary_ops.write('tag', 42))
def testWrite_noStep(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
with self.assertRaisesRegex(ValueError, 'No step set'):
summary_ops.write('tag', 42)
def testWrite_noStep_okayIfNotRecordingSummaries(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
with summary_ops.record_if(False):
self.assertFalse(summary_ops.write('tag', 42))
def testWrite_usingDefaultStep(self):
logdir = self.get_temp_dir()
try:
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
summary_ops.set_step(1)
summary_ops.write('tag', 1.0)
summary_ops.set_step(2)
summary_ops.write('tag', 1.0)
mystep = variables.Variable(10, dtype=dtypes.int64)
summary_ops.set_step(mystep)
summary_ops.write('tag', 1.0)
mystep.assign_add(1)
summary_ops.write('tag', 1.0)
events = events_from_logdir(logdir)
self.assertEqual(5, len(events))
self.assertEqual(1, events[1].step)
self.assertEqual(2, events[2].step)
self.assertEqual(10, events[3].step)
self.assertEqual(11, events[4].step)
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testWrite_usingDefaultStepConstant_fromFunction(self):
logdir = self.get_temp_dir()
try:
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
@def_function.function
def f():
with writer.as_default():
summary_ops.write('tag', 1.0)
summary_ops.set_step(1)
f()
summary_ops.set_step(2)
f()
events = events_from_logdir(logdir)
self.assertEqual(3, len(events))
self.assertEqual(1, events[1].step)
# The step value will still be 1 because the value was captured at the
# time the function was first traced.
self.assertEqual(1, events[2].step)
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testWrite_usingDefaultStepVariable_fromFunction(self):
logdir = self.get_temp_dir()
try:
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
@def_function.function
def f():
with writer.as_default():
summary_ops.write('tag', 1.0)
mystep = variables.Variable(0, dtype=dtypes.int64)
summary_ops.set_step(mystep)
f()
mystep.assign_add(1)
f()
mystep.assign(10)
f()
events = events_from_logdir(logdir)
self.assertEqual(4, len(events))
self.assertEqual(0, events[1].step)
self.assertEqual(1, events[2].step)
self.assertEqual(10, events[3].step)
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testWrite_usingDefaultStepConstant_fromLegacyGraph(self):
logdir = self.get_temp_dir()
try:
with context.graph_mode():
writer = summary_ops.create_file_writer_v2(logdir)
summary_ops.set_step(1)
with writer.as_default():
write_op = summary_ops.write('tag', 1.0)
summary_ops.set_step(2)
with self.cached_session() as sess:
sess.run(writer.init())
sess.run(write_op)
sess.run(write_op)
sess.run(writer.flush())
events = events_from_logdir(logdir)
self.assertEqual(3, len(events))
self.assertEqual(1, events[1].step)
# The step value will still be 1 because the value was captured at the
# time the graph was constructed.
self.assertEqual(1, events[2].step)
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testWrite_usingDefaultStepVariable_fromLegacyGraph(self):
logdir = self.get_temp_dir()
try:
with context.graph_mode():
writer = summary_ops.create_file_writer_v2(logdir)
mystep = variables.Variable(0, dtype=dtypes.int64)
summary_ops.set_step(mystep)
with writer.as_default():
write_op = summary_ops.write('tag', 1.0)
first_assign_op = mystep.assign_add(1)
second_assign_op = mystep.assign(10)
with self.cached_session() as sess:
sess.run(writer.init())
sess.run(mystep.initializer)
sess.run(write_op)
sess.run(first_assign_op)
sess.run(write_op)
sess.run(second_assign_op)
sess.run(write_op)
sess.run(writer.flush())
events = events_from_logdir(logdir)
self.assertEqual(4, len(events))
self.assertEqual(0, events[1].step)
self.assertEqual(1, events[2].step)
self.assertEqual(10, events[3].step)
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testWrite_usingDefaultStep_fromAsDefault(self):
logdir = self.get_temp_dir()
try:
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
with writer.as_default(step=1):
summary_ops.write('tag', 1.0)
with writer.as_default():
summary_ops.write('tag', 1.0)
with writer.as_default(step=2):
summary_ops.write('tag', 1.0)
summary_ops.write('tag', 1.0)
summary_ops.set_step(3)
summary_ops.write('tag', 1.0)
events = events_from_logdir(logdir)
self.assertListEqual([1, 1, 2, 1, 3], [e.step for e in events[1:]])
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testWrite_usingDefaultStepVariable_fromAsDefault(self):
logdir = self.get_temp_dir()
try:
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
mystep = variables.Variable(1, dtype=dtypes.int64)
with writer.as_default(step=mystep):
summary_ops.write('tag', 1.0)
with writer.as_default():
mystep.assign(2)
summary_ops.write('tag', 1.0)
with writer.as_default(step=3):
summary_ops.write('tag', 1.0)
summary_ops.write('tag', 1.0)
mystep.assign(4)
summary_ops.write('tag', 1.0)
events = events_from_logdir(logdir)
self.assertListEqual([1, 2, 3, 2, 4], [e.step for e in events[1:]])
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testWrite_usingDefaultStep_fromSetAsDefault(self):
logdir = self.get_temp_dir()
try:
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
mystep = variables.Variable(1, dtype=dtypes.int64)
writer.set_as_default(step=mystep)
summary_ops.write('tag', 1.0)
mystep.assign(2)
summary_ops.write('tag', 1.0)
writer.set_as_default(step=3)
summary_ops.write('tag', 1.0)
writer.flush()
events = events_from_logdir(logdir)
self.assertListEqual([1, 2, 3], [e.step for e in events[1:]])
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testWrite_usingDefaultStepVariable_fromSetAsDefault(self):
logdir = self.get_temp_dir()
try:
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
writer.set_as_default(step=1)
summary_ops.write('tag', 1.0)
writer.set_as_default(step=2)
summary_ops.write('tag', 1.0)
writer.set_as_default()
summary_ops.write('tag', 1.0)
writer.flush()
events = events_from_logdir(logdir)
self.assertListEqual([1, 2, 2], [e.step for e in events[1:]])
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testWrite_recordIf_constant(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
self.assertTrue(summary_ops.write('default', 1, step=0))
with summary_ops.record_if(True):
self.assertTrue(summary_ops.write('set_on', 1, step=0))
with summary_ops.record_if(False):
self.assertFalse(summary_ops.write('set_off', 1, step=0))
events = events_from_logdir(logdir)
self.assertEqual(3, len(events))
self.assertEqual('default', events[1].summary.value[0].tag)
self.assertEqual('set_on', events[2].summary.value[0].tag)
def testWrite_recordIf_constant_fromFunction(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
@def_function.function
def f():
with writer.as_default():
# Use assertAllEqual instead of assertTrue since it works in a defun.
self.assertAllEqual(summary_ops.write('default', 1, step=0), True)
with summary_ops.record_if(True):
self.assertAllEqual(summary_ops.write('set_on', 1, step=0), True)
with summary_ops.record_if(False):
self.assertAllEqual(summary_ops.write('set_off', 1, step=0), False)
f()
events = events_from_logdir(logdir)
self.assertEqual(3, len(events))
self.assertEqual('default', events[1].summary.value[0].tag)
self.assertEqual('set_on', events[2].summary.value[0].tag)
def testWrite_recordIf_callable(self):
logdir = self.get_temp_dir()
with context.eager_mode():
step = variables.Variable(-1, dtype=dtypes.int64)
def record_fn():
step.assign_add(1)
return int(step % 2) == 0
with summary_ops.create_file_writer_v2(logdir).as_default():
with summary_ops.record_if(record_fn):
self.assertTrue(summary_ops.write('tag', 1, step=step))
self.assertFalse(summary_ops.write('tag', 1, step=step))
self.assertTrue(summary_ops.write('tag', 1, step=step))
self.assertFalse(summary_ops.write('tag', 1, step=step))
self.assertTrue(summary_ops.write('tag', 1, step=step))
events = events_from_logdir(logdir)
self.assertEqual(4, len(events))
self.assertEqual(0, events[1].step)
self.assertEqual(2, events[2].step)
self.assertEqual(4, events[3].step)
def testWrite_recordIf_callable_fromFunction(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
step = variables.Variable(-1, dtype=dtypes.int64)
@def_function.function
def record_fn():
step.assign_add(1)
return math_ops.equal(step % 2, 0)
@def_function.function
def f():
with writer.as_default():
with summary_ops.record_if(record_fn):
return [
summary_ops.write('tag', 1, step=step),
summary_ops.write('tag', 1, step=step),
summary_ops.write('tag', 1, step=step)]
self.assertAllEqual(f(), [True, False, True])
self.assertAllEqual(f(), [False, True, False])
events = events_from_logdir(logdir)
self.assertEqual(4, len(events))
self.assertEqual(0, events[1].step)
self.assertEqual(2, events[2].step)
self.assertEqual(4, events[3].step)
def testWrite_recordIf_tensorInput_fromFunction(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
@def_function.function(input_signature=[
tensor_spec.TensorSpec(shape=[], dtype=dtypes.int64)])
def f(step):
with writer.as_default():
with summary_ops.record_if(math_ops.equal(step % 2, 0)):
return summary_ops.write('tag', 1, step=step)
self.assertTrue(f(0))
self.assertFalse(f(1))
self.assertTrue(f(2))
self.assertFalse(f(3))
self.assertTrue(f(4))
events = events_from_logdir(logdir)
self.assertEqual(4, len(events))
self.assertEqual(0, events[1].step)
self.assertEqual(2, events[2].step)
self.assertEqual(4, events[3].step)
def testWriteRawPb(self):
logdir = self.get_temp_dir()
pb = summary_pb2.Summary()
pb.value.add().simple_value = 42.0
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
output = summary_ops.write_raw_pb(pb.SerializeToString(), step=12)
self.assertTrue(output.numpy())
events = events_from_logdir(logdir)
self.assertEqual(2, len(events))
self.assertEqual(12, events[1].step)
self.assertProtoEquals(pb, events[1].summary)
def testWriteRawPb_fromFunction(self):
logdir = self.get_temp_dir()
pb = summary_pb2.Summary()
pb.value.add().simple_value = 42.0
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
@def_function.function
def f():
with writer.as_default():
return summary_ops.write_raw_pb(pb.SerializeToString(), step=12)
output = f()
self.assertTrue(output.numpy())
events = events_from_logdir(logdir)
self.assertEqual(2, len(events))
self.assertEqual(12, events[1].step)
self.assertProtoEquals(pb, events[1].summary)
def testWriteRawPb_multipleValues(self):
logdir = self.get_temp_dir()
pb1 = summary_pb2.Summary()
pb1.value.add().simple_value = 1.0
pb1.value.add().simple_value = 2.0
pb2 = summary_pb2.Summary()
pb2.value.add().simple_value = 3.0
pb3 = summary_pb2.Summary()
pb3.value.add().simple_value = 4.0
pb3.value.add().simple_value = 5.0
pb3.value.add().simple_value = 6.0
pbs = [pb.SerializeToString() for pb in (pb1, pb2, pb3)]
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
output = summary_ops.write_raw_pb(pbs, step=12)
self.assertTrue(output.numpy())
events = events_from_logdir(logdir)
self.assertEqual(2, len(events))
self.assertEqual(12, events[1].step)
expected_pb = summary_pb2.Summary()
for i in range(6):
expected_pb.value.add().simple_value = i + 1.0
self.assertProtoEquals(expected_pb, events[1].summary)
def testWriteRawPb_invalidValue(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
with self.assertRaisesRegex(
errors.DataLossError,
'Bad tf.compat.v1.Summary binary proto tensor string'):
summary_ops.write_raw_pb('notaproto', step=12)
@test_util.also_run_as_tf_function
def testGetSetStep(self):
try:
self.assertIsNone(summary_ops.get_step())
summary_ops.set_step(1)
# Use assertAllEqual instead of assertEqual since it works in a defun.
self.assertAllEqual(1, summary_ops.get_step())
summary_ops.set_step(constant_op.constant(2))
self.assertAllEqual(2, summary_ops.get_step())
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testGetSetStep_variable(self):
with context.eager_mode():
try:
mystep = variables.Variable(0)
summary_ops.set_step(mystep)
self.assertAllEqual(0, summary_ops.get_step().read_value())
mystep.assign_add(1)
self.assertAllEqual(1, summary_ops.get_step().read_value())
# Check that set_step() properly maintains reference to variable.
del mystep
self.assertAllEqual(1, summary_ops.get_step().read_value())
summary_ops.get_step().assign_add(1)
self.assertAllEqual(2, summary_ops.get_step().read_value())
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
def testGetSetStep_variable_fromFunction(self):
with context.eager_mode():
try:
@def_function.function
def set_step(step):
summary_ops.set_step(step)
return summary_ops.get_step()
@def_function.function
def get_and_increment():
summary_ops.get_step().assign_add(1)
return summary_ops.get_step()
mystep = variables.Variable(0)
self.assertAllEqual(0, set_step(mystep))
self.assertAllEqual(0, summary_ops.get_step().read_value())
self.assertAllEqual(1, get_and_increment())
self.assertAllEqual(2, get_and_increment())
# Check that set_step() properly maintains reference to variable.
del mystep
self.assertAllEqual(3, get_and_increment())
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
@test_util.also_run_as_tf_function
def testSummaryScope(self):
with summary_ops.summary_scope('foo') as (tag, scope):
self.assertEqual('foo', tag)
self.assertEqual('foo/', scope)
with summary_ops.summary_scope('bar') as (tag, scope):
self.assertEqual('foo/bar', tag)
self.assertEqual('foo/bar/', scope)
with summary_ops.summary_scope('with/slash') as (tag, scope):
self.assertEqual('foo/with/slash', tag)
self.assertEqual('foo/with/slash/', scope)
with ops.name_scope(None, skip_on_eager=False):
with summary_ops.summary_scope('unnested') as (tag, scope):
self.assertEqual('unnested', tag)
self.assertEqual('unnested/', scope)
@test_util.also_run_as_tf_function
def testSummaryScope_defaultName(self):
with summary_ops.summary_scope(None) as (tag, scope):
self.assertEqual('summary', tag)
self.assertEqual('summary/', scope)
with summary_ops.summary_scope(None, 'backup') as (tag, scope):
self.assertEqual('backup', tag)
self.assertEqual('backup/', scope)
@test_util.also_run_as_tf_function
def testSummaryScope_handlesCharactersIllegalForScope(self):
with summary_ops.summary_scope('f?o?o') as (tag, scope):
self.assertEqual('f?o?o', tag)
self.assertEqual('foo/', scope)
# If all characters aren't legal for a scope name, use default name.
with summary_ops.summary_scope('???', 'backup') as (tag, scope):
self.assertEqual('???', tag)
self.assertEqual('backup/', scope)
@test_util.also_run_as_tf_function
def testSummaryScope_nameNotUniquifiedForTag(self):
constant_op.constant(0, name='foo')
with summary_ops.summary_scope('foo') as (tag, _):
self.assertEqual('foo', tag)
with summary_ops.summary_scope('foo') as (tag, _):
self.assertEqual('foo', tag)
with ops.name_scope('with', skip_on_eager=False):
constant_op.constant(0, name='slash')
with summary_ops.summary_scope('with/slash') as (tag, _):
self.assertEqual('with/slash', tag)
def testAllV2SummaryOps(self):
logdir = self.get_temp_dir()
def define_ops():
result = []
# TF 2.0 summary ops
result.append(summary_ops.write('write', 1, step=0))
result.append(summary_ops.write_raw_pb(b'', step=0, name='raw_pb'))
# TF 1.x tf.contrib.summary ops
result.append(summary_ops.generic('tensor', 1, step=1))
result.append(summary_ops.scalar('scalar', 2.0, step=1))
result.append(summary_ops.histogram('histogram', [1.0], step=1))
result.append(summary_ops.image('image', [[[[1.0]]]], step=1))
result.append(summary_ops.audio('audio', [[1.0]], 1.0, 1, step=1))
return result
with context.graph_mode():
ops_without_writer = define_ops()
with summary_ops.create_file_writer_v2(logdir).as_default():
with summary_ops.record_if(True):
ops_recording_on = define_ops()
with summary_ops.record_if(False):
ops_recording_off = define_ops()
# We should be collecting all ops defined with a default writer present,
# regardless of whether recording was set on or off, but not those defined
# without a writer at all.
del ops_without_writer
expected_ops = ops_recording_on + ops_recording_off
self.assertCountEqual(expected_ops, summary_ops.all_v2_summary_ops())
class SummaryWriterTest(test_util.TensorFlowTestCase):
def testCreate_withInitAndClose(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(
logdir, max_queue=1000, flush_millis=1000000)
get_total = lambda: len(events_from_logdir(logdir))
self.assertEqual(1, get_total()) # file_version Event
# Calling init() again while writer is open has no effect
writer.init()
self.assertEqual(1, get_total())
with writer.as_default():
summary_ops.write('tag', 1, step=0)
self.assertEqual(1, get_total())
# Calling .close() should do an implicit flush
writer.close()
self.assertEqual(2, get_total())
def testCreate_fromFunction(self):
logdir = self.get_temp_dir()
@def_function.function
def f():
# Returned SummaryWriter must be stored in a non-local variable so it
# lives throughout the function execution.
if not hasattr(f, 'writer'):
f.writer = summary_ops.create_file_writer_v2(logdir)
with context.eager_mode():
f()
event_files = gfile.Glob(os.path.join(logdir, '*'))
self.assertEqual(1, len(event_files))
def testCreate_graphTensorArgument_raisesError(self):
logdir = self.get_temp_dir()
with context.graph_mode():
logdir_tensor = constant_op.constant(logdir)
with context.eager_mode():
with self.assertRaisesRegex(
ValueError, 'Invalid graph Tensor argument.*logdir'):
summary_ops.create_file_writer_v2(logdir_tensor)
self.assertEmpty(gfile.Glob(os.path.join(logdir, '*')))
def testCreate_fromFunction_graphTensorArgument_raisesError(self):
logdir = self.get_temp_dir()
@def_function.function
def f():
summary_ops.create_file_writer_v2(constant_op.constant(logdir))
with context.eager_mode():
with self.assertRaisesRegex(
ValueError, 'Invalid graph Tensor argument.*logdir'):
f()
self.assertEmpty(gfile.Glob(os.path.join(logdir, '*')))
def testCreate_fromFunction_unpersistedResource_raisesError(self):
logdir = self.get_temp_dir()
@def_function.function
def f():
with summary_ops.create_file_writer_v2(logdir).as_default():
pass # Calling .as_default() is enough to indicate use.
with context.eager_mode():
# TODO(nickfelt): change this to a better error
with self.assertRaisesRegex(
errors.NotFoundError, 'Resource.*does not exist'):
f()
# Even though we didn't use it, an event file will have been created.
self.assertEqual(1, len(gfile.Glob(os.path.join(logdir, '*'))))
def testCreate_immediateSetAsDefault_retainsReference(self):
logdir = self.get_temp_dir()
try:
with context.eager_mode():
summary_ops.create_file_writer_v2(logdir).set_as_default()
summary_ops.flush()
finally:
# Ensure we clean up no matter how the test executes.
summary_ops._summary_state.writer = None # pylint: disable=protected-access
def testCreate_immediateAsDefault_retainsReference(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(logdir).as_default():
summary_ops.flush()
def testCreate_avoidsFilenameCollision(self):
logdir = self.get_temp_dir()
with context.eager_mode():
for _ in range(10):
summary_ops.create_file_writer_v2(logdir)
event_files = gfile.Glob(os.path.join(logdir, '*'))
self.assertLen(event_files, 10)
def testCreate_graphMode_avoidsFilenameCollision(self):
logdir = self.get_temp_dir()
with context.graph_mode(), ops.Graph().as_default():
writer = summary_ops.create_file_writer_v2(logdir)
with self.cached_session() as sess:
for _ in range(10):
sess.run(writer.init())
sess.run(writer.close())
event_files = gfile.Glob(os.path.join(logdir, '*'))
self.assertLen(event_files, 10)
def testNoSharing(self):
# Two writers with the same logdir should not share state.
logdir = self.get_temp_dir()
with context.eager_mode():
writer1 = summary_ops.create_file_writer_v2(logdir)
with writer1.as_default():
summary_ops.write('tag', 1, step=1)
event_files = gfile.Glob(os.path.join(logdir, '*'))
self.assertEqual(1, len(event_files))
file1 = event_files[0]
writer2 = summary_ops.create_file_writer_v2(logdir)
with writer2.as_default():
summary_ops.write('tag', 1, step=2)
event_files = gfile.Glob(os.path.join(logdir, '*'))
self.assertEqual(2, len(event_files))
event_files.remove(file1)
file2 = event_files[0]
# Extra writes to ensure interleaved usage works.
with writer1.as_default():
summary_ops.write('tag', 1, step=1)
with writer2.as_default():
summary_ops.write('tag', 1, step=2)
events = iter(events_from_file(file1))
self.assertEqual('brain.Event:2', next(events).file_version)
self.assertEqual(1, next(events).step)
self.assertEqual(1, next(events).step)
self.assertRaises(StopIteration, lambda: next(events))
events = iter(events_from_file(file2))
self.assertEqual('brain.Event:2', next(events).file_version)
self.assertEqual(2, next(events).step)
self.assertEqual(2, next(events).step)
self.assertRaises(StopIteration, lambda: next(events))
def testNoSharing_fromFunction(self):
logdir = self.get_temp_dir()
@def_function.function
def f1():
if not hasattr(f1, 'writer'):
f1.writer = summary_ops.create_file_writer_v2(logdir)
with f1.writer.as_default():
summary_ops.write('tag', 1, step=1)
@def_function.function
def f2():
if not hasattr(f2, 'writer'):
f2.writer = summary_ops.create_file_writer_v2(logdir)
with f2.writer.as_default():
summary_ops.write('tag', 1, step=2)
with context.eager_mode():
f1()
event_files = gfile.Glob(os.path.join(logdir, '*'))
self.assertEqual(1, len(event_files))
file1 = event_files[0]
f2()
event_files = gfile.Glob(os.path.join(logdir, '*'))
self.assertEqual(2, len(event_files))
event_files.remove(file1)
file2 = event_files[0]
# Extra writes to ensure interleaved usage works.
f1()
f2()
events = iter(events_from_file(file1))
self.assertEqual('brain.Event:2', next(events).file_version)
self.assertEqual(1, next(events).step)
self.assertEqual(1, next(events).step)
self.assertRaises(StopIteration, lambda: next(events))
events = iter(events_from_file(file2))
self.assertEqual('brain.Event:2', next(events).file_version)
self.assertEqual(2, next(events).step)
self.assertEqual(2, next(events).step)
self.assertRaises(StopIteration, lambda: next(events))
def testMaxQueue(self):
logdir = self.get_temp_dir()
with context.eager_mode():
with summary_ops.create_file_writer_v2(
logdir, max_queue=1, flush_millis=999999).as_default():
get_total = lambda: len(events_from_logdir(logdir))
# Note: First tf.compat.v1.Event is always file_version.
self.assertEqual(1, get_total())
summary_ops.write('tag', 1, step=0)
self.assertEqual(1, get_total())
# Should flush after second summary since max_queue = 1
summary_ops.write('tag', 1, step=0)
self.assertEqual(3, get_total())
def testWriterFlush(self):
logdir = self.get_temp_dir()
get_total = lambda: len(events_from_logdir(logdir))
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(
logdir, max_queue=1000, flush_millis=1000000)
self.assertEqual(1, get_total()) # file_version Event
with writer.as_default():
summary_ops.write('tag', 1, step=0)
self.assertEqual(1, get_total())
writer.flush()
self.assertEqual(2, get_total())
summary_ops.write('tag', 1, step=0)
self.assertEqual(2, get_total())
# Exiting the "as_default()" should do an implicit flush
self.assertEqual(3, get_total())
def testFlushFunction(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(
logdir, max_queue=999999, flush_millis=999999)
with writer.as_default():
get_total = lambda: len(events_from_logdir(logdir))
# Note: First tf.compat.v1.Event is always file_version.
self.assertEqual(1, get_total())
summary_ops.write('tag', 1, step=0)
summary_ops.write('tag', 1, step=0)
self.assertEqual(1, get_total())
summary_ops.flush()
self.assertEqual(3, get_total())
# Test "writer" parameter
summary_ops.write('tag', 1, step=0)
self.assertEqual(3, get_total())
summary_ops.flush(writer=writer)
self.assertEqual(4, get_total())
summary_ops.write('tag', 1, step=0)
self.assertEqual(4, get_total())
summary_ops.flush(writer=writer._resource) # pylint:disable=protected-access
self.assertEqual(5, get_total())
@test_util.assert_no_new_tensors
def testNoMemoryLeak_graphMode(self):
logdir = self.get_temp_dir()
with context.graph_mode(), ops.Graph().as_default():
summary_ops.create_file_writer_v2(logdir)
@test_util.assert_no_new_pyobjects_executing_eagerly
def testNoMemoryLeak_eagerMode(self):
logdir = self.get_temp_dir()
with summary_ops.create_file_writer_v2(logdir).as_default():
summary_ops.write('tag', 1, step=0)
def testClose_preventsLaterUse(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
writer.close()
writer.close() # redundant close() is a no-op
writer.flush() # redundant flush() is a no-op
with self.assertRaisesRegex(RuntimeError, 'already closed'):
writer.init()
with self.assertRaisesRegex(RuntimeError, 'already closed'):
with writer.as_default():
self.fail('should not get here')
with self.assertRaisesRegex(RuntimeError, 'already closed'):
writer.set_as_default()
def testClose_closesOpenFile(self):
try:
import psutil # pylint: disable=g-import-not-at-top
except ImportError:
raise unittest.SkipTest('test requires psutil')
proc = psutil.Process()
get_open_filenames = lambda: set(info[0] for info in proc.open_files())
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
files = gfile.Glob(os.path.join(logdir, '*'))
self.assertEqual(1, len(files))
eventfile = files[0]
self.assertIn(eventfile, get_open_filenames())
writer.close()
self.assertNotIn(eventfile, get_open_filenames())
def testDereference_closesOpenFile(self):
try:
import psutil # pylint: disable=g-import-not-at-top
except ImportError:
raise unittest.SkipTest('test requires psutil')
proc = psutil.Process()
get_open_filenames = lambda: set(info[0] for info in proc.open_files())
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
files = gfile.Glob(os.path.join(logdir, '*'))
self.assertEqual(1, len(files))
eventfile = files[0]
self.assertIn(eventfile, get_open_filenames())
del writer
self.assertNotIn(eventfile, get_open_filenames())
class SummaryWriterSavedModelTest(test_util.TensorFlowTestCase):
def testWriter_savedAsModuleProperty_loadInEagerMode(self):
with context.eager_mode():
class Model(module.Module):
def __init__(self, model_dir):
self._writer = summary_ops.create_file_writer_v2(
model_dir, experimental_trackable=True)
@def_function.function(input_signature=[
tensor_spec.TensorSpec(shape=[], dtype=dtypes.int64)
])
def train(self, step):
with self._writer.as_default():
summary_ops.write('tag', 'foo', step=step)
return constant_op.constant(0)
logdir = self.get_temp_dir()
to_export = Model(logdir)
pre_save_files = set(events_from_multifile_logdir(logdir))
export_dir = os.path.join(logdir, 'export')
saved_model_save.save(
to_export, export_dir, signatures={'train': to_export.train})
# Reset context to ensure we don't share any resources with saving code.
context._reset_context() # pylint: disable=protected-access
with context.eager_mode():
restored = saved_model_load.load(export_dir)
restored.train(1)
restored.train(2)
post_restore_files = set(events_from_multifile_logdir(logdir))
restored2 = saved_model_load.load(export_dir)
restored2.train(3)
restored2.train(4)
files_to_events = events_from_multifile_logdir(logdir)
post_restore2_files = set(files_to_events)
self.assertLen(files_to_events, 3)
def unwrap_singleton(iterable):
self.assertLen(iterable, 1)
return next(iter(iterable))
restore_file = unwrap_singleton(post_restore_files - pre_save_files)
restore2_file = unwrap_singleton(post_restore2_files - post_restore_files)
restore_events = files_to_events[restore_file]
restore2_events = files_to_events[restore2_file]
self.assertLen(restore_events, 3)
self.assertEqual(1, restore_events[1].step)
self.assertEqual(2, restore_events[2].step)
self.assertLen(restore2_events, 3)
self.assertEqual(3, restore2_events[1].step)
self.assertEqual(4, restore2_events[2].step)
def testWriter_savedAsModuleProperty_loadInGraphMode(self):
with context.eager_mode():
class Model(module.Module):
def __init__(self, model_dir):
self._writer = summary_ops.create_file_writer_v2(
model_dir, experimental_trackable=True)
@def_function.function(input_signature=[
tensor_spec.TensorSpec(shape=[], dtype=dtypes.int64)
])
def train(self, step):
with self._writer.as_default():
summary_ops.write('tag', 'foo', step=step)
return constant_op.constant(0)
logdir = self.get_temp_dir()
to_export = Model(logdir)
pre_save_files = set(events_from_multifile_logdir(logdir))
export_dir = os.path.join(logdir, 'export')
saved_model_save.save(
to_export, export_dir, signatures={'train': to_export.train})
# Reset context to ensure we don't share any resources with saving code.
context._reset_context() # pylint: disable=protected-access
def load_and_run_model(sess, input_values):
"""Load and run the SavedModel signature in the TF 1.x style."""
model = saved_model_loader.load(sess, [tag_constants.SERVING], export_dir)
signature = model.signature_def['train']
inputs = list(signature.inputs.values())
assert len(inputs) == 1, inputs
outputs = list(signature.outputs.values())
assert len(outputs) == 1, outputs
input_tensor = sess.graph.get_tensor_by_name(inputs[0].name)
output_tensor = sess.graph.get_tensor_by_name(outputs[0].name)
for v in input_values:
sess.run(output_tensor, feed_dict={input_tensor: v})
with context.graph_mode(), ops.Graph().as_default():
# Since writer shared_name is fixed, within a single session, all loads of
# this SavedModel will refer to a single writer resouce, so it will be
# initialized only once and write to a single file.
with self.session() as sess:
load_and_run_model(sess, [1, 2])
load_and_run_model(sess, [3, 4])
post_restore_files = set(events_from_multifile_logdir(logdir))
# New session will recreate the resource and write to a second file.
with self.session() as sess:
load_and_run_model(sess, [5, 6])
files_to_events = events_from_multifile_logdir(logdir)
post_restore2_files = set(files_to_events)
self.assertLen(files_to_events, 3)
def unwrap_singleton(iterable):
self.assertLen(iterable, 1)
return next(iter(iterable))
restore_file = unwrap_singleton(post_restore_files - pre_save_files)
restore2_file = unwrap_singleton(post_restore2_files - post_restore_files)
restore_events = files_to_events[restore_file]
restore2_events = files_to_events[restore2_file]
self.assertLen(restore_events, 5)
self.assertEqual(1, restore_events[1].step)
self.assertEqual(2, restore_events[2].step)
self.assertEqual(3, restore_events[3].step)
self.assertEqual(4, restore_events[4].step)
self.assertLen(restore2_events, 3)
self.assertEqual(5, restore2_events[1].step)
self.assertEqual(6, restore2_events[2].step)
class NoopWriterTest(test_util.TensorFlowTestCase):
def testNoopWriter_doesNothing(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_noop_writer()
writer.init()
with writer.as_default():
result = summary_ops.write('test', 1.0, step=0)
writer.flush()
writer.close()
self.assertFalse(result) # Should have found no active writer
files = gfile.Glob(os.path.join(logdir, '*'))
self.assertLen(files, 0)
def testNoopWriter_asNestedContext_isTransparent(self):
logdir = self.get_temp_dir()
with context.eager_mode():
writer = summary_ops.create_file_writer_v2(logdir)
noop_writer = summary_ops.create_noop_writer()
with writer.as_default():
result1 = summary_ops.write('first', 1.0, step=0)
with noop_writer.as_default():
result2 = summary_ops.write('second', 1.0, step=0)
result3 = summary_ops.write('third', 1.0, step=0)
# All ops should have written, including the one inside the no-op writer,
# since it doesn't actively *disable* writing - it just behaves as if that
# entire `with` block wasn't there at all.
self.assertAllEqual([result1, result2, result3], [True, True, True])
def testNoopWriter_setAsDefault(self):
try:
with context.eager_mode():
writer = summary_ops.create_noop_writer()
writer.set_as_default()
result = summary_ops.write('test', 1.0, step=0)
self.assertFalse(result) # Should have found no active writer
finally:
# Ensure we clean up no matter how the test executes.
summary_ops._summary_state.writer = None # pylint: disable=protected-access
class SummaryOpsTest(test_util.TensorFlowTestCase):
def tearDown(self):
summary_ops.trace_off()
def exec_summary_op(self, summary_op_fn):
assert context.executing_eagerly()
logdir = self.get_temp_dir()
writer = summary_ops.create_file_writer_v2(logdir)
with writer.as_default():
summary_op_fn()
writer.close()
events = events_from_logdir(logdir)
return events[1]
def run_metadata(self, *args, **kwargs):
assert context.executing_eagerly()
logdir = self.get_temp_dir()
writer = summary_ops.create_file_writer_v2(logdir)
with writer.as_default():
summary_ops.run_metadata(*args, **kwargs)
writer.close()
events = events_from_logdir(logdir)
return events[1]
def run_metadata_graphs(self, *args, **kwargs):
assert context.executing_eagerly()
logdir = self.get_temp_dir()
writer = summary_ops.create_file_writer_v2(logdir)
with writer.as_default():
summary_ops.run_metadata_graphs(*args, **kwargs)
writer.close()
events = events_from_logdir(logdir)
return events[1]
def create_run_metadata(self):
step_stats = step_stats_pb2.StepStats(dev_stats=[
step_stats_pb2.DeviceStepStats(
device='cpu:0',
node_stats=[step_stats_pb2.NodeExecStats(node_name='hello')])
])
return config_pb2.RunMetadata(
function_graphs=[
config_pb2.RunMetadata.FunctionGraphs(
pre_optimization_graph=graph_pb2.GraphDef(
node=[node_def_pb2.NodeDef(name='foo')]))
],
step_stats=step_stats)
def run_trace(self, f, step=1):
assert context.executing_eagerly()
logdir = self.get_temp_dir()
writer = summary_ops.create_file_writer_v2(logdir)
summary_ops.trace_on(graph=True, profiler=False)
with writer.as_default():
f()
summary_ops.trace_export(name='foo', step=step)
writer.close()
events = events_from_logdir(logdir)
return events[1]
@test_util.run_v2_only
def testRunMetadata_usesNameAsTag(self):
meta = config_pb2.RunMetadata()
with ops.name_scope('foo', skip_on_eager=False):
event = self.run_metadata(name='my_name', data=meta, step=1)
first_val = event.summary.value[0]
self.assertEqual('foo/my_name', first_val.tag)
@test_util.run_v2_only
def testRunMetadata_summaryMetadata(self):
expected_summary_metadata = """
plugin_data {
plugin_name: "graph_run_metadata"
content: "1"
}
"""
meta = config_pb2.RunMetadata()
event = self.run_metadata(name='my_name', data=meta, step=1)
actual_summary_metadata = event.summary.value[0].metadata
self.assertProtoEquals(expected_summary_metadata, actual_summary_metadata)
@test_util.run_v2_only
def testRunMetadata_wholeRunMetadata(self):
expected_run_metadata = """
step_stats {
dev_stats {
device: "cpu:0"
node_stats {
node_name: "hello"
}
}
}
function_graphs {
pre_optimization_graph {
node {
name: "foo"
}
}
}
"""
meta = self.create_run_metadata()
event = self.run_metadata(name='my_name', data=meta, step=1)
first_val = event.summary.value[0]
actual_run_metadata = config_pb2.RunMetadata.FromString(
first_val.tensor.string_val[0])
self.assertProtoEquals(expected_run_metadata, actual_run_metadata)
@test_util.run_v2_only
def testRunMetadata_usesDefaultStep(self):
meta = config_pb2.RunMetadata()
try:
summary_ops.set_step(42)
event = self.run_metadata(name='my_name', data=meta)
self.assertEqual(42, event.step)
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
@test_util.run_v2_only
def testRunMetadataGraph_usesNameAsTag(self):
meta = config_pb2.RunMetadata()
with ops.name_scope('foo', skip_on_eager=False):
event = self.run_metadata_graphs(name='my_name', data=meta, step=1)
first_val = event.summary.value[0]
self.assertEqual('foo/my_name', first_val.tag)
@test_util.run_v2_only
def testRunMetadataGraph_summaryMetadata(self):
expected_summary_metadata = """
plugin_data {
plugin_name: "graph_run_metadata_graph"
content: "1"
}
"""
meta = config_pb2.RunMetadata()
event = self.run_metadata_graphs(name='my_name', data=meta, step=1)
actual_summary_metadata = event.summary.value[0].metadata
self.assertProtoEquals(expected_summary_metadata, actual_summary_metadata)
@test_util.run_v2_only
def testRunMetadataGraph_runMetadataFragment(self):
expected_run_metadata = """
function_graphs {
pre_optimization_graph {
node {
name: "foo"
}
}
}
"""
meta = self.create_run_metadata()
event = self.run_metadata_graphs(name='my_name', data=meta, step=1)
first_val = event.summary.value[0]
actual_run_metadata = config_pb2.RunMetadata.FromString(
first_val.tensor.string_val[0])
self.assertProtoEquals(expected_run_metadata, actual_run_metadata)
@test_util.run_v2_only
def testRunMetadataGraph_usesDefaultStep(self):
meta = config_pb2.RunMetadata()
try:
summary_ops.set_step(42)
event = self.run_metadata_graphs(name='my_name', data=meta)
self.assertEqual(42, event.step)
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
@test_util.run_v2_only
def testTrace(self):
@def_function.function
def f():
x = constant_op.constant(2)
y = constant_op.constant(3)
return x**y
event = self.run_trace(f)
first_val = event.summary.value[0]
actual_run_metadata = config_pb2.RunMetadata.FromString(
first_val.tensor.string_val[0])
# Content of function_graphs is large and, for instance, device can change.
self.assertTrue(hasattr(actual_run_metadata, 'function_graphs'))
@test_util.run_v2_only
def testTrace_cannotEnableTraceInFunction(self):
@def_function.function
def f():
summary_ops.trace_on(graph=True, profiler=False)
x = constant_op.constant(2)
y = constant_op.constant(3)
return x**y
with test.mock.patch.object(logging, 'warn') as mock_log:
f()
self.assertRegex(
str(mock_log.call_args), 'Cannot enable trace inside a tf.function.')
@test_util.run_v2_only
def testTrace_cannotEnableTraceInGraphMode(self):
with test.mock.patch.object(logging, 'warn') as mock_log:
with context.graph_mode():
summary_ops.trace_on(graph=True, profiler=False)
self.assertRegex(
str(mock_log.call_args), 'Must enable trace in eager mode.')
@test_util.run_v2_only
def testTrace_cannotExportTraceWithoutTrace(self):
with six.assertRaisesRegex(self, ValueError,
'Must enable trace before export.'):
summary_ops.trace_export(name='foo', step=1)
@test_util.run_v2_only
def testTrace_cannotExportTraceInFunction(self):
summary_ops.trace_on(graph=True, profiler=False)
@def_function.function
def f():
x = constant_op.constant(2)
y = constant_op.constant(3)
summary_ops.trace_export(name='foo', step=1)
return x**y
with test.mock.patch.object(logging, 'warn') as mock_log:
f()
self.assertRegex(
str(mock_log.call_args), 'Cannot export trace inside a tf.function.')
@test_util.run_v2_only
def testTrace_cannotExportTraceInGraphMode(self):
with test.mock.patch.object(logging, 'warn') as mock_log:
with context.graph_mode():
summary_ops.trace_export(name='foo', step=1)
self.assertRegex(
str(mock_log.call_args),
'Can only export trace while executing eagerly.')
@test_util.run_v2_only
def testTrace_usesDefaultStep(self):
@def_function.function
def f():
x = constant_op.constant(2)
y = constant_op.constant(3)
return x**y
try:
summary_ops.set_step(42)
event = self.run_trace(f, step=None)
self.assertEqual(42, event.step)
finally:
# Reset to default state for other tests.
summary_ops.set_step(None)
@test_util.run_v2_only
def testTrace_withProfiler(self):
@def_function.function
def f():
x = constant_op.constant(2)
y = constant_op.constant(3)
return x**y
assert context.executing_eagerly()
logdir = self.get_temp_dir()
writer = summary_ops.create_file_writer_v2(logdir)
summary_ops.trace_on(graph=True, profiler=True)
profiler_outdir = self.get_temp_dir()
with writer.as_default():
f()
summary_ops.trace_export(
name='foo', step=1, profiler_outdir=profiler_outdir)
writer.close()
@test_util.run_v2_only
def testGraph_graph(self):
@def_function.function
def f():
x = constant_op.constant(2)
y = constant_op.constant(3)
return x**y
def summary_op_fn():
summary_ops.graph(f.get_concrete_function().graph)
event = self.exec_summary_op(summary_op_fn)
self.assertIsNotNone(event.graph_def)
@test_util.run_v2_only
def testGraph_graphDef(self):
@def_function.function
def f():
x = constant_op.constant(2)
y = constant_op.constant(3)
return x**y
def summary_op_fn():
summary_ops.graph(f.get_concrete_function().graph.as_graph_def())
event = self.exec_summary_op(summary_op_fn)
self.assertIsNotNone(event.graph_def)
@test_util.run_v2_only
def testGraph_invalidData(self):
def summary_op_fn():
summary_ops.graph('hello')
with self.assertRaisesRegex(
ValueError,
r'\'graph_data\' is not tf.Graph or tf.compat.v1.GraphDef',
):
self.exec_summary_op(summary_op_fn)
@test_util.run_v2_only
def testGraph_fromGraphMode(self):
@def_function.function
def f():
x = constant_op.constant(2)
y = constant_op.constant(3)
return x**y
@def_function.function
def g(graph):
summary_ops.graph(graph)
def summary_op_fn():
graph_def = f.get_concrete_function().graph.as_graph_def(add_shapes=True)
func_graph = constant_op.constant(graph_def.SerializeToString())
g(func_graph)
with self.assertRaisesRegex(
ValueError,
r'graph\(\) cannot be invoked inside a graph context.',
):
self.exec_summary_op(summary_op_fn)
def events_from_file(filepath):
"""Returns all events in a single event file.
Args:
filepath: Path to the event file.
Returns:
A list of all tf.Event protos in the event file.
"""
records = list(tf_record.tf_record_iterator(filepath))
result = []
for r in records:
event = event_pb2.Event()
event.ParseFromString(r)
result.append(event)
return result
def events_from_logdir(logdir):
"""Returns all events in the single eventfile in logdir.
Args:
logdir: The directory in which the single event file is sought.
Returns:
A list of all tf.Event protos from the single event file.
Raises:
AssertionError: If logdir does not contain exactly one file.
"""
assert gfile.Exists(logdir)
files = gfile.ListDirectory(logdir)
assert len(files) == 1, 'Found not exactly one file in logdir: %s' % files
return events_from_file(os.path.join(logdir, files[0]))
def events_from_multifile_logdir(logdir):
"""Returns map of filename to events for all `tfevents` files in the logdir.
Args:
logdir: The directory from which to load events.
Returns:
A dict mapping from relative filenames to lists of tf.Event protos.
Raises:
AssertionError: If logdir does not contain exactly one file.
"""
assert gfile.Exists(logdir)
files = [file for file in gfile.ListDirectory(logdir) if 'tfevents' in file]
return {file: events_from_file(os.path.join(logdir, file)) for file in files}
def to_numpy(summary_value):
return tensor_util.MakeNdarray(summary_value.tensor)
if __name__ == '__main__':
test.main()