| # Copyright 2015 gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| require 'spec_helper' |
| require 'timeout' |
| |
| include Timeout |
| include GRPC::Core |
| include GRPC::Spec::Helpers |
| |
| def start_server(port = 0) |
| @srv = new_rpc_server_for_testing(pool_size: 1) |
| server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure) |
| @srv.handle(EchoService) |
| @server_thd = Thread.new { @srv.run } |
| @srv.wait_till_running |
| server_port |
| end |
| |
| def stop_server |
| expect(@srv.stopped?).to be(false) |
| @srv.stop |
| @server_thd.join |
| expect(@srv.stopped?).to be(true) |
| end |
| |
| describe 'channel connection behavior' do |
| it 'the client channel handles temporary loss of a transport' do |
| port = start_server |
| stub = EchoStub.new("localhost:#{port}", :this_channel_is_insecure) |
| req = EchoMsg.new |
| expect(stub.an_rpc(req)).to be_a(EchoMsg) |
| stop_server |
| sleep 1 |
| # TODO(apolcyn) grabbing the same port might fail, is this stable enough? |
| start_server(port) |
| expect(stub.an_rpc(req)).to be_a(EchoMsg) |
| stop_server |
| end |
| |
| it 'observably connects and reconnects to transient server' \ |
| ' when using the channel state API' do |
| port = start_server |
| ch = GRPC::Core::Channel.new("localhost:#{port}", {}, |
| :this_channel_is_insecure) |
| |
| expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE) |
| |
| state = ch.connectivity_state(true) |
| |
| count = 0 |
| while count < 20 && state != GRPC::Core::ConnectivityStates::READY |
| ch.watch_connectivity_state(state, Time.now + 60) |
| state = ch.connectivity_state(true) |
| count += 1 |
| end |
| |
| expect(state).to be(GRPC::Core::ConnectivityStates::READY) |
| |
| stop_server |
| |
| state = ch.connectivity_state |
| |
| count = 0 |
| while count < 20 && state == GRPC::Core::ConnectivityStates::READY |
| ch.watch_connectivity_state(state, Time.now + 60) |
| state = ch.connectivity_state |
| count += 1 |
| end |
| |
| expect(state).to_not be(GRPC::Core::ConnectivityStates::READY) |
| |
| start_server(port) |
| |
| state = ch.connectivity_state(true) |
| |
| count = 0 |
| while count < 20 && state != GRPC::Core::ConnectivityStates::READY |
| ch.watch_connectivity_state(state, Time.now + 60) |
| state = ch.connectivity_state(true) |
| count += 1 |
| end |
| |
| expect(state).to be(GRPC::Core::ConnectivityStates::READY) |
| |
| stop_server |
| end |
| |
| it 'concurrent watches on the same channel' do |
| timeout(180) do |
| port = start_server |
| ch = GRPC::Core::Channel.new("localhost:#{port}", {}, |
| :this_channel_is_insecure) |
| stop_server |
| |
| thds = [] |
| 50.times do |
| thds << Thread.new do |
| while ch.connectivity_state(true) != ConnectivityStates::READY |
| ch.watch_connectivity_state( |
| ConnectivityStates::READY, Time.now + 60) |
| break |
| end |
| end |
| end |
| |
| sleep 0.01 |
| |
| start_server(port) |
| |
| thds.each(&:join) |
| |
| stop_server |
| end |
| end |
| end |