«

PHP多线程批量请求

刚子 发布于 阅读:21


我们需要做以下事情:

  1. 动态生成200个相同的URL(假设为'http://example.com')
  2. 使用cURL多线程并发请求,最大并发数控制在20
  3. 对每个请求返回的内容判断是否包含字符串'searchInput'
  4. 将结果追加写入到当前目录下的1.log文件中,每行格式为:日期时间 - 包含(或不包含) - 换行
    注意:为了控制最大并发数为20,我们将200个URL分成多个批次(每批最多20个),然后逐批处理。
    但是,在cURL多线程中,我们通常使用一个循环来添加多个句柄,然后执行。为了控制并发,我们可以使用一个固定大小的“窗口”,每次只添加20个句柄,然后等待这一批完成,再处理下一批。
    然而,更高效的方式是使用curl_multi的持续执行,并且通过维护一个活动句柄队列来动态添加,保持同时运行的请求数不超过20。
    这里我们采用分批处理的方式,因为更简单直观。
    步骤:
  5. 生成200个URL的数组(全部相同)
  6. 将数组分成每批20个
  7. 对每一批进行并发请求
  8. 对于每个请求,检查返回内容中是否包含'searchInput'
  9. 将结果写入日志文件(1.log)
    注意:由于200个URL完全一样,所以结果也会一样,这里只是演示。
    但是,我们也可以考虑使用动态添加的方式,即当有一个请求完成,我们就从剩余的URL中取一个添加进去。这样更高效,但代码稍复杂。这里我们先采用分批的方式。
    由于是示例,我们假设URL为'http://example.com',实际中请替换
    我们将使用以下步骤:
    • 初始化curl_multi
    • 分批处理:每次最多添加20个句柄
    • 等待该批全部完成
    • 处理结果(判断内容并写日志)
    • 清理该批句柄
    • 继续下一批
      代码实现如下:
      <?php
      // 动态生成200个相同URL
      $baseUrl = 'https://example.com/api'; // 替换为实际URL
      $urls = array_fill(0, 200, $baseUrl);

// 日志文件
$logFile = DIR . '/1.log';

// 配置参数
$maxConcurrent = 20; // 最大并发数
$totalUrls = count($urls);
$batches = ceil($totalUrls / $maxConcurrent);
$results = [];

// 初始化日志文件头
file_put_contents($logFile, "=== 请求日志 (" . date('Y-m-d H:i:s') . ") ===\n", FILE_APPEND);

for ($batch = 0; $batch < $batches; $batch++) {
// 获取当前批次URL
$batchUrls = array_slice($urls, $batch * $maxConcurrent, $maxConcurrent);

$mh = curl_multi_init();
$handles = [];

// 创建当前批次请求
foreach ($batchUrls as $i => $url) {
    $ch = curl_init();
    curl_setopt_array($ch, [
        CURLOPT_URL => $url,
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_TIMEOUT => 15,
        CURLOPT_CONNECTTIMEOUT => 5,
        CURLOPT_FOLLOWLOCATION => true,
        CURLOPT_SSL_VERIFYPEER => false,
        CURLOPT_NOSIGNAL => 1, // 避免超时信号问题
        CURLOPT_USERAGENT => 'ConcurrentCrawler/1.0'
    ]);
    curl_multi_add_handle($mh, $ch);
    $handles[$url . '#' . ($batch * $maxConcurrent + $i)] = $ch; // 唯一标识
}

// 执行并发请求
$running = null;
do {
    $status = curl_multi_exec($mh, $running);
    if ($running) {
        curl_multi_select($mh, 0.5); // 减少CPU占用
    }
} while ($running > 0 && $status === CURLM_OK);

// 处理结果
foreach ($handles as $key => $ch) {
    $originalIndex = explode('#', $key)[1];
    $content = curl_multi_getcontent($ch);
    $error = curl_error($ch);

    // 判断内容是否包含searchInput
    $contains = false;
    $status = '失败';

    if (!$error && $content !== false) {
        $status = '成功';
        $contains = (stripos($content, 'searchInput') !== false);
    }

    // 构建日志条目
    $logEntry = sprintf(
        "[%s] URL-%d %s - %s%s\n",
        date('Y-m-d H:i:s'),
        $originalIndex,
        $status,
        ($contains ? '包含' : '不包含'),
        $error ? " (错误: $error)" : ""
    );

    // 写入日志
    file_put_contents($logFile, $logEntry, FILE_APPEND);

    // 释放资源
    curl_multi_remove_handle($mh, $ch);
    curl_close($ch);
}

curl_multi_close($mh);

// 批处理完成提示
echo "已完成批次: " . ($batch + 1) . "/$batches\n";

// 释放内存
unset($handles, $batchUrls);

}

echo "所有请求处理完成! 日志已保存至: $logFile\n";
?>