Enable per-conversation loading states to allow having parallel conversations (#16327)
* feat: Per-conversation loading states and tracking streaming stats * chore: update webui build output * refactor: Chat state management Consolidates loading state management by using a global `isLoading` store synchronized with individual conversation states. This change ensures proper reactivity and avoids potential race conditions when updating the UI based on the loading status of different conversations. It also improves the accuracy of statistics displayed. Additionally, slots service methods are updated to use conversation IDs for per-conversation state management, avoiding global state pollution. * feat: Adds loading indicator to conversation items * chore: update webui build output * fix: Fix aborting chat streaming Improves the chat stream abortion process by ensuring that partial responses are saved before the abort signal is sent. This avoids a race condition where the onError callback could clear the streaming state before the partial response is saved. Additionally, the stream reading loop and callbacks are now checked for abort signals to prevent further processing after abortion. * refactor: Remove redundant comments * chore: build webui static output * refactor: Cleanup * chore: update webui build output * chore: update webui build output * fix: Conversation loading indicator for regenerating messages * chore: update webui static build * feat: Improve configuration * feat: Install `http-server` as dev dependency to not need to rely on `npx` in CI
This commit is contained in:
committed by
GitHub
parent
06332e2867
commit
13f2cfad41
@@ -29,7 +29,7 @@ import { slotsService } from './slots';
|
||||
* - Request lifecycle management (abort, cleanup)
|
||||
*/
|
||||
export class ChatService {
|
||||
private abortController: AbortController | null = null;
|
||||
private abortControllers: Map<string, AbortController> = new Map();
|
||||
|
||||
/**
|
||||
* Sends a chat completion request to the llama.cpp server.
|
||||
@@ -43,7 +43,8 @@ export class ChatService {
|
||||
*/
|
||||
async sendMessage(
|
||||
messages: ApiChatMessageData[] | (DatabaseMessage & { extra?: DatabaseMessageExtra[] })[],
|
||||
options: SettingsChatServiceOptions = {}
|
||||
options: SettingsChatServiceOptions = {},
|
||||
conversationId?: string
|
||||
): Promise<string | void> {
|
||||
const {
|
||||
stream,
|
||||
@@ -79,25 +80,25 @@ export class ChatService {
|
||||
|
||||
const currentConfig = config();
|
||||
|
||||
// Cancel any ongoing request and create a new abort controller
|
||||
this.abort();
|
||||
this.abortController = new AbortController();
|
||||
const requestId = conversationId || 'default';
|
||||
|
||||
if (this.abortControllers.has(requestId)) {
|
||||
this.abortControllers.get(requestId)?.abort();
|
||||
}
|
||||
|
||||
const abortController = new AbortController();
|
||||
this.abortControllers.set(requestId, abortController);
|
||||
|
||||
// Convert database messages with attachments to API format if needed
|
||||
const normalizedMessages: ApiChatMessageData[] = messages
|
||||
.map((msg) => {
|
||||
// Check if this is a DatabaseMessage by checking for DatabaseMessage-specific fields
|
||||
if ('id' in msg && 'convId' in msg && 'timestamp' in msg) {
|
||||
// This is a DatabaseMessage, convert it
|
||||
const dbMsg = msg as DatabaseMessage & { extra?: DatabaseMessageExtra[] };
|
||||
return ChatService.convertMessageToChatServiceData(dbMsg);
|
||||
} else {
|
||||
// This is already an ApiChatMessageData object
|
||||
return msg as ApiChatMessageData;
|
||||
}
|
||||
})
|
||||
.filter((msg) => {
|
||||
// Filter out empty system messages
|
||||
if (msg.role === 'system') {
|
||||
const content = typeof msg.content === 'string' ? msg.content : '';
|
||||
|
||||
@@ -107,7 +108,6 @@ export class ChatService {
|
||||
return true;
|
||||
});
|
||||
|
||||
// Build base request body with system message injection
|
||||
const processedMessages = this.injectSystemMessage(normalizedMessages);
|
||||
|
||||
const requestBody: ApiChatCompletionRequest = {
|
||||
@@ -172,11 +172,10 @@ export class ChatService {
|
||||
...(apiKey ? { Authorization: `Bearer ${apiKey}` } : {})
|
||||
},
|
||||
body: JSON.stringify(requestBody),
|
||||
signal: this.abortController.signal
|
||||
signal: abortController.signal
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
// Use the new parseErrorResponse method to handle structured errors
|
||||
const error = await this.parseErrorResponse(response);
|
||||
if (onError) {
|
||||
onError(error);
|
||||
@@ -185,13 +184,16 @@ export class ChatService {
|
||||
}
|
||||
|
||||
if (stream) {
|
||||
return this.handleStreamResponse(
|
||||
await this.handleStreamResponse(
|
||||
response,
|
||||
onChunk,
|
||||
onComplete,
|
||||
onError,
|
||||
options.onReasoningChunk
|
||||
options.onReasoningChunk,
|
||||
conversationId,
|
||||
abortController.signal
|
||||
);
|
||||
return;
|
||||
} else {
|
||||
return this.handleNonStreamResponse(response, onComplete, onError);
|
||||
}
|
||||
@@ -227,18 +229,19 @@ export class ChatService {
|
||||
onError(userFriendlyError);
|
||||
}
|
||||
throw userFriendlyError;
|
||||
} finally {
|
||||
this.abortControllers.delete(requestId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles streaming response from the chat completion API.
|
||||
* Processes server-sent events and extracts content chunks from the stream.
|
||||
*
|
||||
* @param response - The fetch Response object containing the streaming data
|
||||
* Handles streaming response from the chat completion API
|
||||
* @param response - The Response object from the fetch request
|
||||
* @param onChunk - Optional callback invoked for each content chunk received
|
||||
* @param onComplete - Optional callback invoked when the stream is complete with full response
|
||||
* @param onError - Optional callback invoked if an error occurs during streaming
|
||||
* @param onReasoningChunk - Optional callback invoked for each reasoning content chunk
|
||||
* @param conversationId - Optional conversation ID for per-conversation state tracking
|
||||
* @returns {Promise<void>} Promise that resolves when streaming is complete
|
||||
* @throws {Error} if the stream cannot be read or parsed
|
||||
*/
|
||||
@@ -251,7 +254,9 @@ export class ChatService {
|
||||
timings?: ChatMessageTimings
|
||||
) => void,
|
||||
onError?: (error: Error) => void,
|
||||
onReasoningChunk?: (chunk: string) => void
|
||||
onReasoningChunk?: (chunk: string) => void,
|
||||
conversationId?: string,
|
||||
abortSignal?: AbortSignal
|
||||
): Promise<void> {
|
||||
const reader = response.body?.getReader();
|
||||
|
||||
@@ -269,14 +274,20 @@ export class ChatService {
|
||||
try {
|
||||
let chunk = '';
|
||||
while (true) {
|
||||
if (abortSignal?.aborted) break;
|
||||
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
if (abortSignal?.aborted) break;
|
||||
|
||||
chunk += decoder.decode(value, { stream: true });
|
||||
const lines = chunk.split('\n');
|
||||
chunk = lines.pop() || ''; // Save incomplete line for next read
|
||||
chunk = lines.pop() || '';
|
||||
|
||||
for (const line of lines) {
|
||||
if (abortSignal?.aborted) break;
|
||||
|
||||
if (line.startsWith('data: ')) {
|
||||
const data = line.slice(6);
|
||||
if (data === '[DONE]') {
|
||||
@@ -293,9 +304,7 @@ export class ChatService {
|
||||
const promptProgress = parsed.prompt_progress;
|
||||
|
||||
if (timings || promptProgress) {
|
||||
this.updateProcessingState(timings, promptProgress);
|
||||
|
||||
// Store the latest timing data
|
||||
this.updateProcessingState(timings, promptProgress, conversationId);
|
||||
if (timings) {
|
||||
lastTimings = timings;
|
||||
}
|
||||
@@ -304,21 +313,29 @@ export class ChatService {
|
||||
if (content) {
|
||||
hasReceivedData = true;
|
||||
aggregatedContent += content;
|
||||
onChunk?.(content);
|
||||
if (!abortSignal?.aborted) {
|
||||
onChunk?.(content);
|
||||
}
|
||||
}
|
||||
|
||||
if (reasoningContent) {
|
||||
hasReceivedData = true;
|
||||
fullReasoningContent += reasoningContent;
|
||||
onReasoningChunk?.(reasoningContent);
|
||||
if (!abortSignal?.aborted) {
|
||||
onReasoningChunk?.(reasoningContent);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('Error parsing JSON chunk:', e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (abortSignal?.aborted) break;
|
||||
}
|
||||
|
||||
if (abortSignal?.aborted) return;
|
||||
|
||||
if (streamFinished) {
|
||||
if (!hasReceivedData && aggregatedContent.length === 0) {
|
||||
const noResponseError = new Error('No response received from server. Please try again.');
|
||||
@@ -520,10 +537,18 @@ export class ChatService {
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
public abort(): void {
|
||||
if (this.abortController) {
|
||||
this.abortController.abort();
|
||||
this.abortController = null;
|
||||
public abort(conversationId?: string): void {
|
||||
if (conversationId) {
|
||||
const abortController = this.abortControllers.get(conversationId);
|
||||
if (abortController) {
|
||||
abortController.abort();
|
||||
this.abortControllers.delete(conversationId);
|
||||
}
|
||||
} else {
|
||||
for (const controller of this.abortControllers.values()) {
|
||||
controller.abort();
|
||||
}
|
||||
this.abortControllers.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -581,7 +606,6 @@ export class ChatService {
|
||||
|
||||
return error;
|
||||
} catch {
|
||||
// If we can't parse the error response, return a generic error
|
||||
const fallback = new Error(`Server error (${response.status}): ${response.statusText}`);
|
||||
fallback.name = 'HttpError';
|
||||
return fallback;
|
||||
@@ -590,23 +614,25 @@ export class ChatService {
|
||||
|
||||
private updateProcessingState(
|
||||
timings?: ChatMessageTimings,
|
||||
promptProgress?: ChatMessagePromptProgress
|
||||
promptProgress?: ChatMessagePromptProgress,
|
||||
conversationId?: string
|
||||
): void {
|
||||
// Calculate tokens per second from timing data
|
||||
const tokensPerSecond =
|
||||
timings?.predicted_ms && timings?.predicted_n
|
||||
? (timings.predicted_n / timings.predicted_ms) * 1000
|
||||
: 0;
|
||||
|
||||
// Update slots service with timing data (async but don't wait)
|
||||
slotsService
|
||||
.updateFromTimingData({
|
||||
prompt_n: timings?.prompt_n || 0,
|
||||
predicted_n: timings?.predicted_n || 0,
|
||||
predicted_per_second: tokensPerSecond,
|
||||
cache_n: timings?.cache_n || 0,
|
||||
prompt_progress: promptProgress
|
||||
})
|
||||
.updateFromTimingData(
|
||||
{
|
||||
prompt_n: timings?.prompt_n || 0,
|
||||
predicted_n: timings?.predicted_n || 0,
|
||||
predicted_per_second: tokensPerSecond,
|
||||
cache_n: timings?.cache_n || 0,
|
||||
prompt_progress: promptProgress
|
||||
},
|
||||
conversationId
|
||||
)
|
||||
.catch((error) => {
|
||||
console.warn('Failed to update processing state:', error);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user