This commit is contained in:
wandoubaba 2023-10-19 15:12:20 +08:00
parent 0dcefe05f8
commit c931365f65
10 changed files with 490 additions and 6 deletions

View File

@ -7,6 +7,11 @@ SERVER_LISTEN = http://0.0.0.0:7878
SERVER_COUNT = cpu_count()
MAX_PACKAGE_SIZE = 1024*1024*5
# jsonrcp服务配置
JSONRPC_SERVER_NAME = jsonrpc
JSONRPC_SERVER_PORT = 8022
JSONRPC_SERVER_COUNT = cpu_count()
# REDIS配置
REDIS_HOST = 127.0.0.1
REDIS_PORT = 6379

View File

@ -2,8 +2,6 @@
namespace app\controller;
use ErrorException;
use Exception;
use support\Log;
use support\Request;
use Wandoubaba\Res;
@ -13,7 +11,14 @@ class IndexController
public function index(Request $request)
{
$res = new Res();
$res->setData([env('LOG_MAX_FILES_DEBUG') ?: env('LOG_MAX_FIlES') ?: 7]);
$res1 = \jsonrpc\Client::service('default', 'Demo')->hello('Json');
$res2 = \jsonrpc\Client::service('user', 'User')->login('admin', '123456');
$res3 = \jsonrpc\Client::service('default', 'Demo')->text();
$res->setData([
'res1' => $res1,
'res2' => $res2,
'res3' => $res3,
]);
return json($res);
}
@ -26,5 +31,4 @@ class IndexController
{
return json(['code' => 0, 'msg' => 'ok']);
}
}

30
app/jsonrpc/Demo.php Normal file
View File

@ -0,0 +1,30 @@
<?php
namespace app\jsonrpc;
use Wandoubaba\Res;
class Demo
{
/**
* 提供rpc调用的服务方法示例方法属性是public static
*
* @author Aaron <chenqiang@h024.cn>
*
* @param string $name
*
* @return array Res转换的数组
*/
public static function hello(string $name)
{
$res = new Res();
$res->success()->setMsg("Hello, {$name}");
return $res;
}
public static function text()
{
return '直接返回字符串';
}
}

30
app/jsonrpc/User.php Normal file
View File

@ -0,0 +1,30 @@
<?php
namespace app\jsonrpc;
use Wandoubaba\Res;
class User
{
/**
* 提供rpc调用的服务方法示例方法属性是public static
*
* @author Aaron <chenqiang@h024.cn>
*
* @param string $name
* @param string $password
*
* @return array Res转换的数组
*/
public static function login(string $name, string $password)
{
$res = new Res();
$correct = password_hash('123456', PASSWORD_DEFAULT);
if (password_verify($password, $correct)) {
$res->success()->setMsg("user {$name} login success.");
} else {
$res->failed()->setMsg("user {$name} login failed.");
}
return $res;
}
}

View File

@ -47,7 +47,8 @@
"app\\View\\Components\\": "./app/view/components"
},
"files": [
"./support/helpers.php"
"./support/helpers.php",
"./jsonrpc/Protocol.php"
]
},
"scripts": {

17
config/jsonrpc.php Normal file
View File

@ -0,0 +1,17 @@
<?php
/**
* 这是配合jsonrpc/Client.php使用的配置文件
* 如果项目中只提供服务端而不提供客户端,则不需要做这个配置
*/
return [
'default' => [
'tcp://127.0.0.1:' . env('JSONRPC_SERVER_PORT', 8021), // rpc服务端的地址和端口
],
'user' => [
'tcp://127.0.0.1:8022',
],
// 'server2' => [
// /// ...
// ]
];

View File

@ -1,4 +1,5 @@
<?php
/**
* This file is part of webman.
*
@ -38,5 +39,10 @@ return [
'enable_memory_monitor' => DIRECTORY_SEPARATOR === '/',
]
]
]
],
env('JSONRPC_SERVER_NAME', 'jsonrpc') => [
'handler' => jsonrpc\Server::class,
'listen' => 'JsonNL://0.0.0.0:' . env('JSONRPC_SERVER_PORT', '8021'),
'count' => eval("return " . env('JSONRPC_SERVER_COUNT', cpu_count() * 4) . ";"),
],
];

270
jsonrpc/Client.php Normal file
View File

@ -0,0 +1,270 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace jsonrpc;
use Protocols\JsonNL;
/**
* Aaron修改注释
* 需要在config目录下创建jsonrpc.php文件内容如下:
*
return [
'default' => [
'tcp://127.0.0.1:11002', // rpc服务端的地址和端口
],
'server1' => [
/// ...
],
'server2' => [
/// ...
]
];
*
* 客户端的调用方法
// 调用default服务组中Demo类中的hello方法并传入参数$name
$res = RpcClient::service('default', 'Demo')->hello($name);
// 调用server1服务组中User类中的login方法并传入参数$name, $password,
$res = RpcClient::service('server1', 'User')->login($userName, $userPassword);
*/
/**
*
* RpcClient Rpc客户端
*
*
* 示例
* // 服务端列表
$address_array = array(
'tcp://127.0.0.1:2015',
'tcp://127.0.0.1:2015'
);
// 配置服务端列表
RpcClient::config($address_array);
$uid = 567;
$user_client = RpcClient::instance('User');
// ==同步调用==
$ret_sync = $user_client->getInfoByUid($uid);
// ==异步调用==
// 异步发送数据
$user_client->asend_getInfoByUid($uid);
$user_client->asend_getEmail($uid);
这里是其它的业务代码
..............................................
// 异步接收数据
$ret_async1 = $user_client->arecv_getEmail($uid);
$ret_async2 = $user_client->arecv_getInfoByUid($uid);
*
* @author walkor <worker-man@qq.com>
*/
class Client
{
/**
* 发送数据和接收数据的超时时间 单位S
* @var integer
*/
const TIME_OUT = 5;
/**
* 异步调用发送数据前缀
* @var string
*/
const ASYNC_SEND_PREFIX = 'asend_';
/**
* 异步调用接收数据
* @var string
*/
const ASYNC_RECV_PREFIX = 'arecv_';
/**
* 服务端地址
* @var array
*/
protected static $addressArray = array();
/**
* 异步调用实例
* @var string
*/
protected static $asyncInstances = array();
/**
* 同步调用实例
* @var string
*/
protected static $instances = array();
/**
* 到服务端的socket连接
* @var resource
*/
protected $connection = null;
/**
* 实例的服务名
* @var string
*/
protected $serviceName = '';
/**
* Client的唯一公开方法调用方式
*
* $res = Client::service('default', 'Demo')->hello($name);
*
* @author Aaron <chenqiang@h024.cn>
*
* @param [type] $config 在config/rpc.php文件中定义的服务器组名称
* @param [type] $service_name rpc服务器中的对象名称
*/
public static function service(string $config, string $service_name)
{
$address_array = config('jsonrpc.' . $config);
self::config($address_array);
return self::instance($service_name);
}
/**
* 设置/获取服务端地址
* @param array $address_array
*/
protected static function config($address_array = array())
{
if (!empty($address_array)) {
self::$addressArray = $address_array;
}
return self::$addressArray;
}
/**
* 获取一个实例
* @param string $service_name
* @return instance of RpcClient
*/
protected static function instance($service_name)
{
if (!isset(self::$instances[$service_name])) {
self::$instances[$service_name] = new self($service_name);
}
return self::$instances[$service_name];
}
/**
* 构造函数
* @param string $service_name
*/
protected function __construct($service_name)
{
$this->serviceName = $service_name;
}
/**
* 调用
* @param string $method
* @param array $arguments
* @throws Exception
* @return
*/
public function __call($method, $arguments)
{
// 判断是否是异步发送
if (0 === strpos($method, self::ASYNC_SEND_PREFIX)) {
$real_method = substr($method, strlen(self::ASYNC_SEND_PREFIX));
$instance_key = $real_method . serialize($arguments);
if (isset(self::$asyncInstances[$instance_key])) {
throw new \Exception($this->serviceName . "->$method(" . implode(',', $arguments) . ") have already been called");
}
self::$asyncInstances[$instance_key] = new self($this->serviceName);
return self::$asyncInstances[$instance_key]->sendData($real_method, $arguments);
}
// 如果是异步接受数据
if (0 === strpos($method, self::ASYNC_RECV_PREFIX)) {
$real_method = substr($method, strlen(self::ASYNC_RECV_PREFIX));
$instance_key = $real_method . serialize($arguments);
if (!isset(self::$asyncInstances[$instance_key])) {
throw new \Exception($this->serviceName . "->asend_$real_method(" . implode(',', $arguments) . ") have not been called");
}
$tmp = (object)self::$asyncInstances[$instance_key];
unset(self::$asyncInstances[$instance_key]);
return $tmp->recvData();
}
// 同步发送接收
$this->sendData($method, $arguments);
return $this->recvData();
}
/**
* 发送数据给服务端
* @param string $method
* @param array $arguments
*/
protected function sendData($method, $arguments)
{
$this->openConnection();
$bin_data = JsonNL::encode(array(
'class' => $this->serviceName,
'method' => $method,
'param_array' => $arguments,
));
if (fwrite($this->connection, $bin_data) !== strlen($bin_data)) {
throw new \Exception('Can not send data');
}
return true;
}
/**
* 从服务端接收数据
* @throws Exception
*/
protected function recvData()
{
$ret = fgets($this->connection);
$this->closeConnection();
if (!$ret) {
throw new \Exception("recvData empty");
}
return JsonNL::decode($ret);
}
/**
* 打开到服务端的连接
* @return void
*/
protected function openConnection()
{
$address = self::$addressArray[array_rand(self::$addressArray)];
$this->connection = stream_socket_client($address, $err_no, $err_msg);
if (!$this->connection) {
throw new \Exception("can not connect to $address , $err_no:$err_msg");
}
stream_set_blocking($this->connection, true);
stream_set_timeout($this->connection, self::TIME_OUT);
}
/**
* 关闭到服务端的连接
* @return void
*/
protected function closeConnection()
{
fclose($this->connection);
$this->connection = null;
}
}

66
jsonrpc/Protocol.php Normal file
View File

@ -0,0 +1,66 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
// 这个文件需要单独在compose.json中的autoload->files中引用
// "./jsonnl/Protocol.php"
namespace Protocols;
/**
* RPC 协议解析 相关
* 协议格式为 [json字符串\n]
* @author walkor <worker-man@qq.com>
* */
class JsonNL
{
/**
* 检查包的完整性
* 如果能够得到包长则返回包的在buffer中的长度否则返回0继续等待数据
* @param string $buffer
*/
public static function input($buffer)
{
// 获得换行字符"\n"位置
$pos = strpos($buffer, "\n");
// 没有换行符无法得知包长返回0继续等待数据
if ($pos === false) {
return 0;
}
// 有换行符,返回当前包长(包含换行符)
return $pos + 1;
}
/**
* 打包,当向客户端发送数据的时候会自动调用
* @param string $buffer
* @return string
*/
public static function encode($buffer)
{
// json序列化并加上换行符作为请求结束的标记
return json_encode($buffer) . "\n";
}
/**
* 解包当接收到的数据字节数等于input返回的值大于0的值自动调用
* 并传递给onMessage回调函数的$data参数
* @param string $buffer
* @return string
*/
public static function decode($buffer)
{
// 去掉换行,还原成数组
return json_decode(trim($buffer), true);
}
}

55
jsonrpc/Server.php Normal file
View File

@ -0,0 +1,55 @@
<?php
namespace jsonrpc;
use Workerman\Connection\TcpConnection;
/**
* 在config/process中把这个类注册为服务
*
*
* @author Aaron Chen <qiang.c@wukezhenzhu.com>
*/
class Server
{
/**
* 在项目的app目录下创建jsonrpc目录它下面的类的静态方法可以被jsonrpc客户端调用
*/
static $service_space = "app\\jsonrpc";
public function onMessage(TcpConnection $connection, $data)
{
// 判断数据是否正确
if (empty($data['class']) || empty($data['method']) || !isset($data['param_array'])) {
// 发送数据给客户端,请求包错误
return $connection->send(array('code' => 400, 'msg' => 'bad request'));
}
// 获得要调用的类、方法、及参数
$class = self::$service_space . "\\{$data['class']}";
$method = $data['method'];
$param_array = $data['param_array'];
// 判断类对应文件是否载入
if (!class_exists($class)) {
if (!class_exists($class) || !method_exists($class, $method)) {
$code = 404;
$msg = "class $class or method $method not found";
// 发送数据给客户端 类不存在
return $connection->send(array('code' => $code, 'msg' => $msg, 'data' => null));
}
}
// 调用类的方法
try {
$ret = call_user_func_array(array($class, $method), $param_array);
// 发送数据给客户端调用成功data下标对应的元素即为调用结果
return $connection->send($ret);
}
// 有异常
catch (\Exception $e) {
// 发送数据给客户端,发生异常,调用失败
$code = $e->getCode() ? $e->getCode() : 500;
return $connection->send(array('code' => $code, 'msg' => $e->getMessage(), 'data' => $e));
}
}
}