Realtime #524

Open
alexrudall opened this issue Oct 10, 2024 · 13 comments · May be fixed by #545

Comments

@alexrudall
Owner

alexrudall commented Oct 10, 2024

OpenAI added a realtime API!

How do we add this to ruby-openai?

  • As simple and easy to use as possible
  • Adding as few dependencies as possible
  • As reliable as possible

Options

@danielfriis

danielfriis commented Oct 16, 2024

I did a small PoC a few days ago, connecting Twilio and the Realtime API:

class AnswerCallController < ApplicationController
  skip_before_action :verify_authenticity_token
  require "faye/websocket"
  require "net/http"
  require "eventmachine"

  def incoming_call
    response = Twilio::TwiML::VoiceResponse.new do |r|
      r.say(message: "Connecting to the AI voice assistant...")
      r.connect do |c|
        c.stream(url: "wss://#{request.host_with_port}/media-stream")
      end
    end
    render xml: response.to_s
  end

  def media_stream
    if Faye::WebSocket.websocket?(request.env)
      ws = Faye::WebSocket.new(request.env)
      stream_sid = nil

      ws.on :open do |event|
        puts "Twilio client connected"
        # Connect to OpenAI WebSocket
        openai_ws = Faye::WebSocket::Client.new("wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01", nil, headers: {
          "Authorization" => "Bearer #{Rails.application.credentials.dig(:openai, :api_key)}",
          "OpenAI-Beta" => "realtime=v1"
        })

        openai_ws.on :open do |event|
          puts "Connected to OpenAI Realtime API"
          # Send session update
          session_update = {
            type: "session.update",
            session: {
              turn_detection: { type: "server_vad" },
              input_audio_format: "g711_ulaw",
              output_audio_format: "g711_ulaw",
              voice: "alloy",
              instructions: "You are a helpful and bubbly AI assistant. You are brief and to the point.",
              modalities: [ "text", "audio" ],
              temperature: 0.8
            }
          }
          openai_ws.send(session_update.to_json)
        end

        openai_ws.on :message do |event|
          # Handle incoming messages from OpenAI
          begin
            data = JSON.parse(event.data)
            case data["type"]
            when "response.audio.delta"
              if data["delta"]
                begin
                  # Process audio delta
                  audio_delta = {
                    event: "media",
                    streamSid: stream_sid,
                    media: {
                      payload: data["delta"]
                    }
                  }
                  # Send audio delta to Twilio
                  ws.send(audio_delta.to_json)
                rescue => e
                  puts "Error processing audio delta: #{e.message}"
                end
              end
            when "session.updated"
              puts "Session updated successfully: #{data}"
            when "input_audio_buffer.speech_started"
              puts "Speech Start: #{data['type']}"
              handle_speech_started_event(ws, openai_ws, stream_sid)
            end
          rescue => e
            puts "Error processing OpenAI message: #{e.message}, Raw message: #{event.data}"
          end
        end

        openai_ws.on :close do |event|
          puts "Disconnected from OpenAI Realtime API"
        end

        openai_ws.on :error do |event|
          puts "WebSocket error: #{event.message}"
        end

        # Handle incoming messages from Twilio
        ws.on :message do |event|
          data = JSON.parse(event.data)
          if data["event"] == "media"
            begin
              # Forward media to OpenAI
              audio_append = {
                type: "input_audio_buffer.append",
                audio: data["media"]["payload"]
              }
              openai_ws.send(audio_append.to_json) if openai_ws.ready_state == Faye::WebSocket::OPEN
            rescue => e
              puts "Error processing Twilio audio: #{e.message}"
            end
          elsif data["event"] == "start"
            stream_sid = data["start"]["streamSid"]
            puts "Incoming stream has started: #{stream_sid}"
          end
        end

        ws.on :close do |event|
          puts "Twilio client disconnected"
          openai_ws.close if openai_ws.ready_state == Faye::WebSocket::OPEN
        end
      end

      # Return async Rack response
      ws.rack_response
    else
      # Handle non-WebSocket requests
      render plain: "This endpoint is for WebSocket connections only."
    end
  end

  private

  def handle_speech_started_event(ws, openai_ws, stream_sid)
    if ws.ready_state == Faye::WebSocket::OPEN
      # Send a clear event to Twilio to clear the media buffer
      ws.send({ streamSid: stream_sid, event: "clear" }.to_json)
      puts "Cancelling AI speech from the server"
    end

    if openai_ws.ready_state == Faye::WebSocket::OPEN
      # Send a cancel message to OpenAI to interrupt the AI response
      interrupt_message = { type: "response.cancel" }
      openai_ws.send(interrupt_message.to_json)
    end
  end
end
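The two relay steps in the controller above (Twilio media frame to OpenAI append event, and OpenAI audio delta to Twilio media frame) can be pulled out as pure functions, which makes them easy to test without a live socket. This is only a sketch: the `RealtimeBridge` module and its method names are hypothetical, and unlike the controller it skips audio deltas until the stream SID is known.

```ruby
require "json"

# Hypothetical helper module; not part of ruby-openai or the controller above.
module RealtimeBridge
  module_function

  # Twilio "media" frame -> OpenAI "input_audio_buffer.append" event (JSON string).
  # Returns nil for frames that carry no audio, such as "start".
  def twilio_to_openai(raw)
    data = JSON.parse(raw)
    return nil unless data["event"] == "media"

    { type: "input_audio_buffer.append", audio: data.dig("media", "payload") }.to_json
  end

  # OpenAI "response.audio.delta" event -> Twilio "media" frame (JSON string).
  # Returns nil for other event types, or when the stream SID is not yet known
  # (an assumption of this sketch; the controller sends regardless).
  def openai_to_twilio(raw, stream_sid)
    data = JSON.parse(raw)
    return nil unless data["type"] == "response.audio.delta" && data["delta"] && stream_sid

    { event: "media", streamSid: stream_sid, media: { payload: data["delta"] } }.to_json
  end
end
```

In the controller, each `on :message` handler would then reduce to calling one of these and sending the result if it is not nil.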

@alexrudall
Owner Author

Thanks so much for sharing, @danielfriis! Super helpful.

@michelson

A simple example with Async WebSocket:

  def ws_client
    require "async"
    require "async/http"
    require "async/websocket"

    url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"

    # Creating headers for the request
    headers = {
      "Authorization" => "Bearer #{ENV.fetch('OPENAI_API_KEY', nil)}",
      "OpenAI-Beta" => "realtime=v1"
    }

    Async do |task|
      endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names)

      Async::WebSocket::Client.connect(endpoint, headers: headers) do |connection|
        input_task = task.async do
          while line = $stdin.gets

            text = {
              type: "response.create",
              response: {
                modalities: ["text"],
                instructions: "Please assist the user."
              }
            }
            message = Protocol::WebSocket::TextMessage.generate(text) # ({ text: line })
            message.send(connection)
            connection.flush
          end
        end

        puts "Connected..."
        while message = connection.read
          puts "> #{message.to_h}"
        end
      ensure
        input_task&.stop
      end
    end
  end
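In the loop above, each line typed on stdin triggers a `response.create`, but the text itself is never sent. A minimal sketch of how the typed line could be included, assuming the beta Realtime event shapes (`conversation.item.create` with an `input_text` content part, followed by `response.create`); the helper name `text_turn_events` is hypothetical:

```ruby
require "json"

# Hypothetical helper: turns one line of user text into the two client events
# that add it to the conversation and then ask the model to respond.
def text_turn_events(line)
  [
    {
      type: "conversation.item.create",
      item: {
        type: "message",
        role: "user",
        content: [{ type: "input_text", text: line.chomp }]
      }
    },
    { type: "response.create", response: { modalities: ["text"] } }
  ]
end
```

Each returned hash could be sent the same way as in the example, via `Protocol::WebSocket::TextMessage.generate(event).send(connection)` followed by `connection.flush`.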

@franpb14

franpb14 commented Nov 1, 2024

Hello! I'm interested in opening a PR. I have been working with the API using Ruby on Rails. What I have done (briefly):

  1. I created a simple service that, using faye/websocket and eventmachine, connects to OpenAI's WebSocket and keeps it alive.
  2. I connected this WebSocket to the front end using Action Cable.
  3. With JS, I sent the audio and handled the messages from OpenAI.

My proposal:

Maybe the solution in this gem should be something like the first point, right? More or less, it should have these functions:
client.real_time.connect
client.real_time.onmessage(block) => where the developer will pass how to handle the responses
client.real_time.session_update({ hash_with_options })
client.real_time.append_input_audio_buffer(Base64EncodedAudioData)
client.real_time.commit_input_audio_buffer
client.real_time.clear_input_audio_buffer
client.real_time.create_conversation({ hash_with_options })
client.real_time.truncate_conversation({ hash_with_options })
client.real_time.delete_conversation(item_id)
client.real_time.create_response({ hash_with_options })

Maybe we could have a client.realtime.event instead of a function for each event. Let me know what you think. I'm really looking forward to being able to contribute to this project.
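To make the trade-off concrete, here is a rough sketch of the "one generic event method" idea with a few named helpers layered on top. It is not the ruby-openai API and not what the linked PR implements: the `RealtimeSession` class, its method names, and the injected socket object are all hypothetical. The socket only needs to respond to `send(string)`, as a `Faye::WebSocket::Client` does.

```ruby
require "json"

# Hypothetical sketch of a thin wrapper around an already-open WebSocket.
class RealtimeSession
  def initialize(socket)
    @socket = socket # any object with #send(string)
    @handlers = []
  end

  # Register a block that receives each parsed server event (a Hash).
  def on_message(&block)
    @handlers << block
    self
  end

  # Generic sender: every client event is a JSON object with a "type" key.
  def event(type, **payload)
    @socket.send(payload.merge(type: type).to_json)
  end

  # Named helpers are one-liners on top of #event.
  def session_update(**options)
    event("session.update", session: options)
  end

  def append_input_audio_buffer(base64_audio)
    event("input_audio_buffer.append", audio: base64_audio)
  end

  def commit_input_audio_buffer
    event("input_audio_buffer.commit")
  end

  # The transport layer would call this for each raw message it receives.
  def dispatch(raw)
    data = JSON.parse(raw)
    @handlers.each { |handler| handler.call(data) }
  end
end
```

With this shape, new event types work through `event` immediately, and named helpers can be added only where they improve readability.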

@franpb14 franpb14 linked a pull request Nov 4, 2024 that will close this issue
3 tasks
@franpb14

franpb14 commented Nov 4, 2024

Hello! I wanted to try this out a bit, so I opened a small PR. Perhaps we can iterate on it 😄

@tbcooney

Likewise, thanks for sharing this example @danielfriis! Have you had any success using this approach to invoke an outbound call and stream from a worker?

@danielfriis

danielfriis commented Nov 29, 2024

@tbcooney Yes!

See below.

I renamed the incoming_call method in my previous example to connect_call and reused that for both outgoing and incoming calls.

Outgoing calls are then initiated with the initiate_outgoing_call method below.

def initiate_outgoing_call
  account_sid = Rails.application.credentials.dig(:twilio, :account_sid)
  auth_token = Rails.application.credentials.dig(:twilio, :auth_token)
  client = Twilio::REST::Client.new(account_sid, auth_token)

  # Twilio requests this URL once the call connects and expects TwiML back
  call = client.calls.create(
    method: "POST",
    url: "http://#{request.host_with_port}/connect-call",
    to: "<NUMBER>",          # placeholder: destination phone number
    from: "<TWILIO NUMBER>"  # placeholder: your Twilio phone number
  )

  render json: { message: "Call initiated", sid: call.sid }
rescue Twilio::REST::TwilioError => e
  render json: { error: e.message }, status: :unprocessable_entity
end

# Shared by incoming and outgoing calls: points the call audio at /media-stream
def connect_call
  response = Twilio::TwiML::VoiceResponse.new do |r|
    r.say(message: "Connecting to the AI voice assistant...")
    r.connect do |c|
      c.stream(url: "wss://#{request.host_with_port}/media-stream")
    end
  end
  render xml: response.to_s
end

@tbcooney

tbcooney commented Dec 1, 2024

image

@drnic
Contributor

drnic commented Dec 5, 2024

@jon-sully

I've been mulling this over pretty intensely since Realtime was released, and @tbcooney and I actually put our heads together a couple of times to think through how to integrate Rails with Realtime.

First, the code samples here are fantastic and such a big help. Thank you all so much for sharing!

Second, given my business and domain, I'm primarily interested in the two-way connection between Twilio and Realtime to facilitate phone calls with AI and/or humans+AI (e.g. 3 way calls), but one thing at a time.

Third, I'm highly infrastructure and scalability conscious. I like to push long-running things into background jobs as much as possible.

The main hold-up I've been mulling over for the last month or so is about system coordination and scaling. If a Rails application simply wants to connect to the Realtime API and stream audio back and forth, doing this from a background job is fine. OpenAI wants its customers to set up a WebSocket consumer to connect to its WSS endpoint. That's a simple task to do from a background job and, as long as you're running some kind of event machine to wait until the connection is dead (and handle all the events in the middle, like "new audio arrived" and "sending audio back"), no problem.

The challenge arises on the Twilio side of things. Twilio requires that customers run the WSS endpoint, and Twilio sets up a consumer to connect to that endpoint. This is the exact opposite of OpenAI's setup, and it poses a challenge to the idea of "doing everything in a (long-running) background job". Ideally, I'd have wanted each phone call to be represented by a single background job that runs until the phone call completes. But a background job, on Heroku at least, cannot be network-addressable (and even if it were, Sidekiq doesn't include any facilities for this sort of thing, and I'd need to spin up my own little WSS server in each job), so this fundamentally will not work.

After beating my head against that reality for a while, I've come to what I believe is the best choice. Twilio essentially requires us to run web endpoints (@danielfriis' example above (def media_stream...)) for receiving the Twilio audio stream via WebSockets, wherein, as part of that endpoint's processing, we set up the consumer WSS client over to OpenAI. This is all fine and does work.

Where this raised an eyebrow for me was around capacity and scaling: we work with finite resources! Each call we coordinate from Twilio this way will essentially set up a new background thread on our web server, independent of Puma's typical request threads, which will do work (pushing audio packets back and forth between the streams, perhaps writing logs along the way, perhaps tracking Rails model objects etc., and maybe even needing to transcode audio packets?) totally outside of the actual web requests that are still coming in. Now imagine you have fifty phone calls going at once. That could spell a real issue for your web containers / dynos!

In addition to saturating resources, I'm not sure how autoscaling would even work with this setup. In general, the best way to autoscale Rails is by queue time (with a tool like Judoscale), but what happens when your web dyno is bogging down with saturated CPU... and there are no requests to get a request queue time from?! 😂 That example might be a little dramatic. I don't expect that a web dyno that's coordinating phone calls would get no web requests, but I'm just not sure what to expect with this architecture. The Puma web threads may not be busy and may be able to take on new requests (which means 0 queue time), but I guess the CPU would constantly be context switching to those background call threads, meaning our Puma threads might just ultimately take longer to process web requests. So maybe we see our overall response time rise while our queue time actually stays at zero? I'm not sure. I don't want to find out 😆

My working idea is to spin up a second 'prod' app on a subdomain (or otherwise) which exists solely for these phone call threads. I rarely reach for the "run a second prod app instance" lever, but it feels like it might be the best option in this particular situation. Especially since Twilio's HTTP POST for incoming_call can hit the main app and the wss:// address we pass back can actually hit the clone app. How the clone app would scale, I'm still not sure.
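As a rough illustration of that split, the main app could answer Twilio's webhook with TwiML whose stream URL names the second app's host. This sketch builds the TwiML by hand with the standard library only; the function name, the `calls.example.com` host, and the `CALL_STREAM_HOST` environment variable are all hypothetical.

```ruby
require "cgi"

# Hypothetical helper for the main app: returns TwiML that tells Twilio to open
# its media stream WebSocket against a separate host (the "clone" app).
def connect_call_twiml(stream_host)
  url = "wss://#{stream_host}/media-stream"
  <<~XML
    <?xml version="1.0" encoding="UTF-8"?>
    <Response>
      <Say>Connecting to the AI voice assistant...</Say>
      <Connect>
        <Stream url="#{CGI.escapeHTML(url)}"/>
      </Connect>
    </Response>
  XML
end
```

In a controller this might be used as `render xml: connect_call_twiml(ENV.fetch("CALL_STREAM_HOST", "calls.example.com"))`, so only the clone app ever holds the long-lived sockets.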

@michelson

@jon-sully I think putting the calls in a job queue is not the best approach, as you will hit the queue concurrency limit very quickly. For instance, if you have concurrency set to five, then you can only handle five concurrent calls.

I like the approach of using WebSocket connections and maybe offloading time-consuming things, like requests to RAG, with the async gem. Also, I wouldn't use threads, as threads have a higher memory cost; we could implement this with fibers (with the async gem).

In terms of scalability, there is a nice article from Evil Martians that implements AnyCable (Go middleware to activejob), with an example repo:

https://github.com/anycable/twilio-ai-demo

@jon-sully

Agreed, calls will absolutely saturate background job concurrency for a single instance, but I tend to think of background worker instances / containers as more easily scalable than web containers, if for no other reason than that by wrapping calls in jobs we'd have a stable metric for when to scale up: job queue time. I don't think we have a parallel for web dynos where a call is passed off to a background thread (outside of Puma); all we'd be able to observe there is CPU saturation (I think?)

Personally I'd like to avoid running AnyCable. I've read up on it previously and it's a neat tool but it's a much larger endeavor and I don't want to run / maintain a whole second application (in Go, no less) just to coordinate my Realtime stuff going through my Rails apps. Additionally I think AnyCable may be susceptible to the same challenges at a different layer.

@jon-sully

I don't think threads vs. fibers matters much for the discussion around how to scale. Both will inevitably free the Puma thread back to Puma, meaning that we've got a headless background thread (or fiber) that uses system resources but isn't represented in any kind of system health metric (other than, again, maybe, CPU saturation).
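One possible way to give those headless call threads or fibers some visibility is a process-wide count of active calls, incremented in `ws.on :open` and decremented in `ws.on :close`. This is only a sketch of that idea: the `ActiveCallGauge` class is hypothetical, and whether an autoscaler could usefully consume such a number is an open question, not something established in this thread.

```ruby
# Hypothetical in-process gauge of calls currently being relayed.
# Thread-safe, so it can be updated from whichever thread runs the socket callbacks.
class ActiveCallGauge
  def initialize
    @mutex = Mutex.new
    @count = 0
  end

  # Call when a Twilio media stream opens.
  def call_started
    @mutex.synchronize { @count += 1 }
  end

  # Call when the stream closes; never drops below zero.
  def call_ended
    @mutex.synchronize { @count = [@count - 1, 0].max }
  end

  # Current number of active calls, e.g. for a health endpoint or log line.
  def count
    @mutex.synchronize { @count }
  end
end
```

A single shared instance (for example a constant set in an initializer) could then be read by whatever reports metrics for the process.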
