Skip to content

Commit

Permalink
Finished functionality to add wrkspace system prompt
Browse files Browse the repository at this point in the history
  • Loading branch information
aponcedeleonch committed Jan 20, 2025
1 parent a7cd9de commit b6f255c
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 75 deletions.
Binary file modified codegate_volume/models/all-minilm-L6-v2-q5_k_m.gguf
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
Create Date: 2025-01-17 16:33:58.464223
"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'a692c8b52308'
down_revision: Union[str, None] = '5c2f3eee5f90'
revision: str = "a692c8b52308"
down_revision: Union[str, None] = "5c2f3eee5f90"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

Expand Down
9 changes: 6 additions & 3 deletions src/codegate/api/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ async def create_workspace(request: v1_models.CreateWorkspaceRequest):
except AlreadyExistsError:
raise HTTPException(status_code=409, detail="Workspace already exists")
except ValidationError:
raise HTTPException(status_code=400,
detail=("Invalid workspace name. "
"Please use only alphanumeric characters and dashes"))
raise HTTPException(
status_code=400,
detail=(
"Invalid workspace name. " "Please use only alphanumeric characters and dashes"
),
)
except Exception:
raise HTTPException(status_code=500, detail="Internal server error")

Expand Down
7 changes: 5 additions & 2 deletions src/codegate/db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
alert_queue = asyncio.Queue()
fim_cache = FimCache()


class AlreadyExistsError(Exception):
pass


class DbCodeGate:
_instance = None

Expand Down Expand Up @@ -266,7 +268,8 @@ async def add_workspace(self, workspace_name: str) -> Optional[Workspace]:

try:
added_workspace = await self._execute_update_pydantic_model(
workspace, sql, should_raise=True)
workspace, sql, should_raise=True
)
except IntegrityError as e:
logger.debug(f"Exception type: {type(e)}")
raise AlreadyExistsError(f"Workspace {workspace_name} already exists.")
Expand Down Expand Up @@ -424,7 +427,7 @@ async def get_active_workspace(self) -> Optional[ActiveWorkspace]:
sql = text(
"""
SELECT
w.id, w.name, s.id as session_id, s.last_update
w.id, w.name, w.system_prompt, s.id as session_id, s.last_update
FROM sessions s
INNER JOIN workspaces w ON w.id = s.active_workspace_id
"""
Expand Down
6 changes: 1 addition & 5 deletions src/codegate/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ class Workspace(BaseModel):
def validate_name(cls, value):
if not re.match(r"^[a-zA-Z0-9_-]+$", value):
raise ValueError("name must be alphanumeric and can only contain _ and -")
# Avoid workspace names that are the same as commands that way we can do stuff like
# `codegate workspace list` and
# `codegate workspace my-ws system-prompt` without any conflicts
elif value in ["list", "add", "activate", "system-prompt"]:
raise ValueError("name cannot be the same as a command")
return value


Expand Down Expand Up @@ -104,5 +99,6 @@ class WorkspaceActive(BaseModel):
class ActiveWorkspace(BaseModel):
id: str
name: str
system_prompt: Optional[str]
session_id: str
last_update: datetime.datetime
72 changes: 45 additions & 27 deletions src/codegate/pipeline/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ async def run(self, args: List[str]) -> str:
@property
def help(self) -> str:
return (
"### CodeGate Version\n\n"
"### CodeGate Version\n"
"Prints the version of CodeGate.\n\n"
"*args*: None\n\n"
"**Usage**: `codegate version`\n\n"
"*args*: None"
)


Expand All @@ -46,6 +46,7 @@ def __init__(self):
"list": self._list_workspaces,
"add": self._add_workspace,
"activate": self._activate_workspace,
"system-prompt": self._add_system_prompt,
}

async def _list_workspaces(self, *args: List[str]) -> str:
Expand All @@ -66,52 +67,63 @@ async def _add_workspace(self, args: List[str]) -> str:
Add a workspace
"""
if args is None or len(args) == 0:
return "Please provide a name. Use `codegate workspace add your_workspace_name`"
return "Please provide a name. Use `codegate workspace add <workspace_name>`"

new_workspace_name = args[0]
if not new_workspace_name:
return "Please provide a name. Use `codegate workspace add your_workspace_name`"
return "Please provide a name. Use `codegate workspace add <workspace_name>`"

try:
_ = await self.workspace_crud.add_workspace(new_workspace_name)
except ValidationError:
return "Invalid workspace name: It should be alphanumeric and dashes"
except AlreadyExistsError:
return f"Workspace **{new_workspace_name}** already exists"
return f"Workspace `{new_workspace_name}` already exists"
except Exception:
return "An error occurred while adding the workspace"

return f"Workspace **{new_workspace_name}** has been added"
return f"Workspace `{new_workspace_name}` has been added"

async def _activate_workspace(self, args: List[str]) -> str:
"""
Activate a workspace
"""
if args is None or len(args) == 0:
return "Please provide a name. Use `codegate workspace activate workspace_name`"
return "Please provide a name. Use `codegate workspace activate <workspace_name>`"

workspace_name = args[0]
if not workspace_name:
return "Please provide a name. Use `codegate workspace activate workspace_name`"
return "Please provide a name. Use `codegate workspace activate <workspace_name>`"

was_activated = await self.workspace_crud.activate_workspace(workspace_name)
if not was_activated:
return (
f"Workspace **{workspace_name}** does not exist or was already active. "
f"Workspace `{workspace_name}` does not exist or was already active. "
f"Use `codegate workspace add {workspace_name}` to add it"
)
return f"Workspace **{workspace_name}** has been activated"
return f"Workspace `{workspace_name}` has been activated"

async def _add_system_prompt(self, workspace_name: str, sys_prompt_lst: List[str]):
updated_worksapce = await self.workspace_crud.update_workspace_system_prompt(workspace_name, sys_prompt_lst)
async def _add_system_prompt(self, args: List[str]):
if len(args) < 2:
return (
"Please provide a workspace name and a system prompt. "
"Use `codegate workspace system-prompt <workspace_name> <system_prompt>`"
)

workspace_name = args[0]
sys_prompt_lst = args[1:]

updated_worksapce = await self.workspace_crud.update_workspace_system_prompt(
workspace_name, sys_prompt_lst
)
if not updated_worksapce:
return (
f"Workspace system prompt not updated. "
f"Check if the workspace **{workspace_name}** exists"
f"Check if the workspace `{workspace_name}` exists"
)
return (
f"Workspace **{updated_worksapce.name}** system prompt "
f"updated to:\n\n```{updated_worksapce.system_prompt}```"
f"Workspace `{updated_worksapce.name}` system prompt "
f"updated to:\n```\n{updated_worksapce.system_prompt}\n```"
)

async def run(self, args: List[str]) -> str:
Expand All @@ -122,23 +134,29 @@ async def run(self, args: List[str]) -> str:
if command_to_execute is not None:
return await command_to_execute(args[1:])
else:
if len(args) >= 2 and args[1] == "system-prompt":
return await self._add_system_prompt(args[0], args[2:])
return "Command not found. Use `codegate workspace -h` to see available commands"

@property
def help(self) -> str:
return (
"### CodeGate Workspace\n\n"
"### CodeGate Workspace\n"
"Manage workspaces.\n\n"
"**Usage**: `codegate workspace <command> [args]`\n\n"
"Available commands:\n\n"
"- `list`: List all workspaces\n\n"
" - *args*: None\n\n"
"- `add`: Add a workspace\n\n"
" - *args*:\n\n"
" - `workspace_name`\n\n"
"- `activate`: Activate a workspace\n\n"
" - *args*:\n\n"
" - `workspace_name`"
"Available commands:\n"
"- `list`: List all workspaces\n"
" - *args*: None\n"
" - **Usage**: `codegate workspace list`\n"
"- `add`: Add a workspace\n"
" - *args*:\n"
" - `workspace_name`\n"
" - **Usage**: `codegate workspace add <workspace_name>`\n"
"- `activate`: Activate a workspace\n"
" - *args*:\n"
" - `workspace_name`\n"
" - **Usage**: `codegate workspace activate <workspace_name>`\n"
"- `system-prompt`: Modify the system-prompt of a workspace\n"
" - *args*:\n"
" - `workspace_name`\n"
" - `system_prompt`\n"
" - **Usage**: `codegate workspace system-prompt <workspace_name> <system_prompt>`\n"
)
78 changes: 60 additions & 18 deletions src/codegate/pipeline/system_prompt/codegate.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import json
from typing import Optional

from litellm import ChatCompletionRequest, ChatCompletionSystemMessage

Expand All @@ -7,6 +7,7 @@
PipelineResult,
PipelineStep,
)
from codegate.workspaces.crud import WorkspaceCrud


class SystemPrompt(PipelineStep):
Expand All @@ -16,7 +17,7 @@ class SystemPrompt(PipelineStep):
"""

def __init__(self, system_prompt: str):
self._system_message = ChatCompletionSystemMessage(content=system_prompt, role="system")
self.codegate_system_prompt = system_prompt

@property
def name(self) -> str:
Expand All @@ -25,6 +26,44 @@ def name(self) -> str:
"""
return "system-prompt"

async def _get_workspace_system_prompt(self) -> str:
wksp_crud = WorkspaceCrud()
workspace = await wksp_crud.get_active_workspace()
if not workspace:
return ""

return workspace.system_prompt

async def _construct_system_prompt(
self,
wrksp_sys_prompt: str,
req_sys_prompt: Optional[str],
should_add_codegate_sys_prompt: bool,
) -> ChatCompletionSystemMessage:

def _start_or_append(existing_prompt: str, new_prompt: str) -> str:
if existing_prompt:
return existing_prompt + "\n\nHere are additional instructions:\n\n" + new_prompt
return new_prompt

system_prompt = ""
# Add codegate system prompt if secrets or bad packages are found at the beginning
if should_add_codegate_sys_prompt:
system_prompt = _start_or_append(system_prompt, self.codegate_system_prompt)

# Add workspace system prompt if present
if wrksp_sys_prompt:
system_prompt = _start_or_append(system_prompt, wrksp_sys_prompt)

# Add request system prompt if present
if req_sys_prompt and "codegate" not in req_sys_prompt.lower():
system_prompt = _start_or_append(system_prompt, req_sys_prompt)

return system_prompt

async def _should_add_codegate_system_prompt(self, context: PipelineContext) -> bool:
return context.secrets_found or context.bad_packages_found

async def process(
self, request: ChatCompletionRequest, context: PipelineContext
) -> PipelineResult:
Expand All @@ -33,32 +72,35 @@ async def process(
to the existing system prompt
"""

# Nothing to do if no secrets or bad_packages are found
if not (context.secrets_found or context.bad_packages_found):
wrksp_sys_prompt = await self._get_workspace_system_prompt()
should_add_codegate_sys_prompt = await self._should_add_codegate_system_prompt(context)

# Nothing to do if no secrets or bad_packages are found and we don't have a workspace
# system prompt
if not should_add_codegate_sys_prompt and not wrksp_sys_prompt:
return PipelineResult(request=request, context=context)

new_request = request.copy()

if "messages" not in new_request:
new_request["messages"] = []

request_system_message = None
request_system_message = {}
for message in new_request["messages"]:
if message["role"] == "system":
request_system_message = message
req_sys_prompt = request_system_message.get("content")

if request_system_message is None:
# Add system message
context.add_alert(self.name, trigger_string=json.dumps(self._system_message))
new_request["messages"].insert(0, self._system_message)
elif "codegate" not in request_system_message["content"].lower():
# Prepend to the system message
prepended_message = (
self._system_message["content"]
+ "\n Here are additional instructions. \n "
+ request_system_message["content"]
)
context.add_alert(self.name, trigger_string=prepended_message)
request_system_message["content"] = prepended_message
system_prompt = await self._construct_system_prompt(
wrksp_sys_prompt, req_sys_prompt, should_add_codegate_sys_prompt
)
context.add_alert(self.name, trigger_string=system_prompt)
if not request_system_message:
# Insert the system prompt at the beginning of the messages
sytem_message = ChatCompletionSystemMessage(content=system_prompt, role="system")
new_request["messages"].insert(0, sytem_message)
else:
# Update the existing system prompt
request_system_message["content"] = system_prompt

return PipelineResult(request=new_request, context=context)
7 changes: 4 additions & 3 deletions src/codegate/workspaces/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
class WorkspaceCrudError(Exception):
pass


class WorkspaceCrud:

def __init__(self):
Expand All @@ -24,7 +25,7 @@ async def add_workspace(self, new_workspace_name: str) -> Workspace:
workspace_created = await db_recorder.add_workspace(new_workspace_name)
return workspace_created

async def get_workspaces(self)-> List[WorkspaceActive]:
async def get_workspaces(self) -> List[WorkspaceActive]:
"""
Get all workspaces
"""
Expand Down Expand Up @@ -79,8 +80,8 @@ async def activate_workspace(self, workspace_name: str) -> bool:
return True

async def update_workspace_system_prompt(
self, workspace_name: str, sys_prompt_lst: List[str]
) -> Optional[Workspace]:
self, workspace_name: str, sys_prompt_lst: List[str]
) -> Optional[Workspace]:
selected_workspace = await self._db_reader.get_workspace_by_name(workspace_name)
if not selected_workspace:
return None
Expand Down
Loading

0 comments on commit b6f255c

Please sign in to comment.