1.前端访问后端的AI消息接口,后端记录并且异步流式返回,发现时间很久
/// <summary>
/// Records the user's message, streams the AI reply back to the caller as
/// Server-Sent Events, then persists the complete AI reply once streaming ends.
/// </summary>
/// <param name="message">Incoming user message; UserId is overwritten from the current session.</param>
/// <param name="isFirstMessage">Client's hint that this starts a new conversation;
/// forced to false when the message already carries a ConversationId.</param>
[DisplayName("添加消息")]
[HttpPost("AddMessage")]
public async Task AddMessage([FromBody] AIMessage message, bool isFirstMessage)
{
    // An existing conversation id always overrides the client's "first message" flag.
    if (message.ConversationId > 0)
        isFirstMessage = false;
    message.UserId = _userManager.UserId;

    // AddMessageAsync returns the (possibly newly created) conversation id —
    // for a first message this is the only place the real id exists.
    long conversationId = await _aimessageService.AddMessageAsync(message, isFirstMessage);
    LiandanluExclusiveModelRequest args = ConvertToExclusiveModelRequest(message);

    var response = _httpContextAccessor.HttpContext.Response;
    response.ContentType = "text/event-stream";
    // Indexer assignment is idempotent; Headers.Add throws if the key already exists.
    response.Headers["Cache-Control"] = "no-cache";
    response.Headers["Connection"] = "keep-alive";

    var aiResponseContent = new StringBuilder();
    try
    {
        // leaveOpen: the framework owns response.Body — closing it here would break
        // the response pipeline. BOM-less UTF-8 so no stray bytes precede the SSE frames.
        using (var writer = new StreamWriter(response.Body, new UTF8Encoding(false), 1024, leaveOpen: true))
        {
            await BaiLianTurboGenerate(message.MessageText, writer, aiResponseContent, conversationId);
        }
    }
    catch (Exception ex)
    {
        // Deliberately swallowed so any partially streamed content is still persisted
        // below. TODO(review): wire up real logging here instead of discarding ex.
        //_logger.LogError(ex, "An error occurred while streaming the response.");
    }

    if (aiResponseContent.Length > 0)
    {
        var aiMessage = new AIMessage
        {
            // BUG FIX: use the id returned by AddMessageAsync — for a first message
            // message.ConversationId is still 0 here, which orphaned the AI reply.
            ConversationId = conversationId,
            UserId = null,
            SenderType = "AI",
            MessageText = aiResponseContent.ToString()
        };
        await _aimessageService.AddMessageAsync(aiMessage, false);
    }
}
// NOTE(review): this is the "before" version the article analyzes — kept byte-for-byte
// so the pitfalls below stay visible. See BaiLianTurboGenerate for the corrected version.
[NonAction]
[HttpPost("blmemoLong")]
public async Task BaiLianLongGenerate(string Prompt, StreamWriter writer, StringBuilder aiResponseContent, long ConversationId)
{
    var httpClient = _httpClientFactory.CreateClient();
    // OpenAI-compatible chat-completions endpoint of Alibaba Cloud DashScope (BaiLian).
    var requestUrl = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions";
    var requestBody = new
    {
        model = _aitokenOptions.BaiLianModel,
        stream = true,   // ask the upstream service for an SSE stream
        messages = new[]
        {
            new { role = "system", content = _aitokenOptions.BaiLianPrompt },
            new { role = "user", content = Prompt }
        }
    };
    var requestContent = new StringContent(JsonConvert.SerializeObject(requestBody), Encoding.UTF8, "application/json");
    // PITFALL: mutating DefaultRequestHeaders on an IHttpClientFactory client leaks
    // this Authorization header into other requests sharing the handler.
    httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", _aitokenOptions.BaiLianAPIKey);
    // PITFALL (the article's main finding): PostAsync defaults to
    // HttpCompletionOption.ResponseContentRead, so this await only completes after the
    // ENTIRE response body has been buffered — the "streaming" loop below merely replays
    // that buffer. Use SendAsync(..., HttpCompletionOption.ResponseHeadersRead) instead.
    var response = await httpClient.PostAsync(requestUrl, requestContent);
    response.EnsureSuccessStatusCode();
    var stream = await response.Content.ReadAsStreamAsync();
    using (var reader = new StreamReader(stream, Encoding.UTF8, true, 8192))
    {
        string line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            // SSE frames look like "data: {json}"; strip the prefix before parsing.
            if (line.StartsWith("data:"))
            {
                line = line.Substring(5);
            }
            // NOTE(review): no guard here — non-JSON SSE lines (e.g. a "[DONE]" marker,
            // if the service sends one) would make DeserializeObject throw.
            var lineData = JsonConvert.DeserializeObject<BaiLianLongResponse>(line);
            if (lineData != null && lineData.Choices != null && lineData.Choices.Length > 0)
            {
                var choice = lineData.Choices[0];
                if (choice.Delta != null && choice.Delta.Content != null)
                {
                    var data = choice.Delta.Content;
                    var responseData = new
                    {
                        ConversationId = ConversationId,
                        data = data
                    };
                    // Re-emit the chunk to our own client in SSE format.
                    await writer.WriteAsync($"data: {JsonConvert.SerializeObject(responseData)}\n\n");
                    await writer.FlushAsync();
                    // PITFALL: Thread.Sleep blocks the request thread; await Task.Delay
                    // would be the non-blocking equivalent.
                    Thread.Sleep(100);
                    aiResponseContent.Append(data);
                }
                if (choice.FinishReason == "stop")
                {
                    break;
                }
            }
        }
    }
}
如上,本意是阿里云百炼的接口每返回一条,我的接口就响应一条。可是在实际的返回中发现,虽然也是流式返回,但是整体耗时很长,看起来像是等待所有的流式数据都返回之后,才再进行流式返回~~
2.打上断点发现
var response = await httpClient.PostAsync(requestUrl, requestContent);
这一步耗时很久
然后在查阅资料后发现,在httpClient处理SSE的时候,需要设置
HttpCompletionOption.ResponseHeadersRead
(注意:默认的 ResponseContentRead 才是"读取完响应的所有内容后才返回";而 ResponseHeadersRead 表示接收到响应头后立即返回);
chatGPT给到的答复是
HttpCompletionOption.ResponseHeadersRead
HttpCompletionOption 是一个枚举类型,用于配置 HttpClient 在完成 HTTP 操作时的行为。该枚举有两个值:
HttpCompletionOption.ResponseContentRead(默认): 当调用 HttpClient.SendAsync 时,请求在读取完响应的所有内容后才会返回。所以异步请求将等待整个响应数据的读取完成。
HttpCompletionOption.ResponseHeadersRead: 当调用 HttpClient.SendAsync 时,请求在接收到响应的头部后立即返回,而不是等待整个响应内容的读取。这意味着在接收到响应头后,可以立即开始处理响应流,在某些流式数据的场景(如 SSE)中,这个选项非常有用。
var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
使用 HttpCompletionOption.ResponseHeadersRead 的代码将更早开始处理响应,尤其是在响应主体内容非常大的时候,这能大大减少等待时间并提高效率。
还有一个就是,在异步编程中,await 默认会捕获当前同步上下文(相当于 ConfigureAwait(true));在不需要回到原上下文继续执行时,可以调用
.ConfigureAwait(false)
所以,在改用 HttpCompletionOption.ResponseHeadersRead、并加上 .ConfigureAwait(false) 之后,响应速度就和直接请求百炼接口一样了
3.以下是修改后的代码
/// <summary>
/// Calls the DashScope (BaiLian) qwen-turbo text-generation endpoint in SSE mode and
/// relays each incremental chunk to <paramref name="writer"/> as it arrives, while
/// accumulating the full reply in <paramref name="aiResponseContent"/>.
/// </summary>
/// <param name="Prompt">User prompt forwarded as the "user" message.</param>
/// <param name="writer">Open SSE writer for the downstream client; flushed per chunk.</param>
/// <param name="aiResponseContent">Receives the concatenated reply for later persistence.</param>
/// <param name="ConversationId">Echoed back to the client inside every SSE frame.</param>
[NonAction]
[HttpPost("blmemoturbo")]
public async Task BaiLianTurboGenerate(string Prompt, StreamWriter writer, StringBuilder aiResponseContent, long ConversationId)
{
    var httpClient = _httpClientFactory.CreateClient();
    var requestUrl = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation";
    var requestBody = new
    {
        model = "qwen-turbo",
        input = new
        {
            messages = new[]
            {
                new { role = "system", content = _aitokenOptions.BaiLianPrompt },
                new { role = "user", content = Prompt }
            }
        },
        parameters = new
        {
            result_format = "message",
            incremental_output = true   // server sends deltas, not cumulative text
        }
    };

    // FIX: per-request headers instead of httpClient.DefaultRequestHeaders — default
    // headers on a factory-created client leak into other requests sharing the handler,
    // and repeated Add("X-DashScope-SSE", ...) calls accumulate duplicate values.
    using (var request = new HttpRequestMessage(HttpMethod.Post, requestUrl)
    {
        Content = new StringContent(JsonConvert.SerializeObject(requestBody), Encoding.UTF8, "application/json")
    })
    {
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _aitokenOptions.BaiLianAPIKey);
        request.Headers.Add("X-DashScope-SSE", "enable");

        // ResponseHeadersRead: return as soon as the headers arrive so the body can be
        // consumed incrementally — essential for SSE (the default buffers the whole body).
        using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false))
        {
            response.EnsureSuccessStatusCode();
            var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
            using (var reader = new StreamReader(stream, Encoding.UTF8, true, 8192))
            {
                string line;
                while ((line = await reader.ReadLineAsync().ConfigureAwait(false)) != null)
                {
                    // Ignore SSE "id:"/"event:" lines and blank keep-alive lines.
                    if (!line.StartsWith("data:"))
                        continue;
                    var payload = line.Substring(5).Trim();
                    if (string.IsNullOrEmpty(payload))
                        continue;
                    try
                    {
                        var lineData = JsonConvert.DeserializeObject<BaiLianTurboResponse>(payload);
                        if (lineData?.Output?.Choices != null && lineData.Output.Choices.Length > 0)
                        {
                            var choice = lineData.Output.Choices[0];
                            if (choice.Message?.Content != null)
                            {
                                var data = choice.Message.Content;
                                var responseData = new
                                {
                                    ConversationId = ConversationId,
                                    data = data
                                };
                                // Re-emit the chunk to our own client in SSE format.
                                await writer.WriteAsync($"data: {JsonConvert.SerializeObject(responseData)}\n\n");
                                await writer.FlushAsync();
                                aiResponseContent.Append(data);
                            }
                            if (choice.FinishReason == "stop")
                            {
                                break;
                            }
                        }
                    }
                    catch (JsonException ex)
                    {
                        // Malformed frame: skip it rather than abort the whole stream.
                        // TODO(review): route this through the app's logger.
                        Console.WriteLine($"JSON parse exception: {ex}");
                    }
                }
            }
        }
    }
}
💬 评论