diff --git a/ansible_wisdom/ai/api/telemetry/schema1.py b/ansible_wisdom/ai/api/telemetry/schema1.py new file mode 100644 index 000000000..6691d88ca --- /dev/null +++ b/ansible_wisdom/ai/api/telemetry/schema1.py @@ -0,0 +1,93 @@ +# Copyright Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum + +from attr import Factory, field, frozen +from attrs import define, validators, fields +from django.utils import timezone + +from attr import asdict + +import platform +from ansible_ai_connect.healthcheck.version_info import VersionInfo +from ansible_ai_connect.users.models import User +version_info = VersionInfo() + +@define +class ResponsePayload: + exception: str = field(validator=validators.instance_of(str), converter=str, default="") + +@define +class Schema1Event: + event_name: str = "noName" + imageTags: str = field(validator=validators.instance_of(str), converter=str, default=version_info.image_tags) + hostname: str = field(validator=validators.instance_of(str), converter=str, default=platform.node()) + groups: list[str] = Factory(list) + + rh_user_has_seat: bool = False + rh_user_org_id: int|None = None + timestamp = timezone.now().isoformat() + modelName: str = field(validator=validators.instance_of(str), converter=str, default="") + exception: bool = False + response: ResponsePayload|None = ResponsePayload() + user: User|None = None + duration: int = field(validator=validators.instance_of(int), converter=int, default=0) + + def set_user(self, user): + self.user = user + self.rh_user_has_seat = user.rh_user_has_seat + self.rh_user_org_id = user.org_id + self.groups = list(user.groups.values_list("name", flat=True)) + + def set_exception(self, exception): + if exception: + self.exception = True + self.response.exception=str(exception) + + def set_validated_data(self, validated_data): + for field, value in validated_data.items(): + if hasattr(self, field): + setattr(self, field, value) + + + def get_event_dict(self): + # NOTE: The allowed fields should be moved in the event class itslef + def my_filter(a, v): + return not a.name in ["event_name", "user"] + return asdict(self, filter=my_filter) + +@define +class ExplainPlaybookSchema1Event(Schema1Event): + event_name: str = "explainPlaybook" + explanationId: str = field(validator=validators.instance_of(str), converter=str, default="") + +@define +class CodegenPlaybookSchema1Event(Schema1Event): + event_name: str = "codegenPlaybook" + generationId: str = field(validator=validators.instance_of(str), converter=str, default="") + wizardId: str = field(validator=validators.instance_of(str), converter=str, default="") + playbook_length: int|None = None + def set_validated_data(self, validated_data): + super().set_validated_data(validated_data) + self.playbook_length = len(validated_data.get("playbook", "")) + +@define +class ContentMatchSchema1Event(Schema1Event): + event_name: str = "codematch" + request_data: dict|None = None + metadata: list|None = None # TODO + def set_validated_data(self, validated_data): + super().set_validated_data(validated_data) + self.request_data = validated_data diff --git a/ansible_wisdom/ai/api/utils/analytics_telemetry_model.py b/ansible_wisdom/ai/api/utils/analytics_telemetry_model.py index f863329b2..da43e6c4d 100644 --- a/ansible_wisdom/ai/api/utils/analytics_telemetry_model.py +++ b/ansible_wisdom/ai/api/utils/analytics_telemetry_model.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Schema2 + from enum import Enum from attr import Factory, field, frozen diff --git a/ansible_wisdom/ai/api/utils/segment.py b/ansible_wisdom/ai/api/utils/segment.py index 6cda9161d..d9b00ff55 100644 --- a/ansible_wisdom/ai/api/utils/segment.py +++ b/ansible_wisdom/ai/api/utils/segment.py @@ -123,6 +123,16 @@ def base_send_segment_event( send_segment_event(event, "segmentError", user) +def send_schema1_event(event_obj) -> None: + print(event_obj) + if not settings.SEGMENT_WRITE_KEY: + logger.info("segment write key not set, skipping event") + return + base_send_segment_event( + event_obj.get_event_dict(), event_obj.event_name, event_obj.user, analytics + ) + + def redact_seated_users_data(event: Dict[str, Any], allow_list: Dict[str, Any]) -> Dict[str, Any]: """ Copy a dictionary to another dictionary using a nested list of allowed keys. diff --git a/ansible_wisdom/ai/api/views.py b/ansible_wisdom/ai/api/views.py index c74894b5c..2c56f2416 100644 --- a/ansible_wisdom/ai/api/views.py +++ b/ansible_wisdom/ai/api/views.py @@ -62,6 +62,11 @@ WcaUserTrialExpired, ) from ansible_ai_connect.ai.api.pipelines.completions import CompletionsPipeline +from ansible_ai_connect.ai.api.telemetry.schema1 import ( + CodegenPlaybookSchema1Event, + ContentMatchSchema1Event, + ExplainPlaybookSchema1Event, +) from ansible_ai_connect.users.models import User from .. import search as ai_search @@ -104,7 +109,7 @@ AnalyticsRecommendationAction, AnalyticsTelemetryEvents, ) -from .utils.segment import send_segment_event +from .utils.segment import send_schema1_event, send_segment_event from .utils.segment_analytics_telemetry import send_segment_analytics_event logger = logging.getLogger(__name__) @@ -154,6 +159,55 @@ } +class OurAPIView(APIView): + exception = None + + def initial(self, request, *args, **kwargs): + super().initial(request, *args, **kwargs) + request_serializer = self.serializer_class(data=request.data) + request_serializer.is_valid(raise_exception=True) + self.validated_data = request_serializer.validated_data + + def _get_model_name(self, org_id: str) -> str: + try: + model_mesh_client = apps.get_app_config("ai").model_mesh_client + return model_mesh_client.get_model_id(org_id, self.validated_data.get("model", "")) + except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError): + return "" + + def handle_exception(self, exc): + self.exception = exc + super().handle_exception(exc) + + def dispatch(self, request, *args, **kwargs): + print(f"BEFORE: request={request}") + start_time = time.time() + self.exception = False + response = None + try: + response = super().dispatch(request, *args, **kwargs) + except Exception as exc: + logger.exception(f"An exception {exc.__class__} occurred during a {request.path} call") + raise + finally: + duration = round((time.time() - start_time) * 1000, 2) + print(f"AFTER: request={response}") + + if self.schema1_event_class: + my_schema1_event = self.schema1_event_class() + my_schema1_event.duration = duration + my_schema1_event.modelName = self._get_model_name(request.user.org_id) + my_schema1_event.set_user(request.user) + my_schema1_event.set_exception(self.exception) + my_schema1_event.set_validated_data(self.validated_data) + print(f"Event dict: {my_schema1_event.get_event_dict()}") + send_schema1_event(my_schema1_event) + + print("Yoohoo") + + return response + + class Completions(APIView): """ Returns inline code suggestions based on a given Ansible editor context. @@ -466,12 +520,13 @@ def write_to_segment( send_segment_event(event, "attribution", user) -class ContentMatches(GenericAPIView): +class ContentMatches(OurAPIView): """ Returns content matches that were the highest likelihood sources for a given code suggestion. """ serializer_class = ContentMatchRequestSerializer + schema1_event_class = ContentMatchSchema1Event permission_classes = ( [ @@ -504,36 +559,22 @@ class ContentMatches(GenericAPIView): summary="Code suggestion attributions", ) def post(self, request) -> Response: - request_serializer = self.get_serializer(data=request.data) - request_serializer.is_valid(raise_exception=True) + suggestion_id = str(self.validated_data.get("suggestionId", "")) + model_id = str(self.validated_data.get("model", "")) - request_data = request_serializer.validated_data - suggestion_id = str(request_data.get("suggestionId", "")) - model_id = str(request_data.get("model", "")) - - try: - if request.user.rh_user_has_seat: - response_serializer = self.perform_content_matching( - model_id, suggestion_id, request.user, request_data - ) - else: - response_serializer = self.perform_search(request_data, request.user) - return Response(response_serializer.data, status=rest_framework_status.HTTP_200_OK) - except Exception: - logger.exception("Error requesting content matches") - raise + response_serializer = self.perform_content_matching(model_id, suggestion_id, request.user) + return Response(response_serializer.data, status=rest_framework_status.HTTP_200_OK) def perform_content_matching( self, model_id: str, suggestion_id: str, user: User, - request_data, ): model_mesh_client = apps.get_app_config("ai").model_mesh_client user_id = user.uuid content_match_data: ContentMatchPayloadData = { - "suggestions": request_data.get("suggestions", []), + "suggestions": self.validated_data.get("suggestions", []), "user_id": str(user_id) if user_id else None, "rh_user_has_seat": user.rh_user_has_seat, "organization_id": user.org_id, @@ -543,219 +584,39 @@ def perform_content_matching( f"input to content matches for suggestion id {suggestion_id}:\n{content_match_data}" ) - exception = None - event = None - event_name = None - start_time = time.time() response_serializer = None metadata = [] - try: - model_id, client_response = model_mesh_client.codematch(content_match_data, model_id) - - response_data = {"contentmatches": []} - - for response_item in client_response: - content_match_dto = ContentMatchResponseDto(**response_item) - response_data["contentmatches"].append(content_match_dto.content_matches) - metadata.append(content_match_dto.meta) - - contentmatch_encoding_hist.observe(content_match_dto.encode_duration / 1000) - contentmatch_search_hist.observe(content_match_dto.search_duration / 1000) - - try: - response_serializer = ContentMatchResponseSerializer(data=response_data) - response_serializer.is_valid(raise_exception=True) - except Exception: - process_error_count.labels( - stage="contentmatch-response_serialization_validation" - ).inc() - logger.exception(f"error serializing final response for suggestion {suggestion_id}") - raise InternalServerError - - except ModelTimeoutError as e: - exception = e - logger.warn( - f"model timed out after {settings.ANSIBLE_AI_MODEL_MESH_API_TIMEOUT} seconds" - f" for suggestion {suggestion_id}" - ) - raise ModelTimeoutException(cause=e) - - except WcaBadRequest as e: - exception = e - logger.exception(f"bad request for content matching suggestion {suggestion_id}") - raise WcaBadRequestException(cause=e) - - except WcaInvalidModelId as e: - exception = e - logger.exception( - f"WCA Model ID is invalid for content matching suggestion {suggestion_id}" - ) - raise WcaInvalidModelIdException(cause=e) - - except WcaKeyNotFound as e: - exception = e - logger.exception( - f"A WCA Api Key was expected but not found for " - f"content matching suggestion {suggestion_id}" - ) - raise WcaKeyNotFoundException(cause=e) - - except WcaModelIdNotFound as e: - exception = e - logger.exception( - f"A WCA Model ID was expected but not found for " - f"content matching suggestion {suggestion_id}" - ) - raise WcaModelIdNotFoundException(cause=e) - - except WcaNoDefaultModelId as e: - exception = e - logger.exception( - "A default WCA Model ID was expected but not found for " - f"content matching suggestion {suggestion_id}" - ) - raise WcaNoDefaultModelIdException(cause=e) - - except WcaSuggestionIdCorrelationFailure as e: - exception = e - logger.exception( - f"WCA Request/Response SuggestionId correlation failed " - f"for suggestion {suggestion_id}" - ) - raise WcaSuggestionIdCorrelationFailureException(cause=e) - - except WcaEmptyResponse as e: - exception = e - logger.exception( - f"WCA returned an empty response for content matching suggestion {suggestion_id}" - ) - raise WcaEmptyResponseException(cause=e) - - except WcaCloudflareRejection as e: - exception = e - logger.exception(f"Cloudflare rejected the request for {suggestion_id}") - raise WcaCloudflareRejectionException(cause=e) - - except WcaUserTrialExpired as e: - exception = e - logger.exception(f"User trial expired, when requesting suggestion {suggestion_id}") - event_name = "trialExpired" - event = { - "type": "contentmatch", - "modelName": model_id, - "suggestionId": str(suggestion_id), - } - raise WcaUserTrialExpiredException(cause=e) - - except Exception as e: - exception = e - logger.exception(f"Error requesting content matches for suggestion {suggestion_id}") - raise ServiceUnavailable(cause=e) + model_id, client_response = model_mesh_client.codematch(content_match_data, model_id) - finally: - duration = round((time.time() - start_time) * 1000, 2) - if exception: - model_id_in_exception = BaseWisdomAPIException.get_model_id_from_exception( - exception - ) - if model_id_in_exception: - model_id = model_id_in_exception - if event: - event["modelName"] = model_id - send_segment_event(event, event_name, user) - else: - self.write_to_segment( - request_data, - duration, - exception, - metadata, - model_id, - response_serializer.data if response_serializer else {}, - suggestion_id, - user, - ) + response_data = {"contentmatches": []} - return response_serializer + for response_item in client_response: + content_match_dto = ContentMatchResponseDto(**response_item) + response_data["contentmatches"].append(content_match_dto.content_matches) + metadata.append(content_match_dto.meta) - def perform_search(self, request_data, user: User): - suggestion_id = str(request_data.get("suggestionId", "")) - response_serializer = None - - exception = None - start_time = time.time() - metadata = [] - model_name = "" + contentmatch_encoding_hist.observe(content_match_dto.encode_duration / 1000) + contentmatch_search_hist.observe(content_match_dto.search_duration / 1000) try: - suggestion = request_data["suggestions"][0] - response_item = ai_search.search(suggestion) - - attributions_dto = AttributionsResponseDto(**response_item) - response_data = {"contentmatches": []} - response_data["contentmatches"].append(attributions_dto.content_matches) - metadata.append(attributions_dto.meta) - - try: - response_serializer = ContentMatchResponseSerializer(data=response_data) - response_serializer.is_valid(raise_exception=True) - except Exception: - process_error_count.labels(stage="attr-response_serialization_validation").inc() - logger.exception(f"Error serializing final response for suggestion {suggestion_id}") - raise InternalServerError - - except Exception as e: - exception = e - logger.exception("Failed to search for attributions for content matching") - return Response( - {"message": "Unable to complete the request"}, status=HTTPStatus.SERVICE_UNAVAILABLE - ) - finally: - duration = round((time.time() - start_time) * 1000, 2) - self.write_to_segment( - request_data, - duration, - exception, - metadata, - model_name, - response_serializer.data if response_serializer else {}, - suggestion_id, - user, - ) + response_serializer = ContentMatchResponseSerializer(data=response_data) + response_serializer.is_valid(raise_exception=True) + except Exception: + process_error_count.labels(stage="contentmatch-response_serialization_validation").inc() + logger.exception(f"error serializing final response for suggestion {suggestion_id}") + raise InternalServerError return response_serializer - def write_to_segment( - self, - request_data, - duration, - exception, - metadata, - model_id, - response_data, - suggestion_id, - user, - ): - event = { - "duration": duration, - "exception": exception is not None, - "modelName": model_id, - "problem": None if exception is None else exception.__class__.__name__, - "request": request_data, - "response": response_data, - "suggestionId": str(suggestion_id), - "rh_user_has_seat": user.rh_user_has_seat, - "rh_user_org_id": user.org_id, - "metadata": metadata, - } - send_segment_event(event, "contentmatch", user) - -class Explanation(APIView): +class Explanation(OurAPIView): """ Returns a text that explains a playbook. """ + serializer_class = ExplanationRequestSerializer + schema1_event_class = ExplainPlaybookSchema1Event permission_classes = [ permissions.IsAuthenticated, IsAuthenticatedOrTokenHasScope, @@ -781,77 +642,31 @@ class Explanation(APIView): summary="Inline code suggestions", ) def post(self, request) -> Response: - duration = None - exception = None - explanation_id = None - playbook = "" - answer = {} - request_serializer = ExplanationRequestSerializer(data=request.data) - try: - request_serializer.is_valid(raise_exception=True) - explanation_id = str(request_serializer.validated_data.get("explanationId", "")) - playbook = request_serializer.validated_data.get("content") + explanation_id = str(self.validated_data.get("explanationId", "")) + playbook = self.validated_data.get("content") - llm = apps.get_app_config("ai").model_mesh_client - start_time = time.time() - explanation = llm.explain_playbook(request, playbook) - duration = round((time.time() - start_time) * 1000, 2) + llm = apps.get_app_config("ai").model_mesh_client + explanation = llm.explain_playbook(request, playbook) - # Anonymize response - # Anonymized in the View to be consistent with where Completions are anonymized - anonymized_explanation = anonymizer.anonymize_struct( - explanation, value_template=Template("{{ _${variable_name}_ }}") - ) + # Anonymize response + # Anonymized in the View to be consistent with where Completions are anonymized + anonymized_explanation = anonymizer.anonymize_struct( + explanation, value_template=Template("{{ _${variable_name}_ }}") + ) - answer = { - "content": anonymized_explanation, - "format": "markdown", - "explanationId": explanation_id, - } - except Exception as exc: - exception = exc - logger.exception(f"An exception {exc.__class__} occurred during a playbook generation") - raise - finally: - self.write_to_segment( - request.user, - explanation_id, - exception, - duration, - playbook_length=len(playbook), - ) + answer = { + "content": anonymized_explanation, + "format": "markdown", + "explanationId": explanation_id, + } return Response( answer, status=rest_framework_status.HTTP_200_OK, ) - def write_to_segment(self, user, explanation_id, exception, duration, playbook_length): - model_name = "" - try: - model_mesh_client = apps.get_app_config("ai").model_mesh_client - model_name = model_mesh_client.get_model_id(user.org_id, "") - except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError): - pass - event = { - "duration": duration, - "exception": exception is not None, - "explanationId": explanation_id, - "modelName": model_name, - "playbook_length": playbook_length, - "rh_user_org_id": user.org_id, - } - if exception: - event["response"] = ( - { - "exception": str(exception), - }, - ) - send_segment_event(event, "explainPlaybook", user) - - -class Generation(APIView): +class Generation(OurAPIView): """ Returns a playbook based on a text input. """ @@ -859,6 +674,8 @@ class Generation(APIView): from oauth2_provider.contrib.rest_framework import IsAuthenticatedOrTokenHasScope from rest_framework import permissions + serializer_class = GenerationRequestSerializer + schema1_event_class = CodegenPlaybookSchema1Event permission_classes = [ permissions.IsAuthenticated, IsAuthenticatedOrTokenHasScope, @@ -884,85 +701,31 @@ class Generation(APIView): summary="Inline code suggestions", ) def post(self, request) -> Response: - exception = None - generation_id = None - wizard_id = None - duration = None - create_outline = None - anonymized_playbook = "" - playbook = "" - request_serializer = GenerationRequestSerializer(data=request.data) - answer = {} - try: - request_serializer.is_valid(raise_exception=True) - generation_id = str(request_serializer.validated_data.get("generationId", "")) - create_outline = request_serializer.validated_data["createOutline"] - outline = str(request_serializer.validated_data.get("outline", "")) - text = request_serializer.validated_data["text"] - wizard_id = str(request_serializer.validated_data.get("wizardId", "")) - - llm = apps.get_app_config("ai").model_mesh_client - start_time = time.time() - playbook, outline = llm.generate_playbook(request, text, create_outline, outline) - duration = round((time.time() - start_time) * 1000, 2) - - # Anonymize responses - # Anonymized in the View to be consistent with where Completions are anonymized - anonymized_playbook = anonymizer.anonymize_struct( - playbook, value_template=Template("{{ _${variable_name}_ }}") - ) - anonymized_outline = anonymizer.anonymize_struct( - outline, value_template=Template("{{ _${variable_name}_ }}") - ) + generation_id = str(self.validated_data.get("generationId", "")) + create_outline = self.validated_data["createOutline"] + outline = str(self.validated_data.get("outline", "")) + text = self.validated_data["text"] + + llm = apps.get_app_config("ai").model_mesh_client + playbook, outline = llm.generate_playbook(request, text, create_outline, outline) + + # Anonymize responses + # Anonymized in the View to be consistent with where Completions are anonymized + anonymized_playbook = anonymizer.anonymize_struct( + playbook, value_template=Template("{{ _${variable_name}_ }}") + ) + anonymized_outline = anonymizer.anonymize_struct( + outline, value_template=Template("{{ _${variable_name}_ }}") + ) - answer = { - "playbook": anonymized_playbook, - "outline": anonymized_outline, - "format": "plaintext", - "generationId": generation_id, - } - except Exception as exc: - exception = exc - logger.exception(f"An exception {exc.__class__} occurred during a playbook generation") - raise - finally: - self.write_to_segment( - request.user, - generation_id, - wizard_id, - exception, - duration, - create_outline, - playbook_length=len(anonymized_playbook), - ) + answer = { + "playbook": anonymized_playbook, + "outline": anonymized_outline, + "format": "plaintext", + "generationId": generation_id, + } return Response( answer, status=rest_framework_status.HTTP_200_OK, ) - - def write_to_segment( - self, user, generation_id, wizard_id, exception, duration, create_outline, playbook_length - ): - model_name = "" - try: - model_mesh_client = apps.get_app_config("ai").model_mesh_client - model_name = model_mesh_client.get_model_id(user.org_id, "") - except (WcaNoDefaultModelId, WcaModelIdNotFound, WcaSecretManagerError): - pass - event = { - "create_outline": create_outline, - "duration": duration, - "exception": exception is not None, - "generationId": generation_id, - "modelName": model_name, - "playbook_length": playbook_length, - "wizardId": wizard_id, - } - if exception: - event["response"] = ( - { - "exception": str(exception), - }, - ) - send_segment_event(event, "codegenPlaybook", user) diff --git a/ansible_wisdom/main/middleware.py b/ansible_wisdom/main/middleware.py index 3a57f7f59..8a788ba03 100644 --- a/ansible_wisdom/main/middleware.py +++ b/ansible_wisdom/main/middleware.py @@ -33,7 +33,7 @@ ) from ansible_ai_connect.ai.api.utils.segment import send_segment_event from ansible_ai_connect.ai.api.utils.segment_analytics_telemetry import ( - send_segment_analytics_event, + send_segment_analytics_event, # Schema2 ) from ansible_ai_connect.healthcheck.version_info import VersionInfo @@ -66,6 +66,7 @@ def __init__(self, get_response): def __call__(self, request): start_time = time.time() + # Schema2 if settings.SEGMENT_ANALYTICS_WRITE_KEY: if not segment_analytics_telemetry.write_key: segment_analytics_telemetry.write_key = settings.SEGMENT_ANALYTICS_WRITE_KEY @@ -74,6 +75,7 @@ def __call__(self, request): # segment_analytics_telemetry.send = False # for code development only segment_analytics_telemetry.on_error = on_segment_analytics_error + # Schema1 if settings.SEGMENT_WRITE_KEY: if not analytics.write_key: analytics.write_key = settings.SEGMENT_WRITE_KEY @@ -96,6 +98,7 @@ def __call__(self, request): response = self.get_response(request) + # Schema1 if settings.SEGMENT_WRITE_KEY: if request.path == reverse("completions") and request.method == "POST": request_suggestion_id = getattr( @@ -154,6 +157,7 @@ def __call__(self, request): # Collect analytics telemetry, when tasks exist. if len(tasks) > 0: + # Schema2 send_segment_analytics_event( AnalyticsTelemetryEvents.RECOMMENDATION_GENERATED, lambda: AnalyticsRecommendationGenerated(