PHP 使用Swoole批量异步请求 异步获取结果的方法
刚子 发布于 阅读:109
这个脚本会:
检查同目录下是否存在 url.txt 文件。
如果文件不存在,暂停一秒后重新读取。
如果文件存在,读取所有订单号,删除文件,并拼接到指定 URL 后提交异步 GET 请求。
并发控制为 100 个请求。
所有请求提交后,主协程立即返回,不等待所有请求完成。
异步获取请求结果,并将结果追加写入日志文件,内容包含自定义请求 ID 和返回原始内容。
完成后重新触发循环,重新读取 url.txt。
实测请求1000个URL,9秒后全部URL请求完毕,14秒后全部结果返回完毕。
<?php
use Swoole\Coroutine\Http\Client;
use Swoole\Coroutine\Channel;
use Swoole\Runtime;
// 启用协程支持
Runtime::enableCoroutine(true);
// 配置常量
const CONFIG = [
'timeout' => 10, // 请求超时(秒)
'log_file' => __DIR__ . '/request.log',
'base_url' => 'https://www.baidu.com/2.php?id=',
'concurrency' => 100, // 最大并发数
'loop_delay' => 1000000, // 文件不存在时等待时间(微秒,1秒)
'file_path' => __DIR__ . '/url.txt'
];
/**
* 处理订单号文件并发起请求
*/
function processOrderFile()
{
$filePath = CONFIG['file_path'];
// 检查文件是否存在
if (!file_exists($filePath)) {
echo "[" . date('Y-m-d H:i:s') . "] 文件不存在,等待1秒...\n";
usleep(CONFIG['loop_delay']);
return;
}
// 原子化读取并删除文件(避免并发写入冲突)
$orders = [];
$fp = fopen($filePath, 'r');
if ($fp && flock($fp, LOCK_EX | LOCK_NB)) { // 非阻塞独占锁
// 读取所有订单号
while (($line = fgets($fp)) !== false) {
$orderNo = trim($line);
if (!empty($orderNo)) {
$orders[] = $orderNo;
}
}
flock($fp, LOCK_UN); // 释放锁
fclose($fp);
unlink($filePath); // 读取完成后删除文件
} else {
echo "[" . date('Y-m-d H:i:s') . "] 文件被占用,跳过本轮处理\n";
usleep(100000); // 短暂等待避免频繁尝试
return;
}
if (empty($orders)) {
echo "[" . date('Y-m-d H:i:s') . "] 文件为空,继续循环\n";
return;
}
$orderCount = count($orders);
echo "[" . date('Y-m-d H:i:s') . "] 读取到 {$orderCount} 个订单,开始处理\n";
// 初始化并发控制和结果收集通道
$semaphore = new Channel(CONFIG['concurrency']); // 并发信号量
$resultChan = new Channel($orderCount); // 结果收集通道
$completed = 0; // 完成计数
// 发起所有请求(非阻塞)
foreach ($orders as $orderNo) {
// 信号量控制并发
$semaphore->push(true);
// 创建请求协程
go(function () use ($orderNo, $semaphore, $resultChan, &$completed, $orderCount) {
$result = [
'id' => $orderNo,
'body' => '',
'error' => ''
];
try {
// 构建请求URL
$url = CONFIG['base_url'] . urlencode($orderNo);
$parsed = parse_url($url);
if (empty($parsed['host'])) {
throw new Exception("无效的URL: {$url}");
}
// 发起HTTP请求
$client = new Client($parsed['host'], 443, true);
$client->set([
'timeout' => CONFIG['timeout'] * 1000,
'connect_timeout' => 3 * 1000
]);
$path = $parsed['path'] ?? '/';
if (!empty($parsed['query'])) {
$path .= '?' . $parsed['query'];
}
$success = $client->get($path);
$result['body'] = $client->body ?? '';
if (!$success) {
$result['error'] = "请求失败 [{$client->errCode}]: {$client->errMsg}";
}
$client->close();
} catch (Exception $e) {
$result['error'] = "异常: {$e->getMessage()}";
} finally {
// 推送结果并释放信号量
$resultChan->push($result);
$semaphore->pop();
// 所有请求完成后关闭结果通道
if (++$completed >= $orderCount) {
$resultChan->close();
}
}
});
}
// 异步处理结果日志(不阻塞主循环)
go(function () use ($resultChan) {
while (($res = $resultChan->pop()) !== false) {
$log = "[" . date('Y-m-d H:i:s') . "]\n";
$log .= "请求ID: {$res['id']}\n";
$log .= empty($res['error']) ? "状态: 成功\n" : "状态: 失败 - {$res['error']}\n";
$log .= "响应内容: {$res['body']}\n";
$log .= "----------------------------------------\n\n";
// 追加写入日志(使用FILE_APPEND确保原子性)
file_put_contents(CONFIG['log_file'], $log, FILE_APPEND | LOCK_EX);
}
echo "[" . date('Y-m-d H:i:s') . "] 本轮结果处理完成\n";
});
// 无需等待结果,直接返回继续循环
echo "[" . date('Y-m-d H:i:s') . "] 所有请求已提交,继续下一轮检查\n";
}
// 主协程:启动死循环
go(function () {
echo "[" . date('Y-m-d H:i:s') . "] 程序启动,开始监控文件...\n";
while (true) {
processOrderFile();
}
});
// 启动事件循环(CLI环境下Swoole需要)
Swoole\Event::wait();