From 8b7025b3dbfa65243f742c192b90c6a0666aa99b Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 24 Dec 2024 18:03:25 +0200 Subject: [PATCH] OPIK-610 removed unnecessary interface --- .../opik/domain/ChatCompletionService.java | 11 ++-- .../DefaultLlmProviderStreamHandler.java | 52 ------------------- .../LlmProviderStreamHandler.java | 48 +++++++++++++++-- 3 files changed, 49 insertions(+), 62 deletions(-) delete mode 100644 apps/opik-backend/src/main/java/com/comet/opik/domain/llmproviders/DefaultLlmProviderStreamHandler.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ChatCompletionService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ChatCompletionService.java index e5b0328dc5..a8b5fd33fb 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ChatCompletionService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ChatCompletionService.java @@ -1,7 +1,7 @@ package com.comet.opik.domain; -import com.comet.opik.domain.llmproviders.DefaultLlmProviderStreamHandler; import com.comet.opik.domain.llmproviders.LlmProviderFactory; +import com.comet.opik.domain.llmproviders.LlmProviderStreamHandler; import dev.ai4j.openai4j.chat.ChatCompletionRequest; import dev.ai4j.openai4j.chat.ChatCompletionResponse; import jakarta.inject.Inject; @@ -14,13 +14,12 @@ @Slf4j public class ChatCompletionService { private final LlmProviderFactory llmProviderFactory; - private final DefaultLlmProviderStreamHandler defaultStreamHandler; + private final LlmProviderStreamHandler streamHandler; @Inject - public ChatCompletionService(LlmProviderFactory llmProviderFactory, - DefaultLlmProviderStreamHandler defaultStreamHandler) { + public ChatCompletionService(LlmProviderFactory llmProviderFactory, LlmProviderStreamHandler streamHandler) { this.llmProviderFactory = llmProviderFactory; - this.defaultStreamHandler = defaultStreamHandler; + this.streamHandler = streamHandler; } public ChatCompletionResponse create(@NonNull ChatCompletionRequest request, @NonNull String workspaceId) { @@ -35,7 +34,7 @@ public ChunkedOutput createAndStreamResponse( @NonNull ChatCompletionRequest request, @NonNull String workspaceId) { log.info("Creating and streaming chat completions, workspaceId '{}', model '{}'", workspaceId, request.model()); var llmProviderClient = llmProviderFactory.getService(workspaceId, request.model()); - var chunkedOutput = llmProviderClient.generateStream(request, workspaceId, defaultStreamHandler); + var chunkedOutput = llmProviderClient.generateStream(request, workspaceId, streamHandler); log.info("Created and streaming chat completions, workspaceId '{}', model '{}'", workspaceId, request.model()); return chunkedOutput; } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/llmproviders/DefaultLlmProviderStreamHandler.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/llmproviders/DefaultLlmProviderStreamHandler.java deleted file mode 100644 index 26accb1d2d..0000000000 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/llmproviders/DefaultLlmProviderStreamHandler.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.comet.opik.domain.llmproviders; - -import com.comet.opik.utils.JsonUtils; -import dev.ai4j.openai4j.OpenAiHttpException; -import io.dropwizard.jersey.errors.ErrorMessage; -import lombok.extern.slf4j.Slf4j; -import org.glassfish.jersey.server.ChunkedOutput; - -import java.io.IOException; -import java.io.UncheckedIOException; - -@Slf4j -public class DefaultLlmProviderStreamHandler implements LlmProviderStreamHandler { - private static final String UNEXPECTED_ERROR_CALLING_LLM_PROVIDER = "Unexpected error calling LLM provider"; - - @Override - public void handleMessage(Object item, ChunkedOutput chunkedOutput) { - if (chunkedOutput.isClosed()) { - log.warn("Output stream is already closed"); - return; - } - try { - chunkedOutput.write(JsonUtils.writeValueAsString(item)); - } catch (IOException ioException) { - throw new UncheckedIOException(ioException); - } - } - - @Override - public void handleClose(ChunkedOutput chunkedOutput) { - try { - chunkedOutput.close(); - } catch (IOException ioException) { - log.error("Failed to close output stream", ioException); - } - } - - @Override - public void handleError(Throwable throwable, ChunkedOutput chunkedOutput) { - log.error(UNEXPECTED_ERROR_CALLING_LLM_PROVIDER, throwable); - var errorMessage = new ErrorMessage(UNEXPECTED_ERROR_CALLING_LLM_PROVIDER); - if (throwable instanceof OpenAiHttpException openAiHttpException) { - errorMessage = new ErrorMessage(openAiHttpException.code(), openAiHttpException.getMessage()); - } - try { - handleMessage(errorMessage, chunkedOutput); - } catch (UncheckedIOException uncheckedIOException) { - log.error("Failed to stream error message to client", uncheckedIOException); - } - handleClose(chunkedOutput); - } -} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/llmproviders/LlmProviderStreamHandler.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/llmproviders/LlmProviderStreamHandler.java index 9001e2065d..a5bbe6d6a2 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/llmproviders/LlmProviderStreamHandler.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/llmproviders/LlmProviderStreamHandler.java @@ -1,9 +1,49 @@ package com.comet.opik.domain.llmproviders; +import com.comet.opik.utils.JsonUtils; +import dev.ai4j.openai4j.OpenAiHttpException; +import io.dropwizard.jersey.errors.ErrorMessage; +import lombok.extern.slf4j.Slf4j; import org.glassfish.jersey.server.ChunkedOutput; -public interface LlmProviderStreamHandler { - void handleMessage(Object item, ChunkedOutput chunkedOutput); - void handleClose(ChunkedOutput chunkedOutput); - void handleError(Throwable throwable, ChunkedOutput chunkedOutput); +import java.io.IOException; +import java.io.UncheckedIOException; + +@Slf4j +public class LlmProviderStreamHandler { + private static final String UNEXPECTED_ERROR_CALLING_LLM_PROVIDER = "Unexpected error calling LLM provider"; + + public void handleMessage(Object item, ChunkedOutput chunkedOutput) { + if (chunkedOutput.isClosed()) { + log.warn("Output stream is already closed"); + return; + } + try { + chunkedOutput.write(JsonUtils.writeValueAsString(item)); + } catch (IOException ioException) { + throw new UncheckedIOException(ioException); + } + } + + public void handleClose(ChunkedOutput chunkedOutput) { + try { + chunkedOutput.close(); + } catch (IOException ioException) { + log.error("Failed to close output stream", ioException); + } + } + + public void handleError(Throwable throwable, ChunkedOutput chunkedOutput) { + log.error(UNEXPECTED_ERROR_CALLING_LLM_PROVIDER, throwable); + var errorMessage = new ErrorMessage(UNEXPECTED_ERROR_CALLING_LLM_PROVIDER); + if (throwable instanceof OpenAiHttpException openAiHttpException) { + errorMessage = new ErrorMessage(openAiHttpException.code(), openAiHttpException.getMessage()); + } + try { + handleMessage(errorMessage, chunkedOutput); + } catch (UncheckedIOException uncheckedIOException) { + log.error("Failed to stream error message to client", uncheckedIOException); + } + handleClose(chunkedOutput); + } }