From 501a8b405557728e63cfebbc2b62f2d8899c1caa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A9ri=20Le=20Bouder?= Date: Wed, 19 Jun 2024 18:47:44 -0400 Subject: [PATCH] api/view: improve the collection of the Schema1 events Harmonize how we collect and prepare the Schema1 events and how we do the payload validation and the exception handling. --- ansible_ai_connect/ai/api/serializers.py | 8 +- .../ai/api/telemetry/schema1.py | 291 +++++++++++ .../ai/api/telemetry/test_schema1.py | 127 +++++ .../ai/api/utils/analytics_telemetry_model.py | 2 + ansible_ai_connect/ai/api/utils/segment.py | 8 + ansible_ai_connect/ai/api/views.py | 470 ++++++------------ ansible_ai_connect/main/middleware.py | 85 +--- ansible_ai_connect/main/settings/base.py | 3 +- 8 files changed, 590 insertions(+), 404 deletions(-) create mode 100644 ansible_ai_connect/ai/api/telemetry/schema1.py create mode 100644 ansible_ai_connect/ai/api/telemetry/test_schema1.py diff --git a/ansible_ai_connect/ai/api/serializers.py b/ansible_ai_connect/ai/api/serializers.py index 0a2382bac..8f0ea75a5 100644 --- a/ansible_ai_connect/ai/api/serializers.py +++ b/ansible_ai_connect/ai/api/serializers.py @@ -151,9 +151,11 @@ def validate(self, data): data = super().validate(data) data["prompt"], data["context"] = fmtr.extract_prompt_and_context(data["prompt"]) - CompletionRequestSerializer.validate_extracted_prompt( - data["prompt"], self.context.get("request").user - ) + + # TODO + # CompletionRequestSerializer.validate_extracted_prompt( + # data["prompt"], self.context.get("request").user + # ) # If suggestion ID was not included in the request, set a random UUID to it.
if data.get("suggestionId") is None: diff --git a/ansible_ai_connect/ai/api/telemetry/schema1.py b/ansible_ai_connect/ai/api/telemetry/schema1.py new file mode 100644 index 000000000..5a43be626 --- /dev/null +++ b/ansible_ai_connect/ai/api/telemetry/schema1.py @@ -0,0 +1,291 @@ +# Copyright Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import platform +import uuid + +from attr import Factory, asdict, field +from attrs import define, validators +from django.apps import apps +from django.utils import timezone +from rest_framework.exceptions import ErrorDetail + +import ansible_ai_connect.ai.api.telemetry.schema1 as schema1 +from ansible_ai_connect.ai.api.aws.exceptions import WcaSecretManagerError +from ansible_ai_connect.ai.api.model_client.exceptions import ( + WcaModelIdNotFound, + WcaNoDefaultModelId, +) +from ansible_ai_connect.ai.api.serializers import ( + InlineSuggestionFeedback, + SuggestionQualityFeedback, +) +from ansible_ai_connect.healthcheck.version_info import VersionInfo +from ansible_ai_connect.users.models import User + +logger = logging.getLogger(__name__) +version_info = VersionInfo() + + +@define +class ResponsePayload: + exception: str = field(validator=validators.instance_of(str), converter=str, default="") + error_type: str = field(validator=validators.instance_of(str), converter=str, default="") + message: str = field(validator=validators.instance_of(str), converter=str, default="") + status_code: int = 
field(validator=validators.instance_of(int), converter=int, default=0) + status_text: str = field(validator=validators.instance_of(str), converter=str, default="") + + +@define +class Schema1Event: + event_name: str = "noName" + imageTags: str = field( + validator=validators.instance_of(str), converter=str, default=version_info.image_tags + ) + hostname: str = field( + validator=validators.instance_of(str), converter=str, default=platform.node() + ) + groups: list[str] = Factory(list) + + rh_user_has_seat: bool = False + rh_user_org_id: int | None = None + timestamp = timezone.now().isoformat() + modelName: str = field(validator=validators.instance_of(str), converter=str, default="") + exception: bool = False + response: ResponsePayload = ResponsePayload() + user: User | None = None + + def set_user(self, user): + self.user = user + self.rh_user_has_seat = user.rh_user_has_seat + self.rh_user_org_id = user.org_id + self.groups = list(user.groups.values_list("name", flat=True)) + + def set_exception(self, exception): + if exception: + self.exception = True + self.response.exception = str(exception) + + def set_response(self, response): + self.response.error_type = getattr(response, "error_type", None) + if response.status_code >= 400 and getattr(response, "content", None): + self.response.message = str(response.content) + self.response.status_code = response.status_code + self.response.status_text = (getattr(response, "status_text", None),) + + def set_validated_data(self, validated_data): + for field_name, value in validated_data.items(): + if hasattr(self, field_name): + setattr(self, field_name, value) + + # TODO: improve the way we define the model in the payload. 
+ try: + model_mesh_client = apps.get_app_config("ai").model_mesh_client + self.modelName = model_mesh_client.get_model_id( + self.rh_user_org_id, str(validated_data.get("model", "")) + ) + except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError): + logger.debug( + f"Failed to retrieve Model Name for Feedback.\n " + f"Org ID: {self.rh_user_org_id}, " + f"User has seat: {self.rh_user_has_seat}, " + f"has subscription: {self.user.rh_org_has_subscription}.\n" + ) + + @classmethod + def init(cls, user, validated_data): + print("init()") + schema1_event = cls() + schema1_event.set_user(user) + schema1_event.set_validated_data(validated_data) + return schema1_event + + def as_dict(self): + # NOTE: The allowed fields should be moved in the event class itslef + def my_filter(a, v): + return a.name not in ["event_name", "user"] + + return asdict(self, filter=my_filter) + + +@define +class CompletionRequestPayload: + context: str = field(validator=validators.instance_of(str), converter=str, default="") + prompt: str = field(validator=validators.instance_of(str), converter=str, default="") + + +@define +class CompletionEvent(Schema1Event): + event_name: str = "completion" + suggestionId: str = field( + validator=validators.instance_of(str), converter=str, default=uuid.uuid4() + ) + duration: int = field(validator=validators.instance_of(int), converter=int, default=0) + promptType: str = "" + taskCount: int = 0 + metadata = {} + request: CompletionRequestPayload = CompletionRequestPayload() + + def set_validated_data(self, validated_data): + super().set_validated_data(validated_data) + self.request.context = validated_data.get("context") + self.request.prompt = validated_data.get("prompt") + + def get_tasks_from_response(self, response_data): + if not response_data: + return [] + predictions = response_data.get("predictions", []) + if not predictions: + return [] + return predictions + + def set_response(self, response): + super().set_response(response) + 
response_data = getattr(response, "data") + tasks = self.get_tasks_from_response(response_data) + if isinstance(response_data, dict): + message = response_data.get("message") + if isinstance(message, ErrorDetail): + message = str(message) + if response.status_code >= 400: + response_data.pop("model", None) + + self.taskCount = len(tasks) + + +@define +class ExplainPlaybookEvent(Schema1Event): + event_name: str = "explainPlaybook" + explanationId: str = field(validator=validators.instance_of(str), converter=str, default="") + duration: int = field(validator=validators.instance_of(int), converter=int, default=0) + + +@define +class CodegenPlaybookEvent(Schema1Event): + event_name: str = "codegenPlaybook" + generationId: str = field(validator=validators.instance_of(str), converter=str, default="") + wizardId: str = field(validator=validators.instance_of(str), converter=str, default="") + playbook_length: int | None = None + duration: int = field(validator=validators.instance_of(int), converter=int, default=0) + + def set_validated_data(self, validated_data): + super().set_validated_data(validated_data) + self.playbook_length = len(validated_data.get("playbook", "")) + + +@define +class ContentMatchEvent(Schema1Event): + event_name: str = "codematch" + duration: int = field(validator=validators.instance_of(int), converter=int, default=0) + request: dict | None = None + response: None = None + metadata: list | None = None # TODO + + def set_validated_data(self, validated_data): + super().set_validated_data(validated_data) + self.request_data = validated_data + + +# Events associated with the Feedback view +@define +class BaseFeedbackEvent(Schema1Event): + def set_validated_data(self, validated_data): + super().set_validated_data(validated_data) + # event are wrapped in the FeedbackRequestSerializer class + suggestion_quality_data: SuggestionQualityFeedback = validated_data[self.event_name] + super().set_validated_data(suggestion_quality_data) + + @classmethod + def 
init(cls, user, validated_data): + mapping = { + "inlineSuggestion": schema1.InlineSuggestionFeedbackEvent, + "inlineSuggestionFeedback": schema1.InlineSuggestionFeedbackEvent, + "suggestionQualityFeedback": schema1.SuggestionQualityFeedbackEvent, + "sentimentFeedback": schema1.InlineSuggestionFeedbackEvent, + "issueFeedback": schema1.IssueFeedbackEvent, + "playbookExplanationFeedback": schema1.PlaybookExplanationFeedbackEvent, + "playbookGenerationAction": schema1.PlaybookGenerationActionEvent, + } + # TODO: handles the key that are at the root level of the structure + for key_name, schema1_class in mapping.items(): + if key_name in validated_data: + schema1_event = schema1_class() + schema1_event.set_user(user) + schema1_event.set_validated_data(validated_data) + return schema1_event + print("Failed to init BaseFeedbackEvent") + + +@define +class InlineSuggestionFeedbackEvent(BaseFeedbackEvent): + event_name: str = "inlineSuggestionFeedback" + latency: float = field(validator=validators.instance_of(float), converter=float, default=0.0) + userActionTime: int = field(validator=validators.instance_of(int), converter=int, default=0) + action: int = field(validator=validators.instance_of(int), converter=int, default=0) + suggestionId: str = field(validator=validators.instance_of(str), converter=str, default="") + activityId: str = field(validator=validators.instance_of(str), converter=str, default="") + + # Remove the method one year after https://github.com/ansible/vscode-ansible/pull/1408 is merged + # and released + def set_validated_data(self, validated_data): + super().set_validated_data(validated_data) + inlineSuggestion: InlineSuggestionFeedback = validated_data.get( + self.event_name + ) or validated_data.get("inlineSuggestion") + super().set_validated_data(inlineSuggestion) + + +@define +class SuggestionQualityFeedbackEvent(BaseFeedbackEvent): + event_name: str = "suggestionQualityFeedback" + prompt: str = field(validator=validators.instance_of(str), 
converter=str, default="") + providedSuggestion: str = field( + validator=validators.instance_of(str), converter=str, default="" + ) + expectedSuggestion: str = field( + validator=validators.instance_of(str), converter=str, default="" + ) + additionalComment: str = field(validator=validators.instance_of(str), converter=str, default="") + + +@define +class SentimentFeedbackEvent(BaseFeedbackEvent): + event_name: str = "sentimentFeedback" + value: int = field(validator=validators.instance_of(int), converter=int, default=0) + feedback: str = field(validator=validators.instance_of(str), converter=str, default="") + + +@define +class IssueFeedbackEvent(BaseFeedbackEvent): + event_name: str = "issueFeedback" + type: str = field(validator=validators.instance_of(str), converter=str, default="") + title: str = field(validator=validators.instance_of(str), converter=str, default="") + description: str = field(validator=validators.instance_of(str), converter=str, default="") + + +@define +class PlaybookExplanationFeedbackEvent(BaseFeedbackEvent): + event_name: str = "playbookExplanationFeedback" + action: int = field(validator=validators.instance_of(int), converter=int, default=0) + explanation_id: str = field(validator=validators.instance_of(str), converter=str, default="") + + +@define +class PlaybookGenerationActionEvent(BaseFeedbackEvent): + event_name: str = "playbookGenerationAction" + action: int = field(validator=validators.instance_of(int), converter=int, default=0) + from_page: int = field(validator=validators.instance_of(int), converter=int, default=0) + to_page: int = field(validator=validators.instance_of(int), converter=int, default=0) + wizard_id: str = field(validator=validators.instance_of(str), converter=str, default="") diff --git a/ansible_ai_connect/ai/api/telemetry/test_schema1.py b/ansible_ai_connect/ai/api/telemetry/test_schema1.py new file mode 100644 index 000000000..2edf7d4be --- /dev/null +++ b/ansible_ai_connect/ai/api/telemetry/test_schema1.py @@ 
-0,0 +1,127 @@ +# Copyright Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase, mock + +from .schema1 import ( + InlineSuggestionFeedbackEvent, + IssueFeedbackEvent, + PlaybookExplanationFeedbackEvent, + PlaybookGenerationActionEvent, + Schema1Event, + SentimentFeedbackEvent, + SuggestionQualityFeedbackEvent, +) + + +class TestSchema1Event(TestCase): + def test_set_user(self): + m_user = mock.Mock() + m_user.rh_user_has_seat = True + m_user.org_id = 123 + m_user.groups.values_list.return_value = ["mecano"] + event1 = Schema1Event() + event1.set_user(m_user) + self.assertEqual(event1.rh_user_has_seat, True) + self.assertEqual(event1.rh_user_org_id, 123) + self.assertEqual(event1.groups, ["mecano"]) + + def test_as_dict(self): + event1 = Schema1Event() + as_dict = event1.as_dict() + + self.assertEqual(as_dict.get("event_name"), None) + self.assertFalse(as_dict.get("exception"), False) + + def test_set_exception(self): + event1 = Schema1Event() + try: + 1 / 0 + except Exception as e: + event1.set_exception(e) + self.assertTrue(event1.exception) + self.assertEqual(event1.response.exception, "division by zero") + + +class TestInlineSuggestionFeedbackEvent(TestCase): + def test_validated_data(self): + validated_data = { + "inlineSuggestion": { + "latency": 1.1, + "userActionTime": 1, + "action": "123", + "suggestionId": "1e0e1404-5b8a-4d06-829a-dca0d2fff0b5", + } + } + event1 = InlineSuggestionFeedbackEvent(validated_data) + 
self.assertEqual(event1.action, 0) + event1.set_validated_data(validated_data) + self.assertEqual(event1.action, 123) + + +class TestSuggestionQualityFeedbackEvent(TestCase): + def test_validated_data(self): + validated_data = { + "suggestionQualityFeedback": {"prompt": "Yo!", "providedSuggestion": "bateau"} + } + event1 = SuggestionQualityFeedbackEvent() + event1.set_validated_data(validated_data) + self.assertEqual(event1.providedSuggestion, "bateau") + + +class TestSentimentFeedbackEvent(TestCase): + def test_validated_data(self): + validated_data = {"sentimentFeedback": {"value": "1", "feedback": "C'est beau"}} + event1 = SentimentFeedbackEvent() + event1.set_validated_data(validated_data) + self.assertEqual(event1.value, 1) + + +class TestIssueFeedbackEvent(TestCase): + def test_validated_data(self): + validated_data = { + "issueFeedback": {"type": "1", "title": "C'est beau", "description": "Et oui!"} + } + event1 = IssueFeedbackEvent() + event1.set_validated_data(validated_data) + self.assertEqual(event1.title, "C'est beau") + + +class TestPlaybookExplanationFeedbackEvent(TestCase): + def test_validated_data(self): + validated_data = { + "playbookExplanationFeedback": { + "action": "1", + "explanation": "1ddda23c-5f8c-4015-b915-4951b8039ffa", + } + } + event1 = PlaybookExplanationFeedbackEvent() + event1.set_validated_data(validated_data) + self.assertEqual(event1.action, 1) + + +class TestPlaybookGenerationActionEvent(TestCase): + def test_validated_data(self): + validated_data = { + "playbookGenerationAction": { + "action": "2", + "from_page": 1, + "to_page": "2", + "wizard_id": "1ddda23c-5f8c-4015-b915-4951b8039ffa", + } + } + event1 = PlaybookGenerationActionEvent() + event1.set_validated_data(validated_data) + self.assertEqual(event1.action, 2) + self.assertEqual(event1.to_page, 2) diff --git a/ansible_ai_connect/ai/api/utils/analytics_telemetry_model.py b/ansible_ai_connect/ai/api/utils/analytics_telemetry_model.py index f863329b2..da43e6c4d 100644 --- 
a/ansible_ai_connect/ai/api/utils/analytics_telemetry_model.py +++ b/ansible_ai_connect/ai/api/utils/analytics_telemetry_model.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Schema2 + from enum import Enum from attr import Factory, field, frozen diff --git a/ansible_ai_connect/ai/api/utils/segment.py b/ansible_ai_connect/ai/api/utils/segment.py index 6cda9161d..d626c8e75 100644 --- a/ansible_ai_connect/ai/api/utils/segment.py +++ b/ansible_ai_connect/ai/api/utils/segment.py @@ -123,6 +123,14 @@ def base_send_segment_event( send_segment_event(event, "segmentError", user) +def send_schema1_event(event_obj) -> None: + print(f"SENDING SCHEMA1 EVENT\n{event_obj}") + if not settings.SEGMENT_WRITE_KEY: + logger.info("segment write key not set, skipping event") + return + base_send_segment_event(event_obj.as_dict(), event_obj.event_name, event_obj.user, analytics) + + def redact_seated_users_data(event: Dict[str, Any], allow_list: Dict[str, Any]) -> Dict[str, Any]: """ Copy a dictionary to another dictionary using a nested list of allowed keys. 
diff --git a/ansible_ai_connect/ai/api/views.py b/ansible_ai_connect/ai/api/views.py index ccced8bee..56b7ca577 100644 --- a/ansible_ai_connect/ai/api/views.py +++ b/ansible_ai_connect/ai/api/views.py @@ -25,13 +25,12 @@ from prometheus_client import Histogram from rest_framework import permissions, serializers from rest_framework import status as rest_framework_status -from rest_framework.generics import GenericAPIView from rest_framework.response import Response from rest_framework.views import APIView +import ansible_ai_connect.ai.api.telemetry.schema1 as schema1 from ansible_ai_connect.ai.api.aws.exceptions import WcaSecretManagerError from ansible_ai_connect.ai.api.exceptions import ( - BaseWisdomAPIException, FeedbackInternalServerException, FeedbackValidationException, InternalServerError, @@ -83,11 +82,8 @@ GenerationRequestSerializer, GenerationResponseSerializer, InlineSuggestionFeedback, - IssueFeedback, - PlaybookExplanationFeedback, PlaybookGenerationAction, SentimentFeedback, - SuggestionQualityFeedback, ) from .utils.analytics_telemetry_model import ( AnalyticsPlaybookGenerationWizard, @@ -95,7 +91,7 @@ AnalyticsRecommendationAction, AnalyticsTelemetryEvents, ) -from .utils.segment import send_segment_event +from .utils.segment import send_schema1_event from .utils.segment_analytics_telemetry import send_segment_analytics_event logger = logging.getLogger(__name__) @@ -134,11 +130,71 @@ } -class Completions(APIView): +class OurAPIView(APIView): + exception = None + + def initial(self, request, *args, **kwargs): + super().initial(request, *args, **kwargs) + request_serializer = self.serializer_class(data=request.data) + request_serializer.is_valid(raise_exception=True) + print(f"VALIDATED_DATA={request_serializer.validated_data}") + self.validated_data = request_serializer.validated_data + print(f"-> self.schema1_event_class={self.schema1_event_class}") + if self.schema1_event_class: + self.schema1_event = self.schema1_event_class.init(request.user, 
self.validated_data) + print(f"init schema1_event with schema1_event_class.init() -> {self.schema1_event}") + + def _get_model_name(self, org_id: str) -> str: + try: + model_mesh_client = apps.get_app_config("ai").model_mesh_client + return model_mesh_client.get_model_id(org_id, self.validated_data.get("model", "")) + except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError): + return "" + + def handle_exception(self, exc): + print(f"handle_exception! {exc}") + if not self.exception: + self.exception = exc + return super().handle_exception(exc) + + def dispatch(self, request, *args, **kwargs): + print(f"BEFORE: request={request}") + start_time = time.time() + self.exception = False + self.schema1_event = None + response = None + + try: + response = super().dispatch(request, *args, **kwargs) + except Exception as exc: + self.exception = exc + logger.exception(f"An exception {exc.__class__} occurred during a {request.path} call") + raise + finally: + duration = round((time.time() - start_time) * 1000, 2) + print( + f"AFTER: request={request} response={response} self.schema1_event={self.schema1_event}" + ) + + if self.schema1_event: + if hasattr(self.schema1_event, "duration"): + self.schema1_event.duration = duration + self.schema1_event.modelName = self._get_model_name(request.user.org_id) + self.schema1_event.set_exception(self.exception) + self.schema1_event.set_response(response) + send_schema1_event(self.schema1_event) + + return response + + +class Completions(OurAPIView): """ Returns inline code suggestions based on a given Ansible editor context. 
""" + serializer_class = CompletionRequestSerializer + schema1_event_class = schema1.CompletionEvent + permission_classes = PERMISSIONS_MAP.get(settings.DEPLOYMENT_MODE) required_scopes = ["read", "write"] @@ -162,11 +218,14 @@ def post(self, request) -> Response: return pipeline.execute() -class Feedback(APIView): +class Feedback(OurAPIView): """ Feedback API for the AI service """ + serializer_class = FeedbackRequestSerializer + schema1_event_class = schema1.BaseFeedbackEvent + permission_classes = [ permissions.IsAuthenticated, IsAuthenticatedOrTokenHasScope, @@ -190,25 +249,28 @@ def post(self, request) -> Response: exception = None validated_data = {} try: - request_serializer = FeedbackRequestSerializer( - data=request.data, context={"request": request} - ) - - request_serializer.is_valid(raise_exception=True) - validated_data = request_serializer.validated_data - logger.info(f"feedback request payload from client: {validated_data}") + logger.info(f"feedback request payload from client: {self.validated_data}") return Response({"message": "Success"}, status=rest_framework_status.HTTP_200_OK) except serializers.ValidationError as exc: - exception = exc + self.exception = exc raise FeedbackValidationException(str(exc)) except Exception as exc: - exception = exc + self.exception = exc logger.exception(f"An exception {exc.__class__} occurred in sending a feedback") raise FeedbackInternalServerException() finally: - self.write_to_segment(request.user, validated_data, exception, request.data) + print("Sending stuff") + self.send_schema1() + self.send_schema2(request.user, validated_data, exception, request.data) - def write_to_segment( + def send_schema1( + self, + ): + if self.schema1_event: + self.schema1_event.set_exception(self.exception) + send_schema1_event(self.schema1_event) + + def send_schema2( self, user: User, validated_data: dict, @@ -216,14 +278,7 @@ def write_to_segment( request_data=None, ) -> None: inline_suggestion_data: InlineSuggestionFeedback = 
validated_data.get("inlineSuggestion") - suggestion_quality_data: SuggestionQualityFeedback = validated_data.get( - "suggestionQualityFeedback" - ) sentiment_feedback_data: SentimentFeedback = validated_data.get("sentimentFeedback") - issue_feedback_data: IssueFeedback = validated_data.get("issueFeedback") - playbook_explanation_feedback_data: PlaybookExplanationFeedback = validated_data.get( - "playbookExplanationFeedback" - ) playbook_generation_action_data: PlaybookGenerationAction = validated_data.get( "playbookGenerationAction" ) @@ -247,16 +302,6 @@ def write_to_segment( ) if inline_suggestion_data: - event = { - "latency": inline_suggestion_data.get("latency"), - "userActionTime": inline_suggestion_data.get("userActionTime"), - "action": inline_suggestion_data.get("action"), - "suggestionId": str(inline_suggestion_data.get("suggestionId", "")), - "modelName": model_name, - "activityId": str(inline_suggestion_data.get("activityId", "")), - "exception": exception is not None, - } - send_segment_event(event, "inlineSuggestionFeedback", user) send_segment_analytics_event( AnalyticsTelemetryEvents.RECOMMENDATION_ACTION, lambda: AnalyticsRecommendationAction( @@ -267,24 +312,7 @@ def write_to_segment( user, ansible_extension_version, ) - if suggestion_quality_data: - event = { - "prompt": suggestion_quality_data.get("prompt"), - "providedSuggestion": suggestion_quality_data.get("providedSuggestion"), - "expectedSuggestion": suggestion_quality_data.get("expectedSuggestion"), - "additionalComment": suggestion_quality_data.get("additionalComment"), - "modelName": model_name, - "exception": exception is not None, - } - send_segment_event(event, "suggestionQualityFeedback", user) if sentiment_feedback_data: - event = { - "value": sentiment_feedback_data.get("value"), - "feedback": sentiment_feedback_data.get("feedback"), - "modelName": model_name, - "exception": exception is not None, - } - send_segment_event(event, "sentimentFeedback", user) 
send_segment_analytics_event( AnalyticsTelemetryEvents.PRODUCT_FEEDBACK, lambda: AnalyticsProductFeedback( @@ -295,40 +323,16 @@ def write_to_segment( user, ansible_extension_version, ) - if issue_feedback_data: - event = { - "type": issue_feedback_data.get("type"), - "title": issue_feedback_data.get("title"), - "description": issue_feedback_data.get("description"), - "modelName": model_name, - "exception": exception is not None, - } - send_segment_event(event, "issueFeedback", user) - if playbook_explanation_feedback_data: - event = { - "action": playbook_explanation_feedback_data.get("action"), - "explanation_id": str(playbook_explanation_feedback_data.get("explanationId", "")), - "modelName": model_name, - } - send_segment_event(event, "playbookExplanationFeedback", user) if playbook_generation_action_data: - action = int(playbook_generation_action_data.get("action")) - from_page = playbook_generation_action_data.get("fromPage", 0) - to_page = playbook_generation_action_data.get("toPage", 0) - wizard_id = str(playbook_generation_action_data.get("wizardId", "")) - event = { - "action": action, - "wizardId": wizard_id, - "fromPage": from_page, - "toPage": to_page, - "modelName": model_name, - } - send_segment_event(event, "playbookGenerationAction", user) - if False and from_page > 1 and action in [1, 3]: + if ( + False + and playbook_generation_action_data["from_page"] > 1 + and playbook_generation_action_data["action"] in [1, 3] + ): send_segment_analytics_event( AnalyticsTelemetryEvents.PLAYBOOK_GENERATION_ACTION, lambda: AnalyticsPlaybookGenerationWizard( - action=action, + action=playbook_generation_action_data["action"], model_name=model_name, rh_user_org_id=org_id, wizard_id=str(playbook_generation_action_data.get("wizardId", "")), @@ -337,40 +341,14 @@ def write_to_segment( ansible_extension_version, ) - feedback_events = [ - inline_suggestion_data, - suggestion_quality_data, - sentiment_feedback_data, - issue_feedback_data, - ] - if exception and all(not 
data for data in feedback_events): - # When an exception is thrown before inline_suggestion_data or ansible_content_data - # is set, we send request_data to Segment after having anonymized it. - ano_request_data = anonymizer.anonymize_struct(request_data) - if "inlineSuggestion" in request_data: - event_type = "inlineSuggestionFeedback" - elif "suggestionQualityFeedback" in request_data: - event_type = "suggestionQualityFeedback" - elif "sentimentFeedback" in request_data: - event_type = "sentimentFeedback" - elif "issueFeedback" in request_data: - event_type = "issueFeedback" - else: - event_type = "unknown" - - event = { - "data": ano_request_data, - "exception": str(exception), - } - send_segment_event(event, event_type, user) - - -class ContentMatches(GenericAPIView): + +class ContentMatches(OurAPIView): """ Returns content matches that were the highest likelihood sources for a given code suggestion. """ serializer_class = ContentMatchRequestSerializer + schema1_event_class = schema1.ContentMatchEvent permission_classes = ( [ @@ -439,10 +417,6 @@ def perform_content_matching( f"input to content matches for suggestion id {suggestion_id}:\n{content_match_data}" ) - exception = None - event = None - event_name = None - start_time = time.time() response_serializer = None metadata = [] @@ -459,6 +433,11 @@ def perform_content_matching( contentmatch_encoding_hist.observe(content_match_dto.encode_duration / 1000) contentmatch_search_hist.observe(content_match_dto.search_duration / 1000) + # TODO: See if we can isolate the lines + self.schema1_event.request = request_data + self.schema1_event.response = response_data + self.schema1_event.metadata = metadata + try: response_serializer = ContentMatchResponseSerializer(data=response_data) response_serializer.is_valid(raise_exception=True) @@ -470,7 +449,7 @@ def perform_content_matching( raise InternalServerError except ModelTimeoutError as e: - exception = e + self.exception = e logger.warn( f"model timed out after 
{settings.ANSIBLE_AI_MODEL_MESH_API_TIMEOUT} seconds" f" for suggestion {suggestion_id}" @@ -478,19 +457,19 @@ def perform_content_matching( raise ModelTimeoutException(cause=e) except WcaBadRequest as e: - exception = e + self.exception = e logger.exception(f"bad request for content matching suggestion {suggestion_id}") raise WcaBadRequestException(cause=e) except WcaInvalidModelId as e: - exception = e + self.exception = e logger.exception( f"WCA Model ID is invalid for content matching suggestion {suggestion_id}" ) raise WcaInvalidModelIdException(cause=e) except WcaKeyNotFound as e: - exception = e + self.exception = e logger.exception( f"A WCA Api Key was expected but not found for " f"content matching suggestion {suggestion_id}" @@ -498,7 +477,7 @@ def perform_content_matching( raise WcaKeyNotFoundException(cause=e) except WcaModelIdNotFound as e: - exception = e + self.exception = e logger.exception( f"A WCA Model ID was expected but not found for " f"content matching suggestion {suggestion_id}" @@ -506,7 +485,7 @@ def perform_content_matching( raise WcaModelIdNotFoundException(cause=e) except WcaNoDefaultModelId as e: - exception = e + self.exception = e logger.exception( "A default WCA Model ID was expected but not found for " f"content matching suggestion {suggestion_id}" @@ -514,7 +493,7 @@ def perform_content_matching( raise WcaNoDefaultModelIdException(cause=e) except WcaSuggestionIdCorrelationFailure as e: - exception = e + self.exception = e logger.exception( f"WCA Request/Response SuggestionId correlation failed " f"for suggestion {suggestion_id}" @@ -522,89 +501,38 @@ def perform_content_matching( raise WcaSuggestionIdCorrelationFailureException(cause=e) except WcaEmptyResponse as e: - exception = e + self.exception = e logger.exception( f"WCA returned an empty response for content matching suggestion {suggestion_id}" ) raise WcaEmptyResponseException(cause=e) except WcaCloudflareRejection as e: - exception = e + self.exception = e 
logger.exception(f"Cloudflare rejected the request for {suggestion_id}") raise WcaCloudflareRejectionException(cause=e) except WcaUserTrialExpired as e: - exception = e + # NOTE: exception should be removed + self.exception = e logger.exception(f"User trial expired, when requesting suggestion {suggestion_id}") - event_name = "trialExpired" - event = { - "type": "contentmatch", - "modelName": model_id, - "suggestionId": str(suggestion_id), - } raise WcaUserTrialExpiredException(cause=e) except Exception as e: - exception = e + self.exception = e logger.exception(f"Error requesting content matches for suggestion {suggestion_id}") - raise ServiceUnavailable(cause=e) - - finally: - duration = round((time.time() - start_time) * 1000, 2) - if exception: - model_id_in_exception = BaseWisdomAPIException.get_model_id_from_exception( - exception - ) - if model_id_in_exception: - model_id = model_id_in_exception - if event: - event["modelName"] = model_id - send_segment_event(event, event_name, user) - else: - self.write_to_segment( - request_data, - duration, - exception, - metadata, - model_id, - response_serializer.data if response_serializer else {}, - suggestion_id, - user, - ) + raise return response_serializer - def write_to_segment( - self, - request_data, - duration, - exception, - metadata, - model_id, - response_data, - suggestion_id, - user, - ): - event = { - "duration": duration, - "exception": exception is not None, - "modelName": model_id, - "problem": None if exception is None else exception.__class__.__name__, - "request": request_data, - "response": response_data, - "suggestionId": str(suggestion_id), - "rh_user_has_seat": user.rh_user_has_seat, - "rh_user_org_id": user.org_id, - "metadata": metadata, - } - send_segment_event(event, "contentmatch", user) - -class Explanation(APIView): +class Explanation(OurAPIView): """ Returns a text that explains a playbook. 
""" + serializer_class = ExplanationRequestSerializer + schema1_event_class = schema1.ExplainPlaybookEvent permission_classes = [ permissions.IsAuthenticated, IsAuthenticatedOrTokenHasScope, @@ -630,77 +558,31 @@ class Explanation(APIView): summary="Inline code suggestions", ) def post(self, request) -> Response: - duration = None - exception = None - explanation_id = None - playbook = "" - answer = {} - request_serializer = ExplanationRequestSerializer(data=request.data) - try: - request_serializer.is_valid(raise_exception=True) - explanation_id = str(request_serializer.validated_data.get("explanationId", "")) - playbook = request_serializer.validated_data.get("content") + explanation_id = str(self.validated_data.get("explanationId", "")) + playbook = self.validated_data.get("content") - llm = apps.get_app_config("ai").model_mesh_client - start_time = time.time() - explanation = llm.explain_playbook(request, playbook) - duration = round((time.time() - start_time) * 1000, 2) + llm = apps.get_app_config("ai").model_mesh_client + explanation = llm.explain_playbook(request, playbook) - # Anonymize response - # Anonymized in the View to be consistent with where Completions are anonymized - anonymized_explanation = anonymizer.anonymize_struct( - explanation, value_template=Template("{{ _${variable_name}_ }}") - ) + # Anonymize response + # Anonymized in the View to be consistent with where Completions are anonymized + anonymized_explanation = anonymizer.anonymize_struct( + explanation, value_template=Template("{{ _${variable_name}_ }}") + ) - answer = { - "content": anonymized_explanation, - "format": "markdown", - "explanationId": explanation_id, - } - except Exception as exc: - exception = exc - logger.exception(f"An exception {exc.__class__} occurred during a playbook generation") - raise - finally: - self.write_to_segment( - request.user, - explanation_id, - exception, - duration, - playbook_length=len(playbook), - ) + answer = { + "content": anonymized_explanation, 
+ "format": "markdown", + "explanationId": explanation_id, + } return Response( answer, status=rest_framework_status.HTTP_200_OK, ) - def write_to_segment(self, user, explanation_id, exception, duration, playbook_length): - model_name = "" - try: - model_mesh_client = apps.get_app_config("ai").model_mesh_client - model_name = model_mesh_client.get_model_id(user.org_id, "") - except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError): - pass - event = { - "duration": duration, - "exception": exception is not None, - "explanationId": explanation_id, - "modelName": model_name, - "playbook_length": playbook_length, - "rh_user_org_id": user.org_id, - } - if exception: - event["response"] = ( - { - "exception": str(exception), - }, - ) - send_segment_event(event, "explainPlaybook", user) - - -class Generation(APIView): +class Generation(OurAPIView): """ Returns a playbook based on a text input. """ @@ -708,6 +590,8 @@ class Generation(APIView): from oauth2_provider.contrib.rest_framework import IsAuthenticatedOrTokenHasScope from rest_framework import permissions + serializer_class = GenerationRequestSerializer + schema1_event_class = schema1.CodegenPlaybookEvent permission_classes = [ permissions.IsAuthenticated, IsAuthenticatedOrTokenHasScope, @@ -733,85 +617,31 @@ class Generation(APIView): summary="Inline code suggestions", ) def post(self, request) -> Response: - exception = None - generation_id = None - wizard_id = None - duration = None - create_outline = None - anonymized_playbook = "" - playbook = "" - request_serializer = GenerationRequestSerializer(data=request.data) - answer = {} - try: - request_serializer.is_valid(raise_exception=True) - generation_id = str(request_serializer.validated_data.get("generationId", "")) - create_outline = request_serializer.validated_data["createOutline"] - outline = str(request_serializer.validated_data.get("outline", "")) - text = request_serializer.validated_data["text"] - wizard_id = 
str(request_serializer.validated_data.get("wizardId", "")) - - llm = apps.get_app_config("ai").model_mesh_client - start_time = time.time() - playbook, outline = llm.generate_playbook(request, text, create_outline, outline) - duration = round((time.time() - start_time) * 1000, 2) - - # Anonymize responses - # Anonymized in the View to be consistent with where Completions are anonymized - anonymized_playbook = anonymizer.anonymize_struct( - playbook, value_template=Template("{{ _${variable_name}_ }}") - ) - anonymized_outline = anonymizer.anonymize_struct( - outline, value_template=Template("{{ _${variable_name}_ }}") - ) + generation_id = str(self.validated_data.get("generationId", "")) + create_outline = self.validated_data["createOutline"] + outline = str(self.validated_data.get("outline", "")) + text = self.validated_data["text"] + + llm = apps.get_app_config("ai").model_mesh_client + playbook, outline = llm.generate_playbook(request, text, create_outline, outline) + + # Anonymize responses + # Anonymized in the View to be consistent with where Completions are anonymized + anonymized_playbook = anonymizer.anonymize_struct( + playbook, value_template=Template("{{ _${variable_name}_ }}") + ) + anonymized_outline = anonymizer.anonymize_struct( + outline, value_template=Template("{{ _${variable_name}_ }}") + ) - answer = { - "playbook": anonymized_playbook, - "outline": anonymized_outline, - "format": "plaintext", - "generationId": generation_id, - } - except Exception as exc: - exception = exc - logger.exception(f"An exception {exc.__class__} occurred during a playbook generation") - raise - finally: - self.write_to_segment( - request.user, - generation_id, - wizard_id, - exception, - duration, - create_outline, - playbook_length=len(anonymized_playbook), - ) + answer = { + "playbook": anonymized_playbook, + "outline": anonymized_outline, + "format": "plaintext", + "generationId": generation_id, + } return Response( answer, status=rest_framework_status.HTTP_200_OK, 
) - - def write_to_segment( - self, user, generation_id, wizard_id, exception, duration, create_outline, playbook_length - ): - model_name = "" - try: - model_mesh_client = apps.get_app_config("ai").model_mesh_client - model_name = model_mesh_client.get_model_id(user.org_id, "") - except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError): - pass - event = { - "create_outline": create_outline, - "duration": duration, - "exception": exception is not None, - "generationId": generation_id, - "modelName": model_name, - "playbook_length": playbook_length, - "wizardId": wizard_id, - } - if exception: - event["response"] = ( - { - "exception": str(exception), - }, - ) - send_segment_event(event, "codegenPlaybook", user) diff --git a/ansible_ai_connect/main/middleware.py b/ansible_ai_connect/main/middleware.py index 3a57f7f59..944ba775f 100644 --- a/ansible_ai_connect/main/middleware.py +++ b/ansible_ai_connect/main/middleware.py @@ -12,10 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import json import logging import time -import uuid from ansible_anonymizer import anonymizer from django.conf import settings @@ -33,7 +31,7 @@ ) from ansible_ai_connect.ai.api.utils.segment import send_segment_event from ansible_ai_connect.ai.api.utils.segment_analytics_telemetry import ( - send_segment_analytics_event, + send_segment_analytics_event, # Schema2 ) from ansible_ai_connect.healthcheck.version_info import VersionInfo @@ -64,8 +62,7 @@ def __init__(self, get_response): self.get_response = get_response def __call__(self, request): - start_time = time.time() - + # Schema2 if settings.SEGMENT_ANALYTICS_WRITE_KEY: if not segment_analytics_telemetry.write_key: segment_analytics_telemetry.write_key = settings.SEGMENT_ANALYTICS_WRITE_KEY @@ -74,86 +71,14 @@ def __call__(self, request): # segment_analytics_telemetry.send = False # for code development only segment_analytics_telemetry.on_error = on_segment_analytics_error - if settings.SEGMENT_WRITE_KEY: - if not analytics.write_key: - analytics.write_key = settings.SEGMENT_WRITE_KEY - analytics.debug = settings.DEBUG - analytics.gzip = True # Enable gzip compression - # analytics.send = False # for code development only - analytics.on_error = on_segment_error - - if request.path == reverse("completions") and request.method == "POST": - if request.content_type == "application/json": - try: - request_data = ( - json.loads(request.body.decode("utf-8")) if request.body else {} - ) - request_data = anonymize_request_data(request_data) - except Exception: # when an invalid json or an invalid encoding is detected - request_data = {} - else: - request_data = anonymize_request_data(request.POST) - response = self.get_response(request) - if settings.SEGMENT_WRITE_KEY: + if settings.SEGMENT_ANALYTICS_WRITE_KEY: if request.path == reverse("completions") and request.method == "POST": - request_suggestion_id = getattr( - request, "_suggestion_id", request_data.get("suggestionId") - ) - if not request_suggestion_id: - 
request_suggestion_id = str(uuid.uuid4()) - context = request_data.get("context") - prompt = request_data.get("prompt") - model_name = request_data.get("model", "") - metadata = request_data.get("metadata", {}) - promptType = getattr(request, "_prompt_type", None) - - predictions = None - message = None - response_data = getattr(response, "data", {}) - - if isinstance(response_data, dict): - predictions = response_data.get("predictions") - message = response_data.get("message") - if isinstance(message, ErrorDetail): - message = str(message) - model_name = response_data.get("model", model_name) - # For other error cases, remove 'model' in response data - if response.status_code >= 400: - response_data.pop("model", None) - elif response.status_code >= 400 and getattr(response, "content", None): - message = str(response.content) - - duration = round((time.time() - start_time) * 1000, 2) tasks = getattr(response, "tasks", []) - event = { - "duration": duration, - "request": {"context": context, "prompt": prompt}, - "response": { - "exception": getattr(response, "exception", None), - # See main.exception_handler.exception_handler_with_error_type - # That extracts 'default_code' from Exceptions and stores it - # in the Response. - "error_type": getattr(response, "error_type", None), - "message": message, - "predictions": predictions, - "status_code": response.status_code, - "status_text": getattr(response, "status_text", None), - }, - "suggestionId": request_suggestion_id, - "metadata": metadata, - "modelName": model_name, - "imageTags": version_info.image_tags, - "tasks": tasks, - "promptType": promptType, - "taskCount": len(tasks), - } - - send_segment_event(event, "completion", request.user) - # Collect analytics telemetry, when tasks exist. 
if len(tasks) > 0: + # Schema2 send_segment_analytics_event( AnalyticsTelemetryEvents.RECOMMENDATION_GENERATED, lambda: AnalyticsRecommendationGenerated( @@ -165,7 +90,7 @@ def __call__(self, request): for task in tasks ], rh_user_org_id=getattr(request.user, "org_id", None), - suggestion_id=request_suggestion_id, + suggestion_id=getattr(response, "suggestionId", ""), model_name=model_name, ), request.user, diff --git a/ansible_ai_connect/main/settings/base.py b/ansible_ai_connect/main/settings/base.py index d515b7387..b50693455 100644 --- a/ansible_ai_connect/main/settings/base.py +++ b/ansible_ai_connect/main/settings/base.py @@ -233,8 +233,9 @@ # Wisdom Eng Team: # gh api -H "Accept: application/vnd.github+json" /orgs/ansible/teams/wisdom-contrib -# Write key for sending analytics data to Segment. Note that each of Prod/Dev have a different key. +# Write key for sending Schema1 analytics data to Segment. Note that each of Prod/Dev have a different key. SEGMENT_WRITE_KEY = os.environ.get("SEGMENT_WRITE_KEY") +# Schema2 telemetry SEGMENT_ANALYTICS_WRITE_KEY = os.environ.get("SEGMENT_ANALYTICS_WRITE_KEY") ANALYTICS_MIN_ANSIBLE_EXTENSION_VERSION = os.environ.get( "ANALYTICS_MIN_ANSIBLE_EXTENSION_VERSION", "v2.12.143"