diff --git a/.env.example b/.env.example index 30692b3..8344466 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,11 @@ SERVER_LISTEN = http://0.0.0.0:7878 SERVER_COUNT = cpu_count() MAX_PACKAGE_SIZE = 1024*1024*5 +# jsonrcp服务配置 +JSONRPC_SERVER_NAME = jsonrpc +JSONRPC_SERVER_PORT = 8022 +JSONRPC_SERVER_COUNT = cpu_count() + # REDIS配置 REDIS_HOST = 127.0.0.1 REDIS_PORT = 6379 diff --git a/app/controller/IndexController.php b/app/controller/IndexController.php index ec333de..3515a48 100644 --- a/app/controller/IndexController.php +++ b/app/controller/IndexController.php @@ -2,8 +2,6 @@ namespace app\controller; -use ErrorException; -use Exception; use support\Log; use support\Request; use Wandoubaba\Res; @@ -13,7 +11,14 @@ class IndexController public function index(Request $request) { $res = new Res(); - $res->setData([env('LOG_MAX_FILES_DEBUG') ?: env('LOG_MAX_FIlES') ?: 7]); + $res1 = \jsonrpc\Client::service('default', 'Demo')->hello('Json'); + $res2 = \jsonrpc\Client::service('user', 'User')->login('admin', '123456'); + $res3 = \jsonrpc\Client::service('default', 'Demo')->text(); + $res->setData([ + 'res1' => $res1, + 'res2' => $res2, + 'res3' => $res3, + ]); return json($res); } @@ -26,5 +31,4 @@ class IndexController { return json(['code' => 0, 'msg' => 'ok']); } - } diff --git a/app/jsonrpc/Demo.php b/app/jsonrpc/Demo.php new file mode 100644 index 0000000..1835679 --- /dev/null +++ b/app/jsonrpc/Demo.php @@ -0,0 +1,30 @@ + + * + * @param string $name + * + * @return array Res转换的数组 + */ + public static function hello(string $name) + { + $res = new Res(); + + $res->success()->setMsg("Hello, {$name}"); + return $res; + } + + public static function text() + { + return '直接返回字符串'; + } +} diff --git a/app/jsonrpc/User.php b/app/jsonrpc/User.php new file mode 100644 index 0000000..bae1d04 --- /dev/null +++ b/app/jsonrpc/User.php @@ -0,0 +1,30 @@ + + * + * @param string $name + * @param string $password + * + * @return array Res转换的数组 + */ + public static function login(string $name, string $password) + { + $res = new Res(); + $correct = password_hash('123456', PASSWORD_DEFAULT); + if (password_verify($password, $correct)) { + $res->success()->setMsg("user {$name} login success."); + } else { + $res->failed()->setMsg("user {$name} login failed."); + } + return $res; + } +} diff --git a/composer.json b/composer.json index 33d5036..ded7bf7 100644 --- a/composer.json +++ b/composer.json @@ -47,7 +47,8 @@ "app\\View\\Components\\": "./app/view/components" }, "files": [ - "./support/helpers.php" + "./support/helpers.php", + "./jsonrpc/Protocol.php" ] }, "scripts": { diff --git a/config/jsonrpc.php b/config/jsonrpc.php new file mode 100644 index 0000000..b9e1fc4 --- /dev/null +++ b/config/jsonrpc.php @@ -0,0 +1,17 @@ + [ + 'tcp://127.0.0.1:' . env('JSONRPC_SERVER_PORT', 8021), // rpc服务端的地址和端口 + ], + 'user' => [ + 'tcp://127.0.0.1:8022', + ], + // 'server2' => [ + // /// ... + // ] +]; \ No newline at end of file diff --git a/config/process.php b/config/process.php index f94d27f..fa7e4ae 100644 --- a/config/process.php +++ b/config/process.php @@ -1,4 +1,5 @@ DIRECTORY_SEPARATOR === '/', ] ] - ] + ], + env('JSONRPC_SERVER_NAME', 'jsonrpc') => [ + 'handler' => jsonrpc\Server::class, + 'listen' => 'JsonNL://0.0.0.0:' . env('JSONRPC_SERVER_PORT', '8021'), + 'count' => eval("return " . env('JSONRPC_SERVER_COUNT', cpu_count() * 4) . ";"), + ], ]; diff --git a/jsonrpc/Client.php b/jsonrpc/Client.php new file mode 100644 index 0000000..fffae74 --- /dev/null +++ b/jsonrpc/Client.php @@ -0,0 +1,270 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ + +namespace jsonrpc; + +use Protocols\JsonNL; + +/** + * Aaron修改注释 + * 需要在config目录下创建jsonrpc.php文件,内容如下: + * + return [ + 'default' => [ + 'tcp://127.0.0.1:11002', // rpc服务端的地址和端口 + ], + 'server1' => [ + /// ... + ], + 'server2' => [ + /// ... + ] + ]; + * + * 客户端的调用方法 + // 调用default服务组中Demo类中的hello方法并传入参数$name + $res = RpcClient::service('default', 'Demo')->hello($name); + // 调用server1服务组中User类中的login方法并传入参数$name, $password, + $res = RpcClient::service('server1', 'User')->login($userName, $userPassword); + */ + +/** + * + * RpcClient Rpc客户端 + * + * + * 示例 + * // 服务端列表 + $address_array = array( + 'tcp://127.0.0.1:2015', + 'tcp://127.0.0.1:2015' + ); + // 配置服务端列表 + RpcClient::config($address_array); + + $uid = 567; + $user_client = RpcClient::instance('User'); + // ==同步调用== + $ret_sync = $user_client->getInfoByUid($uid); + + // ==异步调用== + // 异步发送数据 + $user_client->asend_getInfoByUid($uid); + $user_client->asend_getEmail($uid); + + 这里是其它的业务代码 + .............................................. + + // 异步接收数据 + $ret_async1 = $user_client->arecv_getEmail($uid); + $ret_async2 = $user_client->arecv_getInfoByUid($uid); + * + * @author walkor + */ +class Client +{ + /** + * 发送数据和接收数据的超时时间 单位S + * @var integer + */ + const TIME_OUT = 5; + + /** + * 异步调用发送数据前缀 + * @var string + */ + const ASYNC_SEND_PREFIX = 'asend_'; + + /** + * 异步调用接收数据 + * @var string + */ + const ASYNC_RECV_PREFIX = 'arecv_'; + + /** + * 服务端地址 + * @var array + */ + protected static $addressArray = array(); + + /** + * 异步调用实例 + * @var string + */ + protected static $asyncInstances = array(); + + /** + * 同步调用实例 + * @var string + */ + protected static $instances = array(); + + /** + * 到服务端的socket连接 + * @var resource + */ + protected $connection = null; + + /** + * 实例的服务名 + * @var string + */ + protected $serviceName = ''; + + + /** + * Client的唯一公开方法,调用方式: + * + * $res = Client::service('default', 'Demo')->hello($name); + * + * @author Aaron + * + * @param [type] $config 在config/rpc.php文件中定义的服务器组名称 + * @param [type] $service_name rpc服务器中的对象名称 + */ + public static function service(string $config, string $service_name) + { + $address_array = config('jsonrpc.' . $config); + self::config($address_array); + return self::instance($service_name); + } + + /** + * 设置/获取服务端地址 + * @param array $address_array + */ + protected static function config($address_array = array()) + { + if (!empty($address_array)) { + self::$addressArray = $address_array; + } + return self::$addressArray; + } + + /** + * 获取一个实例 + * @param string $service_name + * @return instance of RpcClient + */ + protected static function instance($service_name) + { + if (!isset(self::$instances[$service_name])) { + self::$instances[$service_name] = new self($service_name); + } + return self::$instances[$service_name]; + } + + /** + * 构造函数 + * @param string $service_name + */ + protected function __construct($service_name) + { + $this->serviceName = $service_name; + } + + /** + * 调用 + * @param string $method + * @param array $arguments + * @throws Exception + * @return + */ + public function __call($method, $arguments) + { + // 判断是否是异步发送 + if (0 === strpos($method, self::ASYNC_SEND_PREFIX)) { + $real_method = substr($method, strlen(self::ASYNC_SEND_PREFIX)); + $instance_key = $real_method . serialize($arguments); + if (isset(self::$asyncInstances[$instance_key])) { + throw new \Exception($this->serviceName . "->$method(" . implode(',', $arguments) . ") have already been called"); + } + self::$asyncInstances[$instance_key] = new self($this->serviceName); + return self::$asyncInstances[$instance_key]->sendData($real_method, $arguments); + } + // 如果是异步接受数据 + if (0 === strpos($method, self::ASYNC_RECV_PREFIX)) { + $real_method = substr($method, strlen(self::ASYNC_RECV_PREFIX)); + $instance_key = $real_method . serialize($arguments); + if (!isset(self::$asyncInstances[$instance_key])) { + throw new \Exception($this->serviceName . "->asend_$real_method(" . implode(',', $arguments) . ") have not been called"); + } + $tmp = (object)self::$asyncInstances[$instance_key]; + unset(self::$asyncInstances[$instance_key]); + return $tmp->recvData(); + } + // 同步发送接收 + $this->sendData($method, $arguments); + return $this->recvData(); + } + + /** + * 发送数据给服务端 + * @param string $method + * @param array $arguments + */ + protected function sendData($method, $arguments) + { + $this->openConnection(); + $bin_data = JsonNL::encode(array( + 'class' => $this->serviceName, + 'method' => $method, + 'param_array' => $arguments, + )); + if (fwrite($this->connection, $bin_data) !== strlen($bin_data)) { + throw new \Exception('Can not send data'); + } + return true; + } + + /** + * 从服务端接收数据 + * @throws Exception + */ + protected function recvData() + { + $ret = fgets($this->connection); + $this->closeConnection(); + if (!$ret) { + throw new \Exception("recvData empty"); + } + return JsonNL::decode($ret); + } + + /** + * 打开到服务端的连接 + * @return void + */ + protected function openConnection() + { + $address = self::$addressArray[array_rand(self::$addressArray)]; + $this->connection = stream_socket_client($address, $err_no, $err_msg); + if (!$this->connection) { + throw new \Exception("can not connect to $address , $err_no:$err_msg"); + } + stream_set_blocking($this->connection, true); + stream_set_timeout($this->connection, self::TIME_OUT); + } + + /** + * 关闭到服务端的连接 + * @return void + */ + protected function closeConnection() + { + fclose($this->connection); + $this->connection = null; + } +} \ No newline at end of file diff --git a/jsonrpc/Protocol.php b/jsonrpc/Protocol.php new file mode 100644 index 0000000..0e56fc7 --- /dev/null +++ b/jsonrpc/Protocol.php @@ -0,0 +1,66 @@ + + * @copyright walkor + * @link http://www.workerman.net/ + * @license http://www.opensource.org/licenses/mit-license.php MIT License + */ + +// 这个文件需要单独在compose.json中的autoload->files中引用 +// "./jsonnl/Protocol.php" +namespace Protocols; + +/** + * RPC 协议解析 相关 + * 协议格式为 [json字符串\n] + * @author walkor + * */ +class JsonNL +{ + /** + * 检查包的完整性 + * 如果能够得到包长,则返回包的在buffer中的长度,否则返回0继续等待数据 + * @param string $buffer + */ + public static function input($buffer) + { + // 获得换行字符"\n"位置 + $pos = strpos($buffer, "\n"); + // 没有换行符,无法得知包长,返回0继续等待数据 + if ($pos === false) { + return 0; + } + // 有换行符,返回当前包长(包含换行符) + return $pos + 1; + } + + /** + * 打包,当向客户端发送数据的时候会自动调用 + * @param string $buffer + * @return string + */ + public static function encode($buffer) + { + // json序列化,并加上换行符作为请求结束的标记 + return json_encode($buffer) . "\n"; + } + + /** + * 解包,当接收到的数据字节数等于input返回的值(大于0的值)自动调用 + * 并传递给onMessage回调函数的$data参数 + * @param string $buffer + * @return string + */ + public static function decode($buffer) + { + // 去掉换行,还原成数组 + return json_decode(trim($buffer), true); + } +} diff --git a/jsonrpc/Server.php b/jsonrpc/Server.php new file mode 100644 index 0000000..33f85ac --- /dev/null +++ b/jsonrpc/Server.php @@ -0,0 +1,55 @@ + + */ +class Server +{ + /** + * 在项目的app目录下创建jsonrpc目录,它下面的类的静态方法可以被jsonrpc客户端调用 + */ + static $service_space = "app\\jsonrpc"; + + public function onMessage(TcpConnection $connection, $data) + { + // 判断数据是否正确 + if (empty($data['class']) || empty($data['method']) || !isset($data['param_array'])) { + // 发送数据给客户端,请求包错误 + return $connection->send(array('code' => 400, 'msg' => 'bad request')); + } + // 获得要调用的类、方法、及参数 + $class = self::$service_space . "\\{$data['class']}"; + $method = $data['method']; + $param_array = $data['param_array']; + + // 判断类对应文件是否载入 + if (!class_exists($class)) { + if (!class_exists($class) || !method_exists($class, $method)) { + $code = 404; + $msg = "class $class or method $method not found"; + // 发送数据给客户端 类不存在 + return $connection->send(array('code' => $code, 'msg' => $msg, 'data' => null)); + } + } + + // 调用类的方法 + try { + $ret = call_user_func_array(array($class, $method), $param_array); + // 发送数据给客户端,调用成功,data下标对应的元素即为调用结果 + return $connection->send($ret); + } + // 有异常 + catch (\Exception $e) { + // 发送数据给客户端,发生异常,调用失败 + $code = $e->getCode() ? $e->getCode() : 500; + return $connection->send(array('code' => $code, 'msg' => $e->getMessage(), 'data' => $e)); + } + } +}