From 4beada0a89fbf5f747d3dd48c511eeacb8688991 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gon=C3=A9ri=20Le=20Bouder?=
Date: Wed, 19 Jun 2024 18:47:44 -0400
Subject: [PATCH] api/view: improve the collection of the Schema1 events

Harmonize how we collect and prepare the Schema1 events and how we do
the payload validation and the exception handling.
---
 .../ai/api/telemetry/schema1.py               | 122 ++++++++
 .../ai/api/telemetry/test_schema1.py          |  57 ++++
 .../ai/api/utils/analytics_telemetry_model.py |   2 +
 ansible_ai_connect/ai/api/utils/segment.py    |   8 +
 ansible_ai_connect/ai/api/views.py            | 280 +++++++-----------
 ansible_ai_connect/main/middleware.py         |   6 +-
 .../ai/api/telemetry/test_schema1.py          |   2 +
 7 files changed, 302 insertions(+), 175 deletions(-)
 create mode 100644 ansible_ai_connect/ai/api/telemetry/schema1.py
 create mode 100644 ansible_ai_connect/ai/api/telemetry/test_schema1.py
 create mode 100644 ansible_wisdom/ai/api/telemetry/test_schema1.py

diff --git a/ansible_ai_connect/ai/api/telemetry/schema1.py b/ansible_ai_connect/ai/api/telemetry/schema1.py
new file mode 100644
index 000000000..b55eddcb9
--- /dev/null
+++ b/ansible_ai_connect/ai/api/telemetry/schema1.py
@@ -0,0 +1,122 @@
+# Copyright Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import platform
+
+from attr import Factory, asdict, field
+from attrs import define, validators
+from django.utils import timezone
+
+from ansible_ai_connect.healthcheck.version_info import VersionInfo
+from ansible_ai_connect.users.models import User
+
+version_info = VersionInfo()
+
+
+@define
+class ResponsePayload:
+    exception: str = field(validator=validators.instance_of(str), converter=str, default="")
+
+
+@define
+class Schema1Event:
+    event_name: str = "noName"
+    imageTags: str = field(
+        validator=validators.instance_of(str), converter=str, default=version_info.image_tags
+    )
+    hostname: str = field(
+        validator=validators.instance_of(str), converter=str, default=platform.node()
+    )
+    groups: list[str] = Factory(list)
+
+    rh_user_has_seat: bool = False
+    rh_user_org_id: int | None = None
+    # Use a factory so each event gets its own timestamp, not one computed at import time
+    timestamp: str = field(factory=lambda: timezone.now().isoformat())
+    modelName: str = field(validator=validators.instance_of(str), converter=str, default="")
+    exception: bool = False
+    # Use a factory so the response payload is not shared between event instances
+    response: ResponsePayload = Factory(ResponsePayload)
+    user: User | None = None
+
+    def set_user(self, user):
+        self.user = user
+        self.rh_user_has_seat = user.rh_user_has_seat
+        self.rh_user_org_id = user.org_id
+        self.groups = list(user.groups.values_list("name", flat=True))
+
+    def set_exception(self, exception):
+        if exception:
+            self.exception = True
+            self.response.exception = str(exception)
+
+    def set_validated_data(self, validated_data):
+        for field_name, value in validated_data.items():
+            if hasattr(self, field_name):
+                setattr(self, field_name, value)
+
+    def as_dict(self):
+        # NOTE: the list of allowed fields should be moved into the event class itself
+        def my_filter(a, v):
+            return a.name not in ["event_name", "user"]
+
+        return asdict(self, filter=my_filter)
+
+
+@define
+class ExplainPlaybookSchema1Event(Schema1Event):
+    event_name: str = "explainPlaybook"
+    explanationId: str = field(validator=validators.instance_of(str), converter=str, default="")
+    duration: int = field(validator=validators.instance_of(int), converter=int, default=0)
+
+
+@define
+class CodegenPlaybookSchema1Event(Schema1Event):
+    event_name: str = "codegenPlaybook"
+    generationId: str = field(validator=validators.instance_of(str), converter=str, default="")
+    wizardId: str = field(validator=validators.instance_of(str), converter=str, default="")
+    playbook_length: int | None = None
+    duration: int = field(validator=validators.instance_of(int), converter=int, default=0)
+
+    def set_validated_data(self, validated_data):
+        super().set_validated_data(validated_data)
+        self.playbook_length = len(validated_data.get("playbook", ""))
+
+
+@define
+class ContentMatchSchema1Event(Schema1Event):
+    event_name: str = "contentmatch"
+    request_data: dict | None = None
+    metadata: list | None = None  # TODO
+
+    def set_validated_data(self, validated_data):
+        super().set_validated_data(validated_data)
+        self.request_data = validated_data
+
+
+@define
+class InlineSuggestionFeedbackSchema1Event(Schema1Event):
+    event_name: str = "inlineSuggestionFeedback"
+    latency: float = field(validator=validators.instance_of(float), converter=float, default=0.0)
+    userActionTime: int = field(validator=validators.instance_of(int), converter=int, default=0)
+    action: int = field(validator=validators.instance_of(int), converter=int, default=0)
+    suggestionId: str = field(validator=validators.instance_of(str), converter=str, default="")
+    activityId: str = field(validator=validators.instance_of(str), converter=str, default="")
+
+    def set_validated_data(self, validated_data):
+        super().set_validated_data(validated_data)
+        self.latency = validated_data["latency"]
+        self.userActionTime = validated_data["userActionTime"]
+        self.suggestionId = validated_data["suggestionId"]
+        # Default to "" so a missing activityId is not converted to the string "None"
+        self.activityId = validated_data.get("activityId", "")
+        self.action = validated_data["action"]
diff --git a/ansible_ai_connect/ai/api/telemetry/test_schema1.py b/ansible_ai_connect/ai/api/telemetry/test_schema1.py
new file mode 100644
index 000000000..8446d6010
--- /dev/null
+++ b/ansible_ai_connect/ai/api/telemetry/test_schema1.py
@@ -0,0 +1,57 @@
+# Copyright Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import TestCase, mock
+
+from .schema1 import InlineSuggestionFeedbackSchema1Event, Schema1Event
+
+
+class TestSchema1Event(TestCase):
+    def test_set_user(self):
+        m_user = mock.Mock()
+        m_user.rh_user_has_seat = True
+        m_user.org_id = 123
+        m_user.groups.values_list.return_value = ["mecano"]
+        event1 = Schema1Event()
+        event1.set_user(m_user)
+        self.assertEqual(event1.rh_user_has_seat, True)
+        self.assertEqual(event1.rh_user_org_id, 123)
+        self.assertEqual(event1.groups, ["mecano"])
+
+    def test_as_dict(self):
+        event1 = Schema1Event()
+        as_dict = event1.as_dict()
+
+        # event_name and user are filtered out of the payload
+        self.assertEqual(as_dict.get("event_name"), None)
+        self.assertFalse(as_dict.get("exception"))
+
+    def test_set_exception(self):
+        event1 = Schema1Event()
+        try:
+            1 / 0
+        except Exception as e:
+            event1.set_exception(e)
+        self.assertTrue(event1.exception)
+        self.assertEqual(event1.response.exception, "division by zero")
+
+
+class TestInlineSuggestionFeedbackSchema1Event(TestCase):
+    def test_validated_data(self):
+        validated_data = {
+            "latency": 1.1,
+            "userActionTime": 1,
+            "action": 123,
+            "suggestionId": "1e0e1404-5b8a-4d06-829a-dca0d2fff0b5",
+        }
+        event1 = InlineSuggestionFeedbackSchema1Event()
+        event1.set_validated_data(validated_data)
+
+        self.assertEqual(event1.latency, 1.1)
+        self.assertEqual(event1.userActionTime, 1)
+        self.assertEqual(event1.action, 123)
+        self.assertEqual(event1.suggestionId, "1e0e1404-5b8a-4d06-829a-dca0d2fff0b5")
diff --git a/ansible_ai_connect/ai/api/utils/analytics_telemetry_model.py b/ansible_ai_connect/ai/api/utils/analytics_telemetry_model.py
index f863329b2..da43e6c4d 100644
--- a/ansible_ai_connect/ai/api/utils/analytics_telemetry_model.py
+++ b/ansible_ai_connect/ai/api/utils/analytics_telemetry_model.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# Schema2
+
 from enum import Enum
 
 from attr import Factory, field, frozen
diff --git a/ansible_ai_connect/ai/api/utils/segment.py b/ansible_ai_connect/ai/api/utils/segment.py
index 6cda9161d..f79422a05 100644
--- a/ansible_ai_connect/ai/api/utils/segment.py
+++ b/ansible_ai_connect/ai/api/utils/segment.py
@@ -123,6 +123,14 @@ def base_send_segment_event(
         send_segment_event(event, "segmentError", user)
 
 
+def send_schema1_event(event_obj) -> None:
+    if not settings.SEGMENT_WRITE_KEY:
+        logger.info("segment write key not set, skipping event")
+        return
+    base_send_segment_event(event_obj.as_dict(), event_obj.event_name, event_obj.user, analytics)
+
+
 def redact_seated_users_data(event: Dict[str, Any], allow_list: Dict[str, Any]) -> Dict[str, Any]:
     """
     Copy a dictionary to another dictionary using a nested list of allowed keys.
diff --git a/ansible_ai_connect/ai/api/views.py b/ansible_ai_connect/ai/api/views.py
index ccced8bee..4d9cb7840 100644
--- a/ansible_ai_connect/ai/api/views.py
+++ b/ansible_ai_connect/ai/api/views.py
@@ -60,6 +60,11 @@
     WcaUserTrialExpired,
 )
 from ansible_ai_connect.ai.api.pipelines.completions import CompletionsPipeline
+from ansible_ai_connect.ai.api.telemetry.schema1 import (
+    CodegenPlaybookSchema1Event,
+    ContentMatchSchema1Event,
+    ExplainPlaybookSchema1Event,
+    InlineSuggestionFeedbackSchema1Event,
+)
 from ansible_ai_connect.users.models import User
 
 from ..feature_flags import FeatureFlags
@@ -95,7 +100,7 @@
     AnalyticsRecommendationAction,
     AnalyticsTelemetryEvents,
 )
-from .utils.segment import send_segment_event
+from .utils.segment import send_schema1_event, send_segment_event
 from .utils.segment_analytics_telemetry import send_segment_analytics_event
 
 logger = logging.getLogger(__name__)
@@ -134,6 +139,53 @@
 }
 
 
+class OurAPIView(APIView):
+    exception = None
+    validated_data: dict = {}
+    schema1_event_class = None
+
+    def initial(self, request, *args, **kwargs):
+        super().initial(request, *args, **kwargs)
+        request_serializer = self.serializer_class(data=request.data)
+        request_serializer.is_valid(raise_exception=True)
+        self.validated_data = request_serializer.validated_data
+
+    def _get_model_name(self, org_id: str) -> str:
+        try:
+            model_mesh_client = apps.get_app_config("ai").model_mesh_client
+            return model_mesh_client.get_model_id(org_id, self.validated_data.get("model", ""))
+        except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError):
+            return ""
+
+    def handle_exception(self, exc):
+        self.exception = exc
+        return super().handle_exception(exc)
+
+    def dispatch(self, request, *args, **kwargs):
+        start_time = time.time()
+        self.exception = None
+        response = None
+        try:
+            response = super().dispatch(request, *args, **kwargs)
+        except Exception as exc:
+            logger.exception(f"An exception {exc.__class__} occurred during a {request.path} call")
+            raise
+        finally:
+            duration = round((time.time() - start_time) * 1000, 2)
+
+            if self.schema1_event_class and request.user.is_authenticated:
+                my_schema1_event = self.schema1_event_class()
+                my_schema1_event.duration = duration
+                my_schema1_event.modelName = self._get_model_name(request.user.org_id)
+                my_schema1_event.set_user(request.user)
+                my_schema1_event.set_exception(self.exception)
+                my_schema1_event.set_validated_data(self.validated_data)
+                send_schema1_event(my_schema1_event)
+
+        return response
+
+
 class Completions(APIView):
     """
     Returns inline code suggestions based on a given Ansible editor context.
@@ -162,11 +214,14 @@ def post(self, request) -> Response:
         return pipeline.execute()
 
 
-class Feedback(APIView):
+class Feedback(OurAPIView):
     """
     Feedback API for the AI service
     """
 
+    serializer_class = FeedbackRequestSerializer
+    schema1_event_class = None
+
     permission_classes = [
         permissions.IsAuthenticated,
         IsAuthenticatedOrTokenHasScope,
@@ -247,16 +302,11 @@ def write_to_segment(
         )
 
         if inline_suggestion_data:
-            event = {
-                "latency": inline_suggestion_data.get("latency"),
-                "userActionTime": inline_suggestion_data.get("userActionTime"),
-                "action": inline_suggestion_data.get("action"),
-                "suggestionId": str(inline_suggestion_data.get("suggestionId", "")),
-                "modelName": model_name,
-                "activityId": str(inline_suggestion_data.get("activityId", "")),
-                "exception": exception is not None,
-            }
-            send_segment_event(event, "inlineSuggestionFeedback", user)
+            s1event = InlineSuggestionFeedbackSchema1Event()
+            s1event.set_user(user)
+            s1event.set_validated_data(inline_suggestion_data)
+            s1event.modelName = model_name
+            s1event.set_exception(exception)
+            send_schema1_event(s1event)
             send_segment_analytics_event(
                 AnalyticsTelemetryEvents.RECOMMENDATION_ACTION,
                 lambda: AnalyticsRecommendationAction(
@@ -371,6 +421,7 @@ class ContentMatches(GenericAPIView):
     """
 
     serializer_class = ContentMatchRequestSerializer
+    schema1_event_class = ContentMatchSchema1Event
 
     permission_classes = (
         [
@@ -547,30 +598,7 @@ def perform_content_matching(
         except Exception as e:
             exception = e
             logger.exception(f"Error requesting content matches for suggestion {suggestion_id}")
-            raise ServiceUnavailable(cause=e)
-
-        finally:
-            duration = round((time.time() - start_time) * 1000, 2)
-            if exception:
-                model_id_in_exception = BaseWisdomAPIException.get_model_id_from_exception(
-                    exception
-                )
-                if model_id_in_exception:
-                    model_id = model_id_in_exception
-            if event:
-                event["modelName"] = model_id
-                send_segment_event(event, event_name, user)
-            else:
-                self.write_to_segment(
-                    request_data,
-                    duration,
-                    exception,
-                    metadata,
-                    model_id,
-                    response_serializer.data if response_serializer else {},
-                    suggestion_id,
-                    user,
-                )
+            raise
 
         return response_serializer
 
@@ -600,11 +628,13 @@ def write_to_segment(
         send_segment_event(event, "contentmatch", user)
 
 
-class Explanation(APIView):
+class Explanation(OurAPIView):
     """
     Returns a text that explains a playbook.
""" + serializer_class = ExplanationRequestSerializer + schema1_event_class = ExplainPlaybookSchema1Event permission_classes = [ permissions.IsAuthenticated, IsAuthenticatedOrTokenHasScope, @@ -630,77 +660,31 @@ class Explanation(APIView): summary="Inline code suggestions", ) def post(self, request) -> Response: - duration = None - exception = None - explanation_id = None - playbook = "" - answer = {} - request_serializer = ExplanationRequestSerializer(data=request.data) - try: - request_serializer.is_valid(raise_exception=True) - explanation_id = str(request_serializer.validated_data.get("explanationId", "")) - playbook = request_serializer.validated_data.get("content") + explanation_id = str(self.validated_data.get("explanationId", "")) + playbook = self.validated_data.get("content") - llm = apps.get_app_config("ai").model_mesh_client - start_time = time.time() - explanation = llm.explain_playbook(request, playbook) - duration = round((time.time() - start_time) * 1000, 2) + llm = apps.get_app_config("ai").model_mesh_client + explanation = llm.explain_playbook(request, playbook) - # Anonymize response - # Anonymized in the View to be consistent with where Completions are anonymized - anonymized_explanation = anonymizer.anonymize_struct( - explanation, value_template=Template("{{ _${variable_name}_ }}") - ) + # Anonymize response + # Anonymized in the View to be consistent with where Completions are anonymized + anonymized_explanation = anonymizer.anonymize_struct( + explanation, value_template=Template("{{ _${variable_name}_ }}") + ) - answer = { - "content": anonymized_explanation, - "format": "markdown", - "explanationId": explanation_id, - } - except Exception as exc: - exception = exc - logger.exception(f"An exception {exc.__class__} occurred during a playbook generation") - raise - finally: - self.write_to_segment( - request.user, - explanation_id, - exception, - duration, - playbook_length=len(playbook), - ) + answer = { + "content": anonymized_explanation, + "format": "markdown", + "explanationId": explanation_id, + } return Response( answer, status=rest_framework_status.HTTP_200_OK, ) - def write_to_segment(self, user, explanation_id, exception, duration, playbook_length): - model_name = "" - try: - model_mesh_client = apps.get_app_config("ai").model_mesh_client - model_name = model_mesh_client.get_model_id(user.org_id, "") - except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError): - pass - - event = { - "duration": duration, - "exception": exception is not None, - "explanationId": explanation_id, - "modelName": model_name, - "playbook_length": playbook_length, - "rh_user_org_id": user.org_id, - } - if exception: - event["response"] = ( - { - "exception": str(exception), - }, - ) - send_segment_event(event, "explainPlaybook", user) - -class Generation(APIView): +class Generation(OurAPIView): """ Returns a playbook based on a text input. 
""" @@ -708,6 +692,8 @@ class Generation(APIView): from oauth2_provider.contrib.rest_framework import IsAuthenticatedOrTokenHasScope from rest_framework import permissions + serializer_class = GenerationRequestSerializer + schema1_event_class = CodegenPlaybookSchema1Event permission_classes = [ permissions.IsAuthenticated, IsAuthenticatedOrTokenHasScope, @@ -733,85 +719,31 @@ class Generation(APIView): summary="Inline code suggestions", ) def post(self, request) -> Response: - exception = None - generation_id = None - wizard_id = None - duration = None - create_outline = None - anonymized_playbook = "" - playbook = "" - request_serializer = GenerationRequestSerializer(data=request.data) - answer = {} - try: - request_serializer.is_valid(raise_exception=True) - generation_id = str(request_serializer.validated_data.get("generationId", "")) - create_outline = request_serializer.validated_data["createOutline"] - outline = str(request_serializer.validated_data.get("outline", "")) - text = request_serializer.validated_data["text"] - wizard_id = str(request_serializer.validated_data.get("wizardId", "")) - - llm = apps.get_app_config("ai").model_mesh_client - start_time = time.time() - playbook, outline = llm.generate_playbook(request, text, create_outline, outline) - duration = round((time.time() - start_time) * 1000, 2) - - # Anonymize responses - # Anonymized in the View to be consistent with where Completions are anonymized - anonymized_playbook = anonymizer.anonymize_struct( - playbook, value_template=Template("{{ _${variable_name}_ }}") - ) - anonymized_outline = anonymizer.anonymize_struct( - outline, value_template=Template("{{ _${variable_name}_ }}") - ) + generation_id = str(self.validated_data.get("generationId", "")) + create_outline = self.validated_data["createOutline"] + outline = str(self.validated_data.get("outline", "")) + text = self.validated_data["text"] + + llm = apps.get_app_config("ai").model_mesh_client + playbook, outline = llm.generate_playbook(request, text, create_outline, outline) + + # Anonymize responses + # Anonymized in the View to be consistent with where Completions are anonymized + anonymized_playbook = anonymizer.anonymize_struct( + playbook, value_template=Template("{{ _${variable_name}_ }}") + ) + anonymized_outline = anonymizer.anonymize_struct( + outline, value_template=Template("{{ _${variable_name}_ }}") + ) - answer = { - "playbook": anonymized_playbook, - "outline": anonymized_outline, - "format": "plaintext", - "generationId": generation_id, - } - except Exception as exc: - exception = exc - logger.exception(f"An exception {exc.__class__} occurred during a playbook generation") - raise - finally: - self.write_to_segment( - request.user, - generation_id, - wizard_id, - exception, - duration, - create_outline, - playbook_length=len(anonymized_playbook), - ) + answer = { + "playbook": anonymized_playbook, + "outline": anonymized_outline, + "format": "plaintext", + "generationId": generation_id, + } return Response( answer, status=rest_framework_status.HTTP_200_OK, ) - - def write_to_segment( - self, user, generation_id, wizard_id, exception, duration, create_outline, playbook_length - ): - model_name = "" - try: - model_mesh_client = apps.get_app_config("ai").model_mesh_client - model_name = model_mesh_client.get_model_id(user.org_id, "") - except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError): - pass - event = { - "create_outline": create_outline, - "duration": duration, - "exception": exception is not None, - "generationId": 
generation_id, - "modelName": model_name, - "playbook_length": playbook_length, - "wizardId": wizard_id, - } - if exception: - event["response"] = ( - { - "exception": str(exception), - }, - ) - send_segment_event(event, "codegenPlaybook", user) diff --git a/ansible_ai_connect/main/middleware.py b/ansible_ai_connect/main/middleware.py index 3a57f7f59..8a788ba03 100644 --- a/ansible_ai_connect/main/middleware.py +++ b/ansible_ai_connect/main/middleware.py @@ -33,7 +33,7 @@ ) from ansible_ai_connect.ai.api.utils.segment import send_segment_event from ansible_ai_connect.ai.api.utils.segment_analytics_telemetry import ( - send_segment_analytics_event, + send_segment_analytics_event, # Schema2 ) from ansible_ai_connect.healthcheck.version_info import VersionInfo @@ -66,6 +66,7 @@ def __init__(self, get_response): def __call__(self, request): start_time = time.time() + # Schema2 if settings.SEGMENT_ANALYTICS_WRITE_KEY: if not segment_analytics_telemetry.write_key: segment_analytics_telemetry.write_key = settings.SEGMENT_ANALYTICS_WRITE_KEY @@ -74,6 +75,7 @@ def __call__(self, request): # segment_analytics_telemetry.send = False # for code development only segment_analytics_telemetry.on_error = on_segment_analytics_error + # Schema1 if settings.SEGMENT_WRITE_KEY: if not analytics.write_key: analytics.write_key = settings.SEGMENT_WRITE_KEY @@ -96,6 +98,7 @@ def __call__(self, request): response = self.get_response(request) + # Schema1 if settings.SEGMENT_WRITE_KEY: if request.path == reverse("completions") and request.method == "POST": request_suggestion_id = getattr( @@ -154,6 +157,7 @@ def __call__(self, request): # Collect analytics telemetry, when tasks exist. if len(tasks) > 0: + # Schema2 send_segment_analytics_event( AnalyticsTelemetryEvents.RECOMMENDATION_GENERATED, lambda: AnalyticsRecommendationGenerated( diff --git a/ansible_wisdom/ai/api/telemetry/test_schema1.py b/ansible_wisdom/ai/api/telemetry/test_schema1.py new file mode 100644 index 000000000..63f77b6be --- /dev/null +++ b/ansible_wisdom/ai/api/telemetry/test_schema1.py @@ -0,0 +1,2 @@ +#!/usr/bin/env python3 +
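
Note for reviewers (not part of the patch): a minimal usage sketch of the contract
OurAPIView now provides. The Summary view and SummaryRequestSerializer below are
hypothetical names used only for illustration; the base class, the event classes and
send_schema1_event() are the ones added above, and dispatch() takes care of payload
validation, timing, model-name lookup, user/exception capture and the Segment call.

    from django.apps import apps
    from rest_framework import status as rest_framework_status
    from rest_framework.response import Response

    from ansible_ai_connect.ai.api.telemetry.schema1 import ExplainPlaybookSchema1Event


    class Summary(OurAPIView):  # hypothetical view, for illustration only
        serializer_class = SummaryRequestSerializer  # hypothetical serializer
        schema1_event_class = ExplainPlaybookSchema1Event  # event class added by this patch

        def post(self, request) -> Response:
            # self.validated_data was populated by OurAPIView.initial()
            content = self.validated_data.get("content")
            llm = apps.get_app_config("ai").model_mesh_client
            explanation = llm.explain_playbook(request, content)
            # duration, modelName, user, exception and the Segment call itself are
            # handled by OurAPIView.dispatch() via send_schema1_event()
            return Response({"content": explanation}, status=rest_framework_status.HTTP_200_OK)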