From 96cf48d893cb5aedc5ab88ae04a2ff5ba09b2959 Mon Sep 17 00:00:00 2001 From: Rodrigo Borba Date: Sun, 1 Mar 2026 02:04:55 -0300 Subject: [PATCH] feat(ai): implement dynamic debounce with sidekiq and concurrency locking with redis --- .../conversation/response_builder_job.rb | 91 +++++++++---------- .../open_ai_message_builder_service.rb | 3 + .../hook_execution_service.rb | 13 ++- progresso/solucao_debounce_concorrencia.md | 37 ++++++++ 4 files changed, 92 insertions(+), 52 deletions(-) create mode 100644 progresso/solucao_debounce_concorrencia.md diff --git a/enterprise/app/jobs/captain/conversation/response_builder_job.rb b/enterprise/app/jobs/captain/conversation/response_builder_job.rb index 0c0a0cc8e..0ceeb6be7 100644 --- a/enterprise/app/jobs/captain/conversation/response_builder_job.rb +++ b/enterprise/app/jobs/captain/conversation/response_builder_job.rb @@ -27,28 +27,40 @@ class Captain::Conversation::ResponseBuilderJob < ApplicationJob @inbox = conversation.inbox @assistant = assistant - # Cancel if there are newer messages after the provided message return if debounce_requested?(message) - # Simulate reading the message before starting to type - delay_before_typing(message) + with_concurrency_lock do + # Pre-typing phase: Wait 1 second before showing the typing indicator + sleep(1.0) + return if debounce_requested?(message) - # Re-check debounce to avoid race condition where another message arrived during reading sleep - return if debounce_requested?(message) - - # Trigger typing on before processing - simulate_typing('typing_on') - @start_time = Time.zone.now - - begin - execute_agent_response - ensure - simulate_typing('typing_off') + simulate_typing_and_execute end end private + def with_concurrency_lock + lock_key = "captain_response_lock_#{@conversation.id}" + return unless Rails.cache.write(lock_key, true, unless_exist: true, expires_in: 60.seconds) + + begin + yield + ensure + Rails.cache.delete(lock_key) + end + end + + def 
simulate_typing_and_execute + # Trigger typing on before processing + simulate_typing('typing_on') + @start_time = Time.zone.now + + execute_agent_response + ensure + simulate_typing('typing_off') + end + def execute_agent_response Current.executed_by = @assistant @@ -72,29 +84,14 @@ class Captain::Conversation::ResponseBuilderJob < ApplicationJob return false if message.blank? last_incoming = @conversation.messages.where(message_type: :incoming).last - last_incoming.present? && last_incoming.id != message.id - end - - def delay_before_typing(message) - return if message.blank? || message.content.blank? - - chars_count = message.content.to_s.length - configured_delay = @inbox.typing_delay.to_i - - # Simulate reading time proportional to configured delay. Max reading is ~40% of configured delay. - max_reading = configured_delay.positive? ? (configured_delay * 0.4) : 4.0 - min_reading = [1.0, max_reading].min - reading_time = (chars_count / 25.0).clamp(min_reading, max_reading) - - # Add jitter (randomness between 85% and 120%) - jitter = 0.85 + (rand * 0.35) - delay = reading_time * jitter - - Rails.logger.info( - "[CAPTAIN][ResponseBuilderJob] Simulating reading delay of #{delay.round(2)}s " \ - "for message of #{chars_count} chars" - ) - sleep(delay) + is_debounce = last_incoming.present? && last_incoming.id != message.id + if is_debounce + Rails.logger.info( + '[CAPTAIN][ResponseBuilderJob] Debounce requested! 
' \ + "Current message ID: #{message.id}, Last incoming ID: #{last_incoming.id}" + ) + end + is_debounce end def simulate_typing(status) @@ -148,29 +145,25 @@ class Captain::Conversation::ResponseBuilderJob < ApplicationJob text = response_text.to_s chars_count = text.length punctuation_pauses = text.count(',.!?;:') - configured_delay = @inbox.typing_delay.to_i - # Modela tempo de digitação de forma mais humana no WhatsApp: - # - Velocidade média de digitação: ~15 a 20 caracteres por segundo - # - Pequenas pausas por pontuação + # Velocidade média de digitação: ~15 a 20 caracteres por segundo base_time = (chars_count / 15.0) + (punctuation_pauses * 0.25) - # Se configurado, max_typing é o valor escolhido, senão, 12s - max_typing = configured_delay.positive? ? configured_delay.to_f : 12.0 - min_delay = [2.0, max_typing].min - - # Adicionando uma ligeira variação humana + # Variação humana (jitter) jitter = 0.85 + (rand * 0.35) - target_delay = (base_time * jitter).clamp(min_delay, max_typing) + target_delay = (base_time * jitter).clamp(2.0, 15.0) elapsed_time = Time.zone.now - @start_time - remaining_delay = target_delay - elapsed_time + + # Para de digitar exatamente 1 segundo antes de disparar a mensagem final + # Limitamos para não ficar negativo se o processamento do LLM demorar mais do que a digitação calculada + remaining_delay = [target_delay - elapsed_time - 1.0, 0].max return unless remaining_delay.positive? 
Rails.logger.info( "[CAPTAIN][ResponseBuilderJob] Simulating typing delay of #{remaining_delay.round(2)}s " \ - "(target: #{target_delay.round(2)}s, total elapsed: #{elapsed_time.round(2)}s, configured_max: #{max_typing}s)" + "(target: #{target_delay.round(2)}s, total elapsed: #{elapsed_time.round(2)}s, stopping 1s early)" ) sleep(remaining_delay) end diff --git a/enterprise/app/services/captain/open_ai_message_builder_service.rb b/enterprise/app/services/captain/open_ai_message_builder_service.rb index e7e0432b9..d1417539a 100644 --- a/enterprise/app/services/captain/open_ai_message_builder_service.rb +++ b/enterprise/app/services/captain/open_ai_message_builder_service.rb @@ -91,6 +91,9 @@ class Captain::OpenAiMessageBuilderService audio_attachments.map do |attachment| result = Messages::AudioTranscriptionService.new(attachment).perform result[:success] ? result[:transcriptions] : '' + rescue StandardError => e + Rails.logger.error "[Captain::OpenAiMessageBuilderService] Failed to extract audio transcription: #{e.message}" + '' end.join end end diff --git a/enterprise/app/services/enterprise/message_templates/hook_execution_service.rb b/enterprise/app/services/enterprise/message_templates/hook_execution_service.rb index 3c9fa9105..a69a969ce 100644 --- a/enterprise/app/services/enterprise/message_templates/hook_execution_service.rb +++ b/enterprise/app/services/enterprise/message_templates/hook_execution_service.rb @@ -31,12 +31,19 @@ module Enterprise::MessageTemplates::HookExecutionService def schedule_captain_response job_args = [conversation, conversation.inbox.captain_assistant, message] + base_wait = conversation.inbox.typing_delay.to_i.seconds if message.attachments.blank? - Captain::Conversation::ResponseBuilderJob.perform_later(*job_args) + total_wait = base_wait + if total_wait.positive? 
+ Captain::Conversation::ResponseBuilderJob.set(wait: total_wait).perform_later(*job_args) + else + Captain::Conversation::ResponseBuilderJob.perform_later(*job_args) + end else - wait_time = calculate_attachment_wait_time - Captain::Conversation::ResponseBuilderJob.set(wait: wait_time).perform_later(*job_args) + attachment_wait = calculate_attachment_wait_time + total_wait = base_wait + attachment_wait + Captain::Conversation::ResponseBuilderJob.set(wait: total_wait).perform_later(*job_args) end end diff --git a/progresso/solucao_debounce_concorrencia.md b/progresso/solucao_debounce_concorrencia.md new file mode 100644 index 000000000..2aeb85647 --- /dev/null +++ b/progresso/solucao_debounce_concorrencia.md @@ -0,0 +1,37 @@ +# Solução: Debounce Dinâmico + Trava de Concorrência da IA + +## Objetivo +Resolver chamadas duplicadas para o LLM e estabilizar o agrupamento de mensagens (debounce) não-bloqueante na geração de respostas da IA. + +## Contexto +O Chatwoot apresentava falhas de agrupamento quando as mensagens chegavam muito perto do final do debounce. O uso do `sleep 10s` direto no Worker amarrava as threads do sistema. E quando uma "condição de corrida" acontecia (uma MSG2 enviava o Job enquanto a MSG1 já estava conectada no OpenAI), o sistema gerava 2 respostas para a mesma conversa. + +## Passos Implementados + +1. **Agendamento no Sidekiq (Debounce Dinâmico)** + - Removemos o `sleep(10)` da Thread. + - Usamos o `perform_later(wait: X)` dentro do `HookExecutionService.schedule_captain_response`. + - Se uma MSG2 cai segundos depois, ela agenda o Job *mais para a frente*. Quando o Job1 acorda no passado, o método `debounce_requested?` nota que não é a mais recente e **aborta** (Early Return), delegando a responsabilidade para a MSG2 agendada. + +2. **Trava Distribuída (Mutex via `Rails.cache`)** + - Injetamos um "Cadeado" atômico que só permite a IA responder a uma conversa por vez. 
+ - Antes de iniciar a digitação no Chat/WhatsApp e chamar o OpenAI, o Job adquire a trava na chave `captain_response_lock_<ID da conversa>` por `60.seconds`. + - Se outro Job (como o da MSG2) for executado enquanto a primeira requisição ainda está em andamento, ele encontrará a trava ocupada e será encerrado sem chamar a IA (o Job é descartado, não reagendado), evitando respostas duplicadas. + - O `ensure` no final sempre apaga a trava, independentemente do desfecho do código. + +## Principais Arquivos Alterados + +- `enterprise/app/services/enterprise/message_templates/hook_execution_service.rb` +- `enterprise/app/jobs/captain/conversation/response_builder_job.rb` + +## Callbacks ou APIs Utilizadas +- `Rails.cache.write(chave, valor, unless_exist: true, expires_in: 60.seconds)`: escrita condicional no cache (atômica no Redis), usada como Mutex. +- `set(wait: X).perform_later` (ActiveJob): agenda o Job com atraso na fila do Sidekiq. + +## Como Validar +Enviar N mensagens em uma mesma conversa pelo WhatsApp/Inbox em um curto período (ex.: Oi / Tudo bem? / Quanto custa?), dentro da janela de `typing_delay` de 10 segundos. +Apenas UMA requisição (a do Job agendado pela mensagem mais recente) processará a IA após 10 segundos, agrupando todo o histórico de uma vez e produzindo uma única mensagem final coesa. + +## Como Reverter +- No `response_builder_job.rb`, remover o método `with_concurrency_lock` (o bloco que usa `lock_key = "captain_response_lock_..."`) e re-adicionar o `sleep` hardcoded. +- E no `HookExecutionService`, voltar a usar o `.perform_later` direto (sem `.set(wait: ...)`).