blob: b814652063d4d66b398063a41a46997b9e08659f [file] [log] [blame]
<?php
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* This is the PHP xDS Interop test client. This script is meant to be run by
* the main xDS Interop test runner "run_xds_tests.py", not to be run
* by itself standalone.
*/
$autoload_path = realpath(dirname(__FILE__).'/../../vendor/autoload.php');
require_once $autoload_path;
/**
 * Implements the "configure" handler of the XdsUpdateClientConfigureService
 * stub: it replaces, at runtime, the list of RPCs the client thread sends,
 * the metadata attached to each of them, and the per-RPC timeout.
 */
class XdsUpdateClientConfigureService
    extends \Grpc\Testing\XdsUpdateClientConfigureServiceStub
{
    /**
     * Translate a ClientConfigureRequest RpcType enum value into the method
     * name this script uses as an array key.
     *
     * @param mixed $rpc_type enum value taken from the request
     * @return string|null 'EmptyCall' or 'UnaryCall'; null when the value
     *                     is neither of the two types handled here
     */
    private function rpcTypeToName($rpc_type) {
        if ($rpc_type ==
            \Grpc\Testing\ClientConfigureRequest\RpcType::EMPTY_CALL) {
            return 'EmptyCall';
        }
        if ($rpc_type ==
            \Grpc\Testing\ClientConfigureRequest\RpcType::UNARY_CALL) {
            return 'UnaryCall';
        }
        return null;
    }

    /**
     * Apply a new RPC configuration to the global $client_thread.
     *
     * @param \Grpc\Testing\ClientConfigureRequest $request requested RPC
     *        types, metadata entries and timeout
     * @return \Grpc\Testing\ClientConfigureResponse an empty response
     */
    function configure(\Grpc\Testing\ClientConfigureRequest $request) {
        $rpc_types = $request->getTypes();
        $all_metadata = $request->getMetadata();

        // Unrecognized RPC types are ignored.
        $rpcs_to_send = [];
        foreach ($rpc_types as $rpc_type) {
            $rpc_name = $this->rpcTypeToName($rpc_type);
            if ($rpc_name !== null) {
                $rpcs_to_send[] = $rpc_name;
            }
        }

        // Group metadata as [rpc name => [key => value]].
        $metadata_to_send = [];
        foreach ($all_metadata as $metadata) {
            $rpc_type_key = $this->rpcTypeToName($metadata->getType());
            if ($rpc_type_key === null) {
                // Skip entries for an unrecognized RPC type rather than
                // filing them under whichever key the previous entry used.
                continue;
            }
            $key = $metadata->getKey();
            $value = $metadata->getValue();
            if (!isset($metadata_to_send[$rpc_type_key])) {
                $metadata_to_send[$rpc_type_key] = [];
            }
            $metadata_to_send[$rpc_type_key][$key] = $value;
        }

        global $client_thread;
        echo "PHP parent: Setting client_thread rpc_config to \n";
        print_r($rpcs_to_send);
        print_r($metadata_to_send);
        echo "PHP parent: timeout_sec = ".$request->getTimeoutSec()."\n";
        $client_thread->rpc_config->update($rpcs_to_send,
                                           $metadata_to_send,
                                           $request->getTimeoutSec());
        return new Grpc\Testing\ClientConfigureResponse();
    }
}
// The main xds interop test runner will ping this service to ask for
// the stats of the distribution of the backends, for the next X rpcs.
/**
 * Stats service queried by the test runner. It reads counters and results
 * from the global $client_thread and converts them into protobuf responses.
 */
class LoadBalancerStatsService
    extends \Grpc\Testing\LoadBalancerStatsServiceStub
{
    /**
     * Report which peers answered the next $num_rpcs rounds of RPCs.
     *
     * Blocks until the client thread has completed the requested number of
     * rounds or until the request's timeout elapses, whichever comes first,
     * then tallies the recorded hostnames per peer and per method.
     *
     * @param \Grpc\Testing\LoadBalancerStatsRequest $request number of
     *        RPC rounds to observe and how long to wait for them
     * @return \Grpc\Testing\LoadBalancerStatsResponse
     */
    function getClientStats(\Grpc\Testing\LoadBalancerStatsRequest $request) {
        $num_rpcs = $request->getNumRpcs();
        $timeout_sec = $request->getTimeoutSec();
        $rpcs_by_method = [];
        $rpcs_by_peer = [];
        $num_failures = 0;
        // Heavy limitation now: the server is blocking, until all
        // the necessary num_rpcs are finished, or timeout is reached
        global $client_thread;
        $start_id = $client_thread->num_results + 1;
        $end_id = $start_id + $num_rpcs;
        // hrtime(true) returns one scalar nanosecond count (not a
        // [seconds, nanoseconds] pair), so convert it to seconds directly.
        $deadline = hrtime(true) / 1e9 + $timeout_sec;
        while (true) {
            if (hrtime(true) / 1e9 > $deadline) {
                break;
            }
            // Thread variable seems to be read-only
            $curr_id = $client_thread->num_results;
            if ($curr_id >= $end_id) {
                break;
            }
            usleep(50000);
        }
        // Tally up results. If the deadline hit first, only count the
        // rounds that actually completed.
        $end_id = min($end_id, $client_thread->num_results);
        // "$client_thread->results" will be in the form of
        // [
        //   'rpc1' => [
        //     'hostname1', '', 'hostname2', 'hostname1', '', ...
        //   ],
        //   'rpc2' => [
        //     '', 'hostname1', 'hostname2', '', 'hostname2', ...
        //   ],
        // ]
        foreach ((array)$client_thread->rpc_config->rpcs_to_send as $rpc) {
            $results = $client_thread->results[$rpc];
            // initialize, can always start from scratch here
            $rpcs_by_method[$rpc] = [];
            for ($i = $start_id; $i < $end_id; $i++) {
                $hostname = $results[$i];
                if ($hostname) {
                    // initialize in case we haven't seen this hostname
                    // before
                    if (!array_key_exists($hostname, $rpcs_by_method[$rpc])) {
                        $rpcs_by_method[$rpc][$hostname] = 0;
                    }
                    if (!array_key_exists($hostname, $rpcs_by_peer)) {
                        $rpcs_by_peer[$hostname] = 0;
                    }
                    // increment the remote hostname distribution histogram
                    // both by overall, and broken down per RPC
                    $rpcs_by_method[$rpc][$hostname] += 1;
                    $rpcs_by_peer[$hostname] += 1;
                } else {
                    // $num_failures here are counted per individual RPC
                    $num_failures += 1;
                }
            }
        }
        // Convert our hashmaps above into protobuf objects
        $response = new Grpc\Testing\LoadBalancerStatsResponse();
        $rpcs_by_method_map = [];
        foreach ($rpcs_by_method as $rpc => $rpcs_by_peer_per_method) {
            $rpcs_by_peer_proto_obj
                = new Grpc\Testing\LoadBalancerStatsResponse\RpcsByPeer();
            $rpcs_by_peer_proto_obj->setRpcsByPeer($rpcs_by_peer_per_method);
            $rpcs_by_method_map[$rpc] = $rpcs_by_peer_proto_obj;
        }
        $response->setRpcsByPeer($rpcs_by_peer);
        $response->setRpcsByMethod($rpcs_by_method_map);
        $response->setNumFailures($num_failures);
        return $response;
    }

    /**
     * Report the counters the client thread has accumulated since start-up:
     * started / succeeded / failed RPCs per method, plus the per-method
     * breakdown of started RPCs and result status codes.
     *
     * @param \Grpc\Testing\LoadBalancerAccumulatedStatsRequest $request
     *        not inspected by this handler
     * @return \Grpc\Testing\LoadBalancerAccumulatedStatsResponse
     */
    function getClientAccumulatedStats(
        \Grpc\Testing\LoadBalancerAccumulatedStatsRequest $request) {
        global $client_thread;
        $response = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse();
        // The thread's counters are cast to plain arrays before being
        // handed to the protobuf setters.
        $response->setNumRpcsStartedByMethod(
            (array)$client_thread->num_rpcs_started_by_method);
        $response->setNumRpcsSucceededByMethod(
            (array)$client_thread->num_rpcs_succeeded_by_method);
        $response->setNumRpcsFailedByMethod(
            (array)$client_thread->num_rpcs_failed_by_method);
        $accumulated_method_stats
            = (array)$client_thread->accumulated_method_stats;
        $stats_per_method = [];
        foreach ($accumulated_method_stats as $rpc_name => $stats) {
            $methodStats
                = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse\MethodStats();
            $methodStats->setRpcsStarted($stats['rpcs_started']);
            $methodStats->setResult((array)$stats['result']);
            $stats_per_method[$rpc_name] = $methodStats;
        }
        $response->setStatsPerMethod($stats_per_method);
        return $response;
    }
}
/**
 * Configuration object handed to the client thread. The set of RPCs to
 * send, their metadata and the per-RPC timeout can be swapped later through
 * update(); the remaining fields are only assigned in the constructor.
 */
class RpcConfig extends Volatile
{
    public $server_address;
    public $qps;
    public $fail_on_failed_rpcs;
    public $rpcs_to_send;
    public $metadata_to_send;
    public $tmp_file1;
    public $tmp_file2;
    public $timeout_sec;

    /**
     * @param mixed $server_address      target passed to the test stub
     * @param mixed $qps                 RPC rounds per second
     * @param mixed $fail_on_failed_rpcs flag value as given by the caller
     * @param mixed $rpcs_to_send        list of RPC method names
     * @param mixed $metadata_to_send    [rpc name => [key => value]]
     * @param mixed $tmp_file1           file RPC requests are appended to
     * @param mixed $tmp_file2           file RPC results are read from
     */
    public function __construct(
        $server_address,
        $qps,
        $fail_on_failed_rpcs,
        $rpcs_to_send,
        $metadata_to_send,
        $tmp_file1,
        $tmp_file2
    ) {
        // Fields that update() never touches.
        $this->server_address = $server_address;
        $this->qps = $qps;
        $this->fail_on_failed_rpcs = $fail_on_failed_rpcs;
        $this->tmp_file1 = $tmp_file1;
        $this->tmp_file2 = $tmp_file2;
        // Replaceable fields; the timeout starts out at 30 seconds.
        $this->update($rpcs_to_send, $metadata_to_send, 30);
    }

    /**
     * Replace the RPC list, the metadata map and the timeout in one call.
     *
     * @param mixed $rpcs_to_send     list of RPC method names
     * @param mixed $metadata_to_send [rpc name => [key => value]]
     * @param mixed $timeout_sec      per-RPC timeout in seconds
     */
    public function update($rpcs_to_send, $metadata_to_send, $timeout_sec)
    {
        $this->rpcs_to_send = (array)$rpcs_to_send;
        $this->metadata_to_send = (array)$metadata_to_send;
        $this->timeout_sec = $timeout_sec;
    }
}
// This client thread blindly sends a unary RPC to the server once
// every 1 / qps seconds.
/**
 * Background thread that, once every 1 / qps seconds, sends every RPC
 * listed in $rpc_config->rpcs_to_send and records the outcome so the stats
 * service can report on it.
 */
class ClientThread extends Thread {
    private $target_seconds_between_rpcs_;
    private $autoload_path_;
    private $TIMEOUT_US = 30 * 1e6; // 30 seconds
    public $rpc_config;
    // Number of completed rounds; a round is one pass over rpcs_to_send.
    public $num_results = 0;
    // [rpc name => [round number => hostname, or '' when not recorded]]
    public $results;
    // Maps method names to the keys used in the stats counters below.
    public $RPC_MAP = [
        'UnaryCall' => 'UNARY_CALL',
        'EmptyCall' => 'EMPTY_CALL',
    ];
    public $num_rpcs_started_by_method = [];
    public $num_rpcs_succeeded_by_method = [];
    public $num_rpcs_failed_by_method = [];
    public $accumulated_method_stats = [];

    /**
     * @param RpcConfig $rpc_config    what to send, where, and how often
     * @param string    $autoload_path autoloader to require inside run()
     */
    public function __construct($rpc_config,
                                $autoload_path) {
        $this->rpc_config = $rpc_config;
        $this->target_seconds_between_rpcs_ = 1.0 / $rpc_config->qps;
        $this->autoload_path_ = $autoload_path;
        $this->simple_request = new Grpc\Testing\SimpleRequest();
        $this->empty_request = new Grpc\Testing\EmptyMessage();
        $this->results = [];
        foreach (['UnaryCall', 'EmptyCall'] as $rpc) {
            $this->results[$rpc] = [];
        }
        // Keys of RPCs handed to child processes whose result has not
        // been read back yet.
        $this->outstanding_rpcs = [];
        foreach (['UNARY_CALL', 'EMPTY_CALL'] as $rpc_stats_key) {
            $this->num_rpcs_started_by_method[$rpc_stats_key] = 0;
            $this->num_rpcs_succeeded_by_method[$rpc_stats_key] = 0;
            $this->num_rpcs_failed_by_method[$rpc_stats_key] = 0;
            $this->accumulated_method_stats[$rpc_stats_key] = [
                'rpcs_started' => 0,
                'result' => [],
            ];
        }
    }

    /**
     * Start a UnaryCall on $stub with the given metadata. The timeout is
     * the configured timeout_sec in microseconds, or $TIMEOUT_US when
     * timeout_sec is falsy.
     */
    public function sendUnaryCall($stub, $metadata) {
        $timeout = $this->rpc_config->timeout_sec ?
                 $this->rpc_config->timeout_sec * 1e6 :
                 $this->TIMEOUT_US;
        return $stub->UnaryCall($this->simple_request,
                                $metadata,
                                ['timeout' => $timeout]);
    }

    /**
     * Start an EmptyCall on $stub with the given metadata. Timeout
     * selection is the same as in sendUnaryCall().
     */
    public function sendEmptyCall($stub, $metadata) {
        $timeout = $this->rpc_config->timeout_sec ?
                 $this->rpc_config->timeout_sec * 1e6 :
                 $this->TIMEOUT_US;
        return $stub->EmptyCall($this->empty_request,
                                $metadata,
                                ['timeout' => $timeout]);
    }

    /**
     * Count one occurrence of $status_code for $rpc in the accumulated
     * per-method result histogram.
     */
    public function add_rpc_result($rpc, $status_code) {
        // $rpc here needs to be in the format of 'UnaryCall', 'EmptyCall'
        if (!isset($this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
                   ['result'][$status_code])) {
            $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
                ['result'][$status_code] = 0;
        }
        $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
            ['result'][$status_code] += 1;
    }

    /**
     * Read back results of RPCs that were delegated to child processes,
     * update the counters for every outstanding RPC found, and empty the
     * result file. Does nothing when no RPC is outstanding or no tmp_file2
     * is configured.
     */
    public function check_child_process_result() {
        if (sizeof($this->outstanding_rpcs) > 0 &&
            $this->rpc_config->tmp_file2) {
            $keys_to_delete = [];
            // tmp_file2 contains the RPC result of each RPC we
            // originally wrote to tmp_file1
            $f2 = fopen($this->rpc_config->tmp_file2, 'r+');
            if ($f2 === false) {
                // 'r+' does not create the file, so it may simply not be
                // there yet; leave the RPCs outstanding and retry on the
                // next round instead of operating on an invalid handle.
                return;
            }
            flock($f2, LOCK_EX);
            while (true) {
                $f2_line = fgets($f2);
                if (!$f2_line) {
                    break;
                }
                // format here needs to be in sync with
                // src/php/bin/xds_manager.py
                $parts = explode(',', trim($f2_line));
                $key = $parts[0];
                $returncode = $parts[1];
                if (isset($this->outstanding_rpcs[$key])) {
                    $parts2 = explode('|', $key);
                    $result_num = $parts2[0];
                    $rpc_name = $parts2[1];
                    // Child processes can only communicate back the
                    // status code for now.
                    // Current interop test specs only call for
                    // reporting back the status code in these scenarios.
                    // If we ever need the hostname reported back from
                    // child processes, we need to enhance this
                    // communication framework through tmp files.
                    $this->results[$rpc_name][$result_num] = "";
                    if ($returncode) {
                        $this->num_rpcs_failed_by_method
                            [$this->RPC_MAP[$rpc_name]] += 1;
                    } else {
                        $this->num_rpcs_succeeded_by_method
                            [$this->RPC_MAP[$rpc_name]] += 1;
                    }
                    $this->add_rpc_result($rpc_name, $returncode);
                    $keys_to_delete[] = $key;
                }
            }
            foreach ($keys_to_delete as $key) {
                unset($this->outstanding_rpcs[$key]);
            }
            ftruncate($f2, 0);
            flock($f2, LOCK_UN);
            fclose($f2);
        }
    }

    /**
     * Queue one RPC for execution outside this script by appending a line
     * to tmp_file1, remember it as outstanding, and count it as started.
     *
     * @param string $rpc                 'UnaryCall' or 'EmptyCall'
     * @param string $metadata_serialized serialize()d metadata array
     */
    public function execute_rpc_in_child_process($rpc, $metadata_serialized) {
        // tmp_file1 contains the list of RPCs (and their
        // specs) we want executed. This will be picked up
        // by src/php/bin/xds_manager.py
        $f1 = fopen($this->rpc_config->tmp_file1, 'a');
        // The same string is written to the file and used as the lookup
        // key when the result is read back from tmp_file2.
        $key = implode('|', [$this->num_results,
                             $rpc,
                             $metadata_serialized,
                             $this->rpc_config->timeout_sec]);
        flock($f1, LOCK_EX);
        fwrite($f1, $key."\n");
        fflush($f1);
        flock($f1, LOCK_UN);
        fclose($f1);
        $this->outstanding_rpcs[$key] = 1;
        $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
        $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
            ['rpcs_started'] += 1;
    }

    /**
     * Thread body: loop forever, pacing rounds to the configured QPS.
     * Each round collects child-process results, then sends (or delegates)
     * every configured RPC and records its outcome.
     *
     * @throws Exception on an unknown RPC name, or on a failed RPC when
     *                   fail_on_failed_rpcs is enabled
     */
    public function run() {
        // Autoloaded classes do not get inherited in threads.
        // Hence we need to do this.
        require_once($this->autoload_path_);
        $stub = new Grpc\Testing\TestServiceClient(
            $this->rpc_config->server_address,
            ['credentials' => Grpc\ChannelCredentials::createInsecure()
            ]);
        // hrtime returns nanoseconds
        $target_next_start_us = hrtime(true) / 1000;
        while (true) {
            $now_us = hrtime(true) / 1000;
            $sleep_us = $target_next_start_us - $now_us;
            if ($sleep_us < 0) {
                // Running behind schedule: restart the schedule from now.
                $target_next_start_us =
                    $now_us + ($this->target_seconds_between_rpcs_ * 1e6);
            } else {
                $target_next_start_us +=
                    ($this->target_seconds_between_rpcs_ * 1e6);
                usleep($sleep_us);
            }
            $this->check_child_process_result();
            foreach ($this->rpc_config->rpcs_to_send as $rpc) {
                $metadata_to_send_arr
                    = (array)$this->rpc_config->metadata_to_send;
                $metadata = array_key_exists($rpc, $metadata_to_send_arr) ?
                          $metadata_to_send_arr[$rpc] : [];
                // This copy is somehow necessary because
                // $this->metadata_to_send[$rpc] somehow becomes a
                // Volatile object, instead of an associative array.
                $metadata_array = [];
                $execute_in_child_process = false;
                foreach ($metadata as $key => $value) {
                    $metadata_array[$key] = [$value];
                    if ($key == 'rpc-behavior' || $key == 'fi_testcase') {
                        $execute_in_child_process = true;
                    }
                }
                if ($execute_in_child_process && $this->rpc_config->tmp_file1) {
                    // if 'rpc-behavior' or 'fi_testcase' is set, we need
                    // to pawn off the execution to some other child PHP
                    // processes
                    $this->execute_rpc_in_child_process(
                        $rpc, serialize($metadata_array));
                    continue;
                }
                // Execute RPC within this script
                $call = null;
                if ($rpc == 'UnaryCall') {
                    $call = $this->sendUnaryCall($stub, $metadata_array);
                } else if ($rpc == 'EmptyCall') {
                    $call = $this->sendEmptyCall($stub, $metadata_array);
                } else {
                    throw new Exception("Unhandled rpc $rpc");
                }
                $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
                $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
                    ['rpcs_started'] += 1;
                // the remote peer is being returned as part of the
                // initial metadata, according to the test spec
                $initial_metadata = $call->getMetadata();
                list($response, $status) = $call->wait();
                if ($status->code == Grpc\STATUS_OK &&
                    array_key_exists('hostname', $initial_metadata)) {
                    $this->results[$rpc][$this->num_results]
                        = $initial_metadata['hostname'][0];
                    $this->num_rpcs_succeeded_by_method
                        [$this->RPC_MAP[$rpc]] += 1;
                    $this->add_rpc_result($rpc, 0);
                } else {
                    // RpcConfig declares this property as
                    // "fail_on_failed_rpcs" (no trailing underscore). Its
                    // value may be a string such as "false", which a plain
                    // truthiness test would treat as enabled, so parse it
                    // as a boolean explicitly.
                    if (filter_var($this->rpc_config->fail_on_failed_rpcs,
                                   FILTER_VALIDATE_BOOLEAN)) {
                        throw new Exception("$rpc failed with status "
                                            . $status->code);
                    }
                    $this->results[$rpc][$this->num_results] = "";
                    $this->num_rpcs_failed_by_method
                        [$this->RPC_MAP[$rpc]] += 1;
                    $this->add_rpc_result($rpc, $status->code);
                }
            }
            // $num_results here is only incremented when the group of
            // all $rpcs_to_send are done.
            $this->num_results++;
        }
    }

    // This is needed for loading autoload_path in the child thread:
    // whatever $options the caller passes, the thread is always started
    // with PTHREADS_INHERIT_NONE.
    public function start(int $options = PTHREADS_INHERIT_ALL) {
        return parent::start(PTHREADS_INHERIT_NONE);
    }
}
// Entry point: parse the command-line flags, start the client thread, then
// serve the stats / configure services on --stats_port.
// Note: num_channels is currently ignored
$args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
                    'rpc:', 'metadata:', 'tmp_file1:', 'tmp_file2:',
                    'server:', 'stats_port:', 'qps:']);

// Convert input in the form of
//   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
// into
//   [
//     'rpc1' => [
//       'k1' => 'v1',
//       'k3' => 'v3',
//     ],
//     'rpc2' => [
//       'k2' => 'v2'
//     ],
//   ]
$metadata_to_send = [];
// --metadata may be absent, empty, or (if repeated) an array of strings;
// fold all of those into a single comma-separated string.
$metadata_arg = implode(',', (array)($args['metadata'] ?? ''));
foreach (explode(',', $metadata_arg) as $one_metadata_pair) {
    // Limit the split to 3 parts so a value may itself contain ':'.
    $pair_parts = explode(':', $one_metadata_pair, 3);
    if (count($pair_parts) !== 3
        || $pair_parts[0] === ''
        || $pair_parts[1] === '') {
        // Empty or malformed entry (e.g. no --metadata given): skip it
        // instead of recording a bogus '' => ['' => null] entry.
        continue;
    }
    list($rpc, $metadata_key, $metadata_value) = $pair_parts;
    // initialize in case we haven't seen this rpc before
    if (!array_key_exists($rpc, $metadata_to_send)) {
        $metadata_to_send[$rpc] = [];
    }
    $metadata_to_send[$rpc][$metadata_key] = $metadata_value;
}

$rpcs_to_send = (empty($args['rpc']) ? 'UnaryCall' : $args['rpc']);

// Optional flags: read them without raising undefined-index notices.
$tmp_file1 = $args['tmp_file1'] ?? null;
$tmp_file2 = $args['tmp_file2'] ?? null;
$fail_on_failed_rpcs = $args['fail_on_failed_rpcs'] ?? null;
// Fall back to 1 QPS when --qps is omitted so the client thread's
// 1 / qps interval never divides by zero.
$qps = $args['qps'] ?? 1;

// Need to communicate the xds server name to the async runner manager
if ($tmp_file1) {
    // Overwrites any previous content, like opening with mode 'w'.
    file_put_contents($tmp_file1, 'server_address,'.$args['server']);
}

$rpc_config = new RpcConfig($args['server'],
                            $qps,
                            $fail_on_failed_rpcs,
                            explode(',', $rpcs_to_send),
                            $metadata_to_send,
                            $tmp_file1,
                            $tmp_file2);
// $client_thread is read as a global by the service classes above.
$client_thread = new ClientThread($rpc_config,
                                  $autoload_path);
$client_thread->start();

$server = new Grpc\RpcServer();
$server->addHttp2Port('0.0.0.0:'.$args['stats_port']);
$server->handle(new LoadBalancerStatsService());
$server->handle(new XdsUpdateClientConfigureService());
$server->run();