Skip to content

Commit

Permalink
Support queuing messages in chat
Browse files Browse the repository at this point in the history
  • Loading branch information
michalwarda committed Feb 24, 2024
1 parent 20f1c88 commit aa6e267
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 3 deletions.
24 changes: 21 additions & 3 deletions apps/api/lib/buildel/blocks/chat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ defmodule Buildel.Blocks.Chat do
GenServer.call(pid, {:send_message, message}, 5 * 60_000)
end

defp save_input_and_send_message(pid, {topic, :text, message}) do
%{block: block, io: output} = Buildel.BlockPubSub.io_from_topic(topic)

GenServer.cast(
pid,
{:save_input, %{block_name: block, message: {:text, message}, output_name: output}}
)

GenServer.cast(pid, {:send_message, {:text, message}})
end

defp save_text_chunk(pid, chunk) do
GenServer.cast(pid, {:save_text_chunk, chunk})
end
Expand Down Expand Up @@ -225,6 +236,12 @@ defmodule Buildel.Blocks.Chat do
state
|> assign_stream_state
|> assign_take_latest()
|> Map.put(
:input_queue,
Buildel.Blocks.Utils.InputQueue.new([], fn input ->
save_input_and_send_message(self(), input)
end)
)
|> Map.put(:prompt_template, opts.prompt_template)
|> Map.put(:api_key, api_key)
|> Map.put(:tool_connections, tool_connections)
Expand Down Expand Up @@ -306,11 +323,13 @@ defmodule Buildel.Blocks.Chat do
@impl true
def handle_cast({:finish_chat_message}, %{chat_memory: %ChatMemory{type: :off}} = state) do
state = update_in(state.chat_memory, &ChatMemory.reset(&1))
state = update_in(state.input_queue, &Buildel.Blocks.Utils.InputQueue.pop(&1))
{:noreply, state |> send_stream_stop()}
end

@impl true
def handle_cast({:finish_chat_message}, state) do
state = update_in(state.input_queue, &Buildel.Blocks.Utils.InputQueue.pop(&1))
{:noreply, state |> send_stream_stop()}
end

Expand Down Expand Up @@ -461,9 +480,8 @@ defmodule Buildel.Blocks.Chat do
end

@impl true
def handle_info({name, :text, message}, state) do
state = save_latest_input_value(state, name, message)
send_message(self(), {:text, message})
def handle_info({_name, :text, _message} = info, state) do
state = update_in(state.input_queue, &Buildel.Blocks.Utils.InputQueue.push(&1, info))
{:noreply, state}
end

Expand Down
54 changes: 54 additions & 0 deletions apps/api/lib/buildel/blocks/utils/input_queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Buildel.Blocks.Utils.InputQueue do
defstruct [:queue, :process_item]
alias __MODULE__

@type item :: any()
@type queue :: [item()]
@type process_item :: (item() -> any())
@type t :: %InputQueue{queue: queue(), process_item: process_item()}

@spec new(queue(), process_item()) :: t()
def new(items, process_item) do
%__MODULE__{queue: items, process_item: process_item}
end

@spec push(t(), item()) :: t()
def push(%__MODULE__{queue: [], process_item: process_item}, item) do
IO.puts("pushing first item")
process_item.(item)

%__MODULE__{queue: [item], process_item: process_item}
end

def push(%__MODULE__{queue: queue, process_item: process_item}, item) do
IO.puts("pushing another item")

%__MODULE__{queue: queue ++ [item], process_item: process_item}
end

@spec pop(t()) :: t()
def pop(%__MODULE__{queue: [], process_item: process_item}) do
IO.puts("removing from empty queue")
%__MODULE__{queue: [], process_item: process_item}
end

@spec pop(t()) :: t()
def pop(%__MODULE__{queue: [_item], process_item: process_item}) do
IO.puts("removing last item")
%__MODULE__{queue: [], process_item: process_item}
end

def pop(%__MODULE__{queue: [_item | [item]], process_item: process_item}) do
IO.puts("removing another item")
process_item.(item)

%__MODULE__{queue: [item], process_item: process_item}
end

def pop(%__MODULE__{queue: [_item | [item | rest]], process_item: process_item}) do
IO.puts("removing another item from queue")
process_item.(item)

%__MODULE__{queue: rest, process_item: process_item}
end
end

0 comments on commit aa6e267

Please sign in to comment.