PHP 使用Swoole批量异步请求方法
刚子 发布于 阅读:229
读取一条redis的list记录,发送给Swoole协程异步请求处理,最多100个槽位,收到一个响应释放一个槽位。
<?php
Swoole\Runtime::enableCoroutine();
$redis = new Redis();
$redis->connect('REMOTE_REDIS_HOST', 6379);
$redis->auth('REDIS_PASS');
// 并发信号灯:同时最多 100 条 HTTP 在飞
$sem = new Channel(100);
go(function () use ($redis, $sem) {
while (true) {
$data = $redis->brPop(['recharge_verification'], 5);
if (!$data) continue;
[$queue, $raw] = $data;
if (strpos($raw, '_') === false) continue;
[$uniq] = explode('_', $raw, 2);
// 占一个并发位,满了就阻塞
$sem->push(true);
// 启动一个独立协程去请求 & 收响应
go(function () use ($uniq, $sem) {
try {
$cli = new Swoole\Coroutine\Http\Client('target.com', 80);
$cli->post('/api', ['id' => $uniq]);
$body = $cli->body;
// 这里立即写库或写日志
file_put_contents('response.log',
date('Y-m-d H:i:s')." | {$uniq} | {$body}\n",
FILE_APPEND | LOCK_EX
);
} finally {
$sem->pop(); // 释放并发位
}
});
}
});
如果请求的是同一个域名,可以使用HTTP/2 多路复用,效率更高。代码如下:
<?php
/**
* http2_worker.php
* 依赖:PHP + Swoole ≥ 4.8
* 运行:php http2_worker.php
*/
declare(strict_types=1);
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Http2\Request;
const CONCURRENCY = 100; // 同时未完成的请求上限
const REDIS_HOST = 'REMOTE_HOST';
const REDIS_PORT = 6379;
const REDIS_PASS = 'REDIS_PASS';
// ---------- 初始化 ----------
Coroutine\run(function () {
// 1. 连接远程 Redis
$redis = new Redis();
$redis->connect(REDIS_HOST, REDIS_PORT);
if (REDIS_PASS !== '') $redis->auth(REDIS_PASS);
// 2. 一条 HTTP/2 长连接(复用)
$host = 'target.com'; // 你的统一域名
$cli = new \Swoole\Coroutine\Http2\Client($host, 443, true);
if (!$cli->connect()) {
exit("HTTP/2 connect failed\n");
}
// 3. 并发信号量
$sem = new Channel(CONCURRENCY);
// 4. 主循环:BRPOP → 发送
go(function () use ($redis, $cli, $sem, $host) {
while (true) {
$data = $redis->brPop(['recharge_verification'], 5);
if (!$data) continue; // 空或超时
[$queue, $raw] = $data;
if (strpos($raw, '_') === false) continue;
[$uniq] = explode('_', $raw, 2);
$sem->push(true); // 占并发位
go(function () use ($cli, $uniq, $sem) {
$req = new Request();
$req->method = 'POST';
$req->path = '/api'; // 统一接口
$req->data = ['id' => $uniq];
$streamId = $cli->send($req);
// 等待这一 stream 的响应
$resp = $cli->recv($streamId, 5);
if ($resp && $resp->statusCode === 200) {
file_put_contents(
__DIR__ . '/response.log',
date('Y-m-d H:i:s') . " | {$uniq} | {$resp->data}\n",
FILE_APPEND | LOCK_EX
);
}
$sem->pop(); // 释放并发位
});
}
});
});