| # Copyright 2017 gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Test times.""" |
| |
| import collections |
| import logging |
| import threading |
| import time as _time |
| |
| import grpc |
| import grpc_testing |
| |
| _LOGGER = logging.getLogger(__name__) |
| |
| |
def _call(behaviors):
    """Invokes every behavior in order, logging failures instead of raising.

    Args:
      behaviors: An iterable of zero-argument callables.
    """
    for callback in behaviors:
        try:
            callback()
        except Exception:  # pylint: disable=broad-except
            # A failing callback is logged and skipped so that the callbacks
            # after it still get their turn.
            _LOGGER.exception('Exception calling behavior "%r"!', callback)
| |
| |
def _call_in_thread(behaviors):
    """Runs the behaviors on a new thread and waits for that thread to end.

    Args:
      behaviors: An iterable of zero-argument callables; passed to _call on
        the new thread.
    """
    worker = threading.Thread(target=_call, args=(behaviors,))
    worker.start()
    # NOTE(nathaniel): Because this function is called from "strict" Time
    # implementations, it blocks until after all behaviors have terminated.
    worker.join()
| |
| |
class _State(object):
    """Holds the pending behaviors and the condition used to guard them."""

    def __init__(self):
        # Maps a scheduled time to the list of behaviors to call at that
        # time; indexing an unseen time creates a fresh empty list.
        self.times_to_behaviors = collections.defaultdict(list)
        # Acquired by the Time implementations and futures in this module
        # around every access to times_to_behaviors, and used to signal
        # changes to it.
        self.condition = threading.Condition()
| |
| |
class _Delta(
        collections.namedtuple(
            '_Delta',
            'mature_behaviors earliest_mature_time earliest_immature_time')):
    """Result of draining the behaviors that are due at some instant.

    Attributes:
      mature_behaviors: The behaviors whose scheduled time has been reached.
      earliest_mature_time: The smallest scheduled time among the mature
        behaviors, or None if there are none.
      earliest_immature_time: The smallest scheduled time still pending, or
        None if nothing remains scheduled.
    """
| |
| |
def _process(state, now):
    """Removes and returns every behavior scheduled at or before `now`.

    Args:
      state: A _State whose times_to_behaviors is drained in place. The
        callers in this module hold state.condition while calling.
      now: The time up to and including which behaviors are mature.

    Returns:
      A _Delta with the mature behaviors in ascending order of scheduled
      time, the earliest mature time (None if nothing was mature), and the
      earliest time still pending (None if nothing is left).
    """
    due = []
    first_due_time = None
    next_pending_time = None
    # sorted() snapshots the keys, so popping entries while walking them is
    # safe and visits the times in ascending order.
    for scheduled_time in sorted(state.times_to_behaviors):
        if scheduled_time <= now:
            if first_due_time is None:
                first_due_time = scheduled_time
            due.extend(state.times_to_behaviors.pop(scheduled_time))
        else:
            # Everything from here on is later still; stop at the first
            # time that has not been reached.
            next_pending_time = scheduled_time
            break
    return _Delta(due, first_due_time, next_pending_time)
| |
| |
class _Future(grpc.Future):
    """Handle for one scheduled behavior; only cancellation is supported.

    Every other grpc.Future method raises NotImplementedError.
    """

    def __init__(self, state, behavior, time):
        """Initializes the future.

        Args:
          state: The _State in which the behavior was (or would have been)
            scheduled.
          behavior: The scheduled callable.
          time: The time for which the behavior was scheduled.
        """
        self._state = state
        self._behavior = behavior
        self._time = time
        self._cancelled = False

    def cancel(self):
        """Removes the behavior from the schedule if it is still pending.

        Returns:
          True if the behavior was cancelled by this or an earlier call;
          False if it is no longer pending at its scheduled time.
        """
        with self._state.condition:
            if self._cancelled:
                return True
            # .get() rather than indexing: indexing the defaultdict would
            # create an empty entry for this time.
            behaviors_at_time = self._state.times_to_behaviors.get(self._time)
            if behaviors_at_time is None:
                return False
            try:
                behaviors_at_time.remove(self._behavior)
            except ValueError:
                # Other behaviors are pending at this time but ours is not
                # among them, so there is nothing left to cancel.
                return False
            if not behaviors_at_time:
                # Drop the now-empty slot and wake waiters, since the
                # earliest pending time may have changed.
                self._state.times_to_behaviors.pop(self._time)
                self._state.condition.notify_all()
            self._cancelled = True
            return True

    def cancelled(self):
        """Returns True if a call to cancel() has removed the behavior."""
        with self._state.condition:
            return self._cancelled

    def running(self):
        """Not supported."""
        raise NotImplementedError()

    def done(self):
        """Not supported."""
        raise NotImplementedError()

    def result(self, timeout=None):
        """Not supported."""
        raise NotImplementedError()

    def exception(self, timeout=None):
        """Not supported."""
        raise NotImplementedError()

    def traceback(self, timeout=None):
        """Not supported."""
        raise NotImplementedError()

    def add_done_callback(self, fn):
        """Not supported."""
        raise NotImplementedError()
| |
| |
class StrictRealTime(grpc_testing.Time):
    """A grpc_testing.Time driven by the real clock.

    Scheduled behaviors are kept in a _State and called in time order by an
    "activity" thread that is started on demand and exits once nothing
    remains scheduled.
    """

    def __init__(self):
        self._state = _State()
        # True from the moment an activity thread is started until that
        # thread finds nothing left to do.
        self._active = False
        # Earliest scheduled time of the batch the activity thread is
        # currently calling, or None when no batch is in progress.
        self._calling = None

    def _activity(self):
        """Activity thread body: calls behaviors as they become mature."""
        while True:
            with self._state.condition:
                while True:
                    now = _time.time()
                    delta = _process(self._state, now)
                    # The schedule may just have changed; wake anything
                    # blocked in _ensure_called_through so it re-checks.
                    self._state.condition.notify_all()
                    if delta.mature_behaviors:
                        self._calling = delta.earliest_mature_time
                        break
                    self._calling = None
                    if delta.earliest_immature_time is None:
                        # Nothing scheduled at all: let the thread end. The
                        # next _call_at starts a new one.
                        self._active = False
                        return
                    else:
                        # Wait until the next behavior is due, or until a
                        # notification says the schedule changed.
                        timeout = max(0, delta.earliest_immature_time - now)
                        self._state.condition.wait(timeout=timeout)
            # The batch is called with the condition released.
            _call(delta.mature_behaviors)

    def _ensure_called_through(self, time):
        """Blocks until everything scheduled before `time` has been called.

        Waits while a behavior is still pending for a time earlier than
        `time`, or while the batch being called by the activity thread
        began at a time earlier than `time`.
        """
        with self._state.condition:
            while ((self._state.times_to_behaviors and
                    min(self._state.times_to_behaviors) < time) or
                   (self._calling is not None and self._calling < time)):
                self._state.condition.wait()

    def _call_at(self, behavior, time):
        """Schedules `behavior` for `time`; starts the activity thread if idle.

        Returns:
          A _Future with which the scheduled behavior may be cancelled.
        """
        with self._state.condition:
            self._state.times_to_behaviors[time].append(behavior)
            if self._active:
                # Wake the activity thread so it recomputes its timeout.
                self._state.condition.notify_all()
            else:
                activity = threading.Thread(target=self._activity)
                activity.start()
                self._active = True
            return _Future(self._state, behavior, time)

    def time(self):
        """Returns the current real time, as given by time.time()."""
        return _time.time()

    def call_in(self, behavior, delay):
        """Schedules `behavior` to be called `delay` seconds from now."""
        return self._call_at(behavior, _time.time() + delay)

    def call_at(self, behavior, time):
        """Schedules `behavior` to be called at the given time."""
        return self._call_at(behavior, time)

    def sleep_for(self, duration):
        """Sleeps for `duration` seconds, then waits for due behaviors.

        A non-positive duration does not sleep, but still waits for
        behaviors scheduled before the computed target time.
        """
        time = _time.time() + duration
        # time.sleep() raises ValueError for a negative argument; clamp it
        # the same way sleep_until does.
        _time.sleep(max(0, duration))
        self._ensure_called_through(time)

    def sleep_until(self, time):
        """Sleeps until `time`, then waits for behaviors due before it."""
        _time.sleep(max(0, time - _time.time()))
        self._ensure_called_through(time)
| |
| |
class StrictFakeTime(grpc_testing.Time):
    """A grpc_testing.Time whose clock moves only when told to sleep.

    Behaviors that become due are called, via _call_in_thread, before the
    scheduling or sleeping call returns.
    """

    def __init__(self, time):
        """Initializes the fake clock.

        Args:
          time: The initial value of the fake clock.
        """
        self._state = _State()
        self._time = time

    def time(self):
        """Returns the current value of the fake clock."""
        return self._time

    def call_in(self, behavior, delay):
        """Schedules `behavior` to be called `delay` after the current time.

        A non-positive delay calls the behavior immediately.

        Returns:
          A _Future for the behavior, on both paths.
        """
        if delay <= 0:
            # Bind `time` on this path too, so the return below always has
            # a value to build the future from.
            time = self._time + delay
            _call_in_thread((behavior,))
        else:
            with self._state.condition:
                time = self._time + delay
                self._state.times_to_behaviors[time].append(behavior)
        return _Future(self._state, behavior, time)

    def call_at(self, behavior, time):
        """Schedules `behavior` to be called at `time`.

        A time that is not after the current fake time calls the behavior
        immediately.

        Returns:
          A _Future for the behavior.
        """
        with self._state.condition:
            if time <= self._time:
                _call_in_thread((behavior,))
            else:
                self._state.times_to_behaviors[time].append(behavior)
        return _Future(self._state, behavior, time)

    def sleep_for(self, duration):
        """Advances the clock by `duration` and calls what became due.

        A non-positive duration does nothing.
        """
        if 0 < duration:
            with self._state.condition:
                self._time += duration
                delta = _process(self._state, self._time)
                _call_in_thread(delta.mature_behaviors)

    def sleep_until(self, time):
        """Advances the clock to `time` and calls what became due.

        A time that is not after the current fake time does nothing.
        """
        with self._state.condition:
            if self._time < time:
                self._time = time
                delta = _process(self._state, self._time)
                _call_in_thread(delta.mature_behaviors)