# Pull Request Template

## Description

Please include a summary of the change and issue(s) fixed. Also, mention relevant motivation, context, and any dependencies that this change requires.

Fixes: The LLM call was wrapped in a transaction. This is an anti-pattern and caused idle connections, which PG eventually terminated with `PQconsumeInput() FATAL: terminating connection due to idle-in-transaction timeout`.

This resulted in activity messages being missing in some conversations on captain handoff, failures queueing up for retry, and captain responding long after the conversation was marked open/snoozed.

## Type of change

- [x] Bug fix (non-breaking change which fixes an issue)

## How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration.

Tested locally and with specs.

## Checklist:

- [x] My code follows the style guidelines of this project
- [x] I have performed a self-review of my code
- [x] I have commented on my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] Any dependent changes have been merged and published in downstream modules

---------

Co-authored-by: Muhsin Keloth <muhsinkeramam@gmail.com>
148 lines
4.1 KiB
Ruby
148 lines
4.1 KiB
Ruby
# Builds Captain's reply for a conversation: asks the assistant service for a
# response, then either posts it as an outgoing message or hands the
# conversation off.
#
# The response is generated *before* the database transaction is opened, so
# no transaction is held open while waiting on the LLM call.
class Captain::Conversation::ResponseBuilderJob < ApplicationJob
  MAX_MESSAGE_LENGTH = 10_000

  retry_on ActiveStorage::FileNotFoundError, attempts: 3, wait: 2.seconds
  retry_on Faraday::BadRequestError, attempts: 3, wait: 2.seconds

  # Generates and processes a response for the given conversation.
  #
  # @param conversation the conversation Captain should respond to
  # @param assistant the assistant producing the response; it is also recorded
  #   as Current.executed_by for the duration of the job
  def perform(conversation, assistant)
    @conversation = conversation
    @inbox = conversation.inbox
    @assistant = assistant

    Current.executed_by = @assistant

    captain_v2_enabled? ? generate_response_with_v2 : generate_and_process_response
  rescue ActiveStorage::FileNotFoundError, Faraday::BadRequestError
    # Re-raise unchanged so the retry_on declarations above can handle them.
    raise
  rescue StandardError => e
    handle_error(e)
  ensure
    Current.executed_by = nil
  end

  private

  delegate :account, :inbox, to: :@conversation

  # Fetches a response from the assistant chat service, then processes it.
  def generate_and_process_response
    chat_service = Captain::Llm::AssistantChatService.new(
      assistant: @assistant,
      conversation_id: @conversation.display_id
    )
    @response = chat_service.generate_response(message_history: collect_previous_messages)
    process_response
  end

  # Fetches a response from the agent runner service (used when the
  # captain_integration_v2 feature is enabled), then processes it.
  def generate_response_with_v2
    agent_runner = Captain::Assistant::AgentRunnerService.new(
      assistant: @assistant,
      conversation: @conversation
    )
    @response = agent_runner.generate_response(message_history: collect_previous_messages)
    process_response
  end

  # Applies the already-generated response inside a single transaction:
  # either a handoff, or a reply message plus a response-usage increment.
  def process_response
    ActiveRecord::Base.transaction do
      next process_action('handoff') if handoff_requested?

      create_messages
      Rails.logger.info("[CAPTAIN][ResponseBuilderJob] Incrementing response usage for #{account.id}")
      account.increment_response_usage
    end
  end

  # Builds the message history passed to the response service from the
  # conversation's non-private incoming and outgoing messages.
  #
  # @return [Array<Hash>] one hash per message with :content and :role, plus
  #   :agent_name when the message's additional_attributes carry one
  def collect_previous_messages
    history_scope = @conversation.messages.where(message_type: %i[incoming outgoing], private: false)

    history_scope.map do |msg|
      entry = {
        content: prepare_multimodal_message_content(msg),
        role: determine_role(msg)
      }

      # Include agent_name if present in additional_attributes
      agent_name = msg.additional_attributes&.dig('agent_name')
      entry[:agent_name] = agent_name if agent_name.present?

      entry
    end
  end

  # Maps a message to its chat role: 'user' for incoming messages,
  # 'assistant' for everything else.
  def determine_role(message)
    return 'user' if message.message_type == 'incoming'

    'assistant'
  end

  # Delegates content generation for a message to the OpenAI message builder.
  def prepare_multimodal_message_content(message)
    builder = Captain::OpenAiMessageBuilderService.new(message: message)
    builder.generate_content
  end

  # True when the service answered with the 'conversation_handoff' sentinel.
  def handoff_requested?
    @response['response'] == 'conversation_handoff'
  end

  # Runs the named action. Only 'handoff' is recognised; any other value is a
  # no-op that returns nil.
  def process_action(action)
    return unless action == 'handoff'

    I18n.with_locale(@assistant.account.locale) do
      create_handoff_message
      @conversation.bot_handoff!
      send_out_of_office_message_if_applicable
    end
  end

  def send_out_of_office_message_if_applicable
    # Campaign conversations should never receive OOO templates — the campaign itself
    # serves as the initial outreach, and OOO would be confusing in that context.
    ::MessageTemplates::Template::OutOfOffice.perform_if_applicable(@conversation) unless @conversation.campaign.present?
  end

  # Posts the handoff notice, preferring the assistant's configured
  # 'handoff_message' and falling back to the translated default.
  def create_handoff_message
    handoff_text = @assistant.config['handoff_message'].presence || I18n.t('conversations.captain.handoff')
    create_outgoing_message(handoff_text)
  end

  # Validates the generated reply and posts it as an outgoing message.
  def create_messages
    reply = @response['response']
    validate_message_content!(reply)
    create_outgoing_message(reply, agent_name: @response['agent_name'])
  end

  # @raise [ArgumentError] when the content is blank
  def validate_message_content!(content)
    return if content.present?

    raise ArgumentError, 'Message content cannot be blank'
  end

  # Creates an outgoing message on the conversation, sent by the assistant.
  #
  # @param message_content the message body
  # @param agent_name optional agent name, stored in additional_attributes
  #   only when present
  def create_outgoing_message(message_content, agent_name: nil)
    additional_attrs = agent_name.present? ? { agent_name: agent_name } : {}

    @conversation.messages.create!(
      message_type: :outgoing,
      account_id: account.id,
      inbox_id: inbox.id,
      sender: @assistant,
      content: message_content,
      additional_attributes: additional_attrs
    )
  end

  # Reports the error and falls back to handing the conversation off.
  #
  # @return [true]
  def handle_error(error)
    log_error(error)
    process_action('handoff')
    true
  end

  # Sends the error to the exception tracker, tagged with the account.
  def log_error(error)
    tracker = ChatwootExceptionTracker.new(error, account: account)
    tracker.capture_exception
  end

  # Whether the account has the 'captain_integration_v2' feature enabled.
  def captain_v2_enabled?
    account.feature_enabled?('captain_integration_v2')
  end
end
|