调用
/**
 * Stream a DeepSeek chat completion to the client (GET /v1/ai/stream).
 *
 * Request input:
 *  - messages (required): text of the new user message.
 *  - chat_id  (required): conversation id, used to load earlier turns and to
 *                         tag the row saved when the stream finishes.
 *  - ai_model (optional): 'deepseek-chat' (default) or 'deepseek-reasoner'.
 *
 * Earlier turns stored in AiChat for the logged-in user, this chat and this
 * model are replayed as context. Each text chunk delivered by the service is
 * echoed immediately, and the concatenated reply is inserted into AiChat once
 * the service reports completion.
 *
 * @return mixed Whatever $this->error() returns on invalid input or missing
 *               login, otherwise a StreamedResponse.
 */
public function stream()
{
    $params = request()->input();
    $validator = $this->validator::make($params, [
        'messages' => 'required',
        // chat_id is read unconditionally below; validating it up front avoids
        // an undefined-key error when the client omits it.
        'chat_id' => 'required',
    ], [
        'messages.required' => '消息的内容不能为空',
        'chat_id.required' => '会话ID不能为空',
    ]);
    if ($validator->fails()) {
        return $this->error($validator->messages()->first());
    }

    // Optional parameter: falls back to 'deepseek-chat'.
    $model = empty($params['ai_model']) ? 'deepseek-chat' : $params['ai_model'];
    if (!in_array($model, ['deepseek-chat', 'deepseek-reasoner'], true)) {
        return $this->error('ai模型错误');
    }
    // Value stored in the AiChat "model" column: 1 = deepseek-chat, 2 = deepseek-reasoner.
    $model_type = ($model === 'deepseek-chat') ? 1 : 2;

    $login_user_id = (int)Auth::id();
    if (!$login_user_id) {
        return $this->error('请登录后操作!', 401);
    }

    $chat_id = $params['chat_id'];
    $user_message = $params['messages'];

    // Conversation history. Ordered by id so turns are replayed in a stable
    // order (presumably id is auto-incrementing, i.e. chronological - verify
    // against the table definition).
    $messages_list = AiChat::select(['id', 'message', 'content'])
        ->where('user_id', $login_user_id)
        ->where('type', 1)
        ->where('model', $model_type)
        ->where('chat_id', $chat_id)
        ->orderBy('id')
        ->get();

    $messages = [];
    foreach ($messages_list as $value) {
        $messages[] = ['role' => 'user', 'content' => $value['message']];
        $messages[] = ['role' => 'assistant', 'content' => $value['content']];
    }
    // Append the message sent by the front end.
    $messages[] = ['role' => 'user', 'content' => $user_message];

    // Build the streamed response.
    $response = new StreamedResponse(function () use ($messages, $model, $model_type, $login_user_id, $chat_id, $user_message) {
        // Accumulates the full reply so it can be persisted when the stream ends.
        $buffer = '';
        DeepSeekService::getInstance()->stream(
            $messages,
            function ($chunk) use (&$buffer) {
                $buffer .= $chunk;
                echo $chunk;
                // ob_flush() raises a notice when no output buffer is active.
                if (ob_get_level() > 0) {
                    ob_flush();
                }
                flush();
            },
            $model,
            function () use (&$buffer, $model_type, $login_user_id, $chat_id, $user_message) {
                AiChat::query()->insert([
                    'chat_id' => $chat_id,
                    'message' => $user_message,
                    'user_id' => $login_user_id,
                    'type' => 1,
                    'model' => $model_type,
                    'content' => $buffer,
                    'result' => 'stream',
                ]);
            }
        );
    });

    // Response headers.
    $response->headers->set('Content-Type', 'text/event-stream');
    $response->headers->set('Cache-Control', 'no-cache');
    $response->headers->set('X-Accel-Buffering', 'no'); // keep NGINX from buffering the stream
    return $response;
}
/**
 * Stream a DeepSeek chat completion to the client (POST /v1/ai/stream).
 *
 * The JSON parameters are taken from the raw request body when it decodes to
 * an array. Otherwise the legacy convention is used: the JSON document arrives
 * as the *key* of a form-encoded field, so every input key is tried.
 *
 * Parameters:
 *  - messages (required): text of the user message.
 *  - ai_model (optional): 'deepseek-chat' (default) or 'deepseek-reasoner'.
 *
 * No history is loaded and nothing is persisted by this endpoint.
 *
 * @return mixed Whatever $this->error() returns on invalid input, otherwise a
 *               StreamedResponse.
 */
public function streamV2()
{
    // Preferred source: the raw body. This also works for application/json
    // requests, where the parsed input keys are the field names and the legacy
    // key-decoding below would find nothing.
    $params = json_decode((string)request()->getContent(), true);
    if (!is_array($params)) {
        // Legacy fallback: JSON sent as a form key. The last key that decodes
        // to an array wins, mirroring the previous "last key" behaviour.
        $params = null;
        foreach (array_keys(request()->input()) as $key) {
            $decoded = json_decode((string)$key, true);
            if (is_array($decoded)) {
                $params = $decoded;
            }
        }
    }
    if (!$params) {
        return $this->error('参数错误');
    }

    $validator = $this->validator::make($params, [
        'messages' => 'required',
    ], [
        'messages.required' => '消息的内容不能为空',
    ]);
    if ($validator->fails()) {
        return $this->error($validator->messages()->first());
    }

    // Message sent by the front end.
    $messages = [
        ['role' => 'user', 'content' => $params['messages']],
    ];

    // Optional parameter: falls back to 'deepseek-chat'.
    $model = empty($params['ai_model']) ? 'deepseek-chat' : $params['ai_model'];
    if (!in_array($model, ['deepseek-chat', 'deepseek-reasoner'], true)) {
        return $this->error('ai模型错误');
    }

    // Build the streamed response.
    $response = new StreamedResponse(function () use ($messages, $model) {
        DeepSeekService::getInstance()->streamV2($messages, function ($chunk) {
            // NOTE(review): the raw upstream chunk is forwarded AND the text
            // extracted from it is echoed again below, so the client receives
            // both. Kept as-is because the client contract is not visible
            // here - confirm which of the two the front end actually consumes.
            echo $chunk;
            flush();

            foreach (explode("\n\n", trim($chunk)) as $line) {
                $line = trim($line);
                if (!str_starts_with($line, 'data: ')) {
                    continue; // comments such as ": keep-alive" carry no text
                }
                $json = substr($line, 6);
                if ($json === '[DONE]') {
                    break;
                }
                $payload = json_decode($json, true);
                $delta = $payload['choices'][0]['delta'] ?? [];
                if (!is_array($delta)) {
                    continue;
                }

                // reasoning_content takes precedence over content when both are
                // present. Compare against '' rather than using empty()/!$text
                // so that a token consisting of "0" is not dropped.
                $text = '';
                if (isset($delta['content']) && is_string($delta['content'])) {
                    $text = $delta['content'];
                }
                if (isset($delta['reasoning_content']) && is_string($delta['reasoning_content']) && $delta['reasoning_content'] !== '') {
                    $text = $delta['reasoning_content'];
                }
                if ($text === '') {
                    continue;
                }

                echo $text;
                // ob_flush() raises a notice when no output buffer is active.
                if (ob_get_level() > 0) {
                    ob_flush();
                }
                flush();
            }
        }, $model);
    });

    // Response headers.
    $response->headers->set('Content-Type', 'text/event-stream');
    $response->headers->set('Cache-Control', 'no-cache');
    $response->headers->set('X-Accel-Buffering', 'no'); // keep NGINX from buffering the stream
    return $response;
}
DeepSeekService.php
<?php
namespace App\Services\Ai;
use App\Services\BaseService;
/**
 * Thin cURL client for the DeepSeek chat-completions endpoint.
 *
 * Offers a blocking call (completions), a streaming call that parses the
 * server-sent events and hands out plain text (stream), and a streaming call
 * that hands out the raw response bytes (streamV2).
 */
class DeepSeekService extends BaseService
{
    /**
     * Fallback API key.
     *
     * NOTE(review): a secret committed to source control should be rotated and
     * moved to configuration. The constant is kept so existing references to
     * DeepSeekService::TOKEN keep working; requests use resolveToken(), which
     * prefers the DEEPSEEK_API_KEY environment variable when it is set.
     */
    const TOKEN = 'sk-421301a27465';

    /** Chat-completions endpoint shared by every request method. */
    private const API_URL = 'https://api.deepseek.com/chat/completions';

    private static ?self $instance = null;

    /**
     * Return the shared instance, creating it on first use.
     */
    public static function getInstance()
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    /**
     * @notes: Request a (non-streaming) completion from the DeepSeek API.
     * @param mixed $ai_content Sent as the request's "messages" field
     *                          (presumably a list of role/content pairs - confirm with callers)
     * @param string $model Model name, 'deepseek-chat' by default
     * @return array ['code' => 200, 'msg' => '', 'data' => raw response body string] when the
     *               transfer completes (the HTTP status is not inspected), or
     *               ['code' => 400, 'msg' => error text, 'data' => []] on an encoding or cURL failure
     * @author: OwenZhang
     * @time: 2025/4/1 10:41
     */
    public function completions($ai_content, $model = 'deepseek-chat')
    {
        // Build the request body.
        $body = json_encode([
            'messages' => $ai_content,
            'model' => $model,
        ]);
        if ($body === false) {
            // e.g. invalid UTF-8 in the messages; do not send a broken request.
            return ['code' => 400, 'msg' => 'JSON encode error: ' . json_last_error_msg(), 'data' => []];
        }

        $curl = curl_init();
        curl_setopt_array($curl, [
            CURLOPT_URL => self::API_URL,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_ENCODING => '',
            CURLOPT_MAXREDIRS => 10,
            CURLOPT_TIMEOUT => 0, // 0 = no overall time limit
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
            CURLOPT_CUSTOMREQUEST => 'POST',
            CURLOPT_POSTFIELDS => $body,
            CURLOPT_HTTPHEADER => [
                'Content-Type: application/json',
                'Accept: application/json',
                'Authorization: Bearer ' . $this->resolveToken(),
            ],
        ]);

        $response = curl_exec($curl);
        if ($response === false) {
            $error = curl_error($curl);
            $errno = curl_errno($curl);
            curl_close($curl);
            return ['code' => 400, 'msg' => "cURL Error ($errno): $error", 'data' => []];
        }
        curl_close($curl);

        return ['code' => 200, 'msg' => '', 'data' => $response];
    }

    /**
     * Send a streaming chat request and deliver the reply as plain text.
     *
     * The response is parsed as server-sent events separated by a blank line.
     * cURL may hand over an event in several pieces, so incoming data is
     * buffered and only complete events are parsed. For every event of the
     * form "data: {json}" the non-empty delta text (content, otherwise
     * reasoning_content) is passed to $onData; "data: [DONE]" ends processing.
     *
     * If $messages is empty a notice is echoed and neither callback is called.
     * A cURL error is echoed as text; $onDone is still invoked afterwards.
     *
     * @param array $messages Message list sent as the request's "messages" field
     * @param callable|null $onData Called with each text fragment: function ($chunk) {}.
     *                              When null the text is echoed and flushed instead.
     * @param string $model Model name
     * @param callable|null $onDone Called exactly once when the stream ends: function () {}
     */
    public function stream(array $messages, ?callable $onData = null, string $model = 'deepseek-chat', ?callable $onDone = null): void
    {
        if (empty($messages)) {
            echo "消息不能为空。\n";
            return;
        }

        $postData = [
            'model' => $model,
            'messages' => $messages,
            'stream' => true,
        ];
        $headers = [
            'Content-Type: application/json',
            'Accept: text/event-stream',
            'Authorization: Bearer ' . $this->resolveToken(),
        ];

        // Guards against invoking $onDone more than once.
        $calledDone = false;
        $finish = static function () use ($onDone, &$calledDone): void {
            if ($onDone && !$calledDone) {
                $calledDone = true;
                $onDone();
            }
        };

        $sawDone = false; // set once "data: [DONE]" has been received
        $pending = '';    // bytes received after the last complete event

        // Handles one complete SSE event.
        $handleEvent = function (string $event) use ($onData, $finish, &$sawDone): void {
            $event = trim($event);
            if (!str_starts_with($event, 'data: ')) {
                return; // comment / keep-alive lines carry no text
            }
            $json = substr($event, 6);
            if ($json === '[DONE]') {
                $sawDone = true;
                $finish();
                return;
            }
            $text = $this->extractDeltaText(json_decode($json, true));
            if ($text === '') {
                return;
            }
            if ($onData) {
                $onData($text);
            } else {
                echo $text;
                flush();
            }
        };

        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => self::API_URL,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_TCP_KEEPALIVE => 1,
            CURLOPT_POST => true,
            CURLOPT_HTTPHEADER => $headers,
            CURLOPT_POSTFIELDS => json_encode($postData),
            CURLOPT_TIMEOUT => 0, // 0 = no overall time limit
            CURLOPT_WRITEFUNCTION => function ($ch, $data) use (&$pending, &$sawDone, $handleEvent) {
                $pending .= $data;
                $events = explode("\n\n", $pending);
                // The last piece has no terminating blank line yet; keep it
                // until the rest of the event arrives.
                $pending = array_pop($events);
                foreach ($events as $event) {
                    if ($sawDone) {
                        break;
                    }
                    $handleEvent($event);
                }
                // cURL aborts the transfer unless the full length is reported.
                return strlen($data);
            },
        ]);
        curl_exec($ch);

        // A final event that arrived without its trailing blank line.
        if (!$sawDone && trim($pending) !== '') {
            $handleEvent($pending);
        }

        if (curl_errno($ch)) {
            echo "[错误] " . curl_error($ch) . "\n";
        }
        curl_close($ch);

        // Fallback: the transfer ended without a [DONE] event.
        $finish();
    }

    /**
     * Send a streaming chat request and forward the raw response bytes.
     *
     * No parsing happens here: every piece of data cURL receives is passed to
     * $onData unchanged (pieces are not aligned to event boundaries). When
     * $onData is null the data is echoed and flushed instead. A cURL error is
     * echoed as text.
     *
     * @param array $messages Message list sent as the request's "messages" field
     * @param callable|null $onData Called with each raw piece: function ($data) {}
     * @param string $model Model name
     */
    public function streamV2(array $messages, ?callable $onData = null, string $model = 'deepseek-chat'): void
    {
        $payload = json_encode([
            'model' => $model,
            'messages' => $messages,
            'stream' => true,
        ]);
        $headers = [
            'Content-Type: application/json',
            'Authorization: Bearer ' . $this->resolveToken(),
        ];

        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => self::API_URL,
            CURLOPT_HTTPHEADER => $headers,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $payload,
            CURLOPT_RETURNTRANSFER => false,
            CURLOPT_WRITEFUNCTION => static function ($ch, $data) use ($onData) {
                if ($onData) {
                    $onData($data);
                } else {
                    // Same default as stream(): without a callback the data
                    // goes straight to the output instead of being discarded.
                    echo $data;
                    flush();
                }
                return strlen($data);
            },
            CURLOPT_TIMEOUT => 300,
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_HEADER => false,
            // NOTE(review): kept from the original, which labelled it as required
            // for streaming. Delivery is driven by the write callback above;
            // confirm whether this option is still needed.
            CURLOPT_NOPROGRESS => false,
            // Small receive buffer so data reaches the callback in small pieces.
            CURLOPT_BUFFERSIZE => 128,
            CURLOPT_TCP_NODELAY => true,
        ]);
        curl_exec($ch);
        if (curl_errno($ch)) {
            echo 'Curl error: ' . curl_error($ch);
        }
        curl_close($ch);
    }

    /**
     * API key used for the Authorization header: the DEEPSEEK_API_KEY
     * environment variable when it is set and non-empty, otherwise self::TOKEN.
     */
    private function resolveToken(): string
    {
        $fromEnv = getenv('DEEPSEEK_API_KEY');
        return (is_string($fromEnv) && $fromEnv !== '') ? $fromEnv : self::TOKEN;
    }

    /**
     * Pull the text fragment out of one decoded stream payload.
     *
     * Returns choices[0].delta.content when it is a non-empty string, otherwise
     * choices[0].delta.reasoning_content, otherwise ''. Strings are compared
     * against '' rather than tested with empty(), so a fragment consisting of
     * "0" is preserved.
     *
     * @param mixed $payload Result of json_decode(); anything but a matching array yields ''
     */
    private function extractDeltaText($payload): string
    {
        $delta = is_array($payload) ? ($payload['choices'][0]['delta'] ?? null) : null;
        if (!is_array($delta)) {
            return '';
        }
        foreach (['content', 'reasoning_content'] as $field) {
            if (isset($delta[$field]) && is_string($delta[$field]) && $delta[$field] !== '') {
                return $delta[$field];
            }
        }
        return '';
    }
}