feat(ai): implement dynamic debounce with sidekiq and concurrency locking with redis
This commit is contained in:
parent
5fdee0c7e6
commit
96cf48d893
@ -27,28 +27,40 @@ class Captain::Conversation::ResponseBuilderJob < ApplicationJob
|
||||
@inbox = conversation.inbox
|
||||
@assistant = assistant
|
||||
|
||||
# Cancel if there are newer messages after the provided message
|
||||
return if debounce_requested?(message)
|
||||
|
||||
# Simulate reading the message before starting to type
|
||||
delay_before_typing(message)
|
||||
with_concurrency_lock do
|
||||
# Pre-typing phase: Wait 1 second before showing the typing indicator
|
||||
sleep(1.0)
|
||||
return if debounce_requested?(message)
|
||||
|
||||
# Re-check debounce to avoid race condition where another message arrived during reading sleep
|
||||
return if debounce_requested?(message)
|
||||
|
||||
# Trigger typing on before processing
|
||||
simulate_typing('typing_on')
|
||||
@start_time = Time.zone.now
|
||||
|
||||
begin
|
||||
execute_agent_response
|
||||
ensure
|
||||
simulate_typing('typing_off')
|
||||
simulate_typing_and_execute
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def with_concurrency_lock
|
||||
lock_key = "captain_response_lock_#{@conversation.id}"
|
||||
return unless Rails.cache.write(lock_key, true, unless_exist: true, expires_in: 60.seconds)
|
||||
|
||||
begin
|
||||
yield
|
||||
ensure
|
||||
Rails.cache.delete(lock_key)
|
||||
end
|
||||
end
|
||||
|
||||
def simulate_typing_and_execute
|
||||
# Trigger typing on before processing
|
||||
simulate_typing('typing_on')
|
||||
@start_time = Time.zone.now
|
||||
|
||||
execute_agent_response
|
||||
ensure
|
||||
simulate_typing('typing_off')
|
||||
end
|
||||
|
||||
def execute_agent_response
|
||||
Current.executed_by = @assistant
|
||||
|
||||
@ -72,29 +84,14 @@ class Captain::Conversation::ResponseBuilderJob < ApplicationJob
|
||||
return false if message.blank?
|
||||
|
||||
last_incoming = @conversation.messages.where(message_type: :incoming).last
|
||||
last_incoming.present? && last_incoming.id != message.id
|
||||
end
|
||||
|
||||
def delay_before_typing(message)
|
||||
return if message.blank? || message.content.blank?
|
||||
|
||||
chars_count = message.content.to_s.length
|
||||
configured_delay = @inbox.typing_delay.to_i
|
||||
|
||||
# Simulate reading time proportional to configured delay. Max reading is ~40% of configured delay.
|
||||
max_reading = configured_delay.positive? ? (configured_delay * 0.4) : 4.0
|
||||
min_reading = [1.0, max_reading].min
|
||||
reading_time = (chars_count / 25.0).clamp(min_reading, max_reading)
|
||||
|
||||
# Add jitter (randomness between 85% and 120%)
|
||||
jitter = 0.85 + (rand * 0.35)
|
||||
delay = reading_time * jitter
|
||||
|
||||
Rails.logger.info(
|
||||
"[CAPTAIN][ResponseBuilderJob] Simulating reading delay of #{delay.round(2)}s " \
|
||||
"for message of #{chars_count} chars"
|
||||
)
|
||||
sleep(delay)
|
||||
is_debounce = last_incoming.present? && last_incoming.id != message.id
|
||||
if is_debounce
|
||||
Rails.logger.info(
|
||||
'[CAPTAIN][ResponseBuilderJob] Debounce requested! ' \
|
||||
"Current message ID: #{message.id}, Last incoming ID: #{last_incoming.id}"
|
||||
)
|
||||
end
|
||||
is_debounce
|
||||
end
|
||||
|
||||
def simulate_typing(status)
|
||||
@ -148,29 +145,25 @@ class Captain::Conversation::ResponseBuilderJob < ApplicationJob
|
||||
text = response_text.to_s
|
||||
chars_count = text.length
|
||||
punctuation_pauses = text.count(',.!?;:')
|
||||
configured_delay = @inbox.typing_delay.to_i
|
||||
|
||||
# Modela tempo de digitação de forma mais humana no WhatsApp:
|
||||
# - Velocidade média de digitação: ~15 a 20 caracteres por segundo
|
||||
# - Pequenas pausas por pontuação
|
||||
# Velocidade média de digitação: ~15 a 20 caracteres por segundo
|
||||
base_time = (chars_count / 15.0) + (punctuation_pauses * 0.25)
|
||||
|
||||
# Se configurado, max_typing é o valor escolhido, senão, 12s
|
||||
max_typing = configured_delay.positive? ? configured_delay.to_f : 12.0
|
||||
min_delay = [2.0, max_typing].min
|
||||
|
||||
# Adicionando uma ligeira variação humana
|
||||
# Variação humana (jitter)
|
||||
jitter = 0.85 + (rand * 0.35)
|
||||
target_delay = (base_time * jitter).clamp(min_delay, max_typing)
|
||||
target_delay = (base_time * jitter).clamp(2.0, 15.0)
|
||||
|
||||
elapsed_time = Time.zone.now - @start_time
|
||||
remaining_delay = target_delay - elapsed_time
|
||||
|
||||
# Para de digitar exatamente 1 segundo antes de disparar a mensagem final
|
||||
# Limitamos para não ficar negativo se o processamento do LLM demorar mais do que a digitação calculada
|
||||
remaining_delay = [target_delay - elapsed_time - 1.0, 0].max
|
||||
|
||||
return unless remaining_delay.positive?
|
||||
|
||||
Rails.logger.info(
|
||||
"[CAPTAIN][ResponseBuilderJob] Simulating typing delay of #{remaining_delay.round(2)}s " \
|
||||
"(target: #{target_delay.round(2)}s, total elapsed: #{elapsed_time.round(2)}s, configured_max: #{max_typing}s)"
|
||||
"(target: #{target_delay.round(2)}s, total elapsed: #{elapsed_time.round(2)}s, stopping 1s early)"
|
||||
)
|
||||
sleep(remaining_delay)
|
||||
end
|
||||
|
||||
@ -91,6 +91,9 @@ class Captain::OpenAiMessageBuilderService
|
||||
audio_attachments.map do |attachment|
|
||||
result = Messages::AudioTranscriptionService.new(attachment).perform
|
||||
result[:success] ? result[:transcriptions] : ''
|
||||
rescue StandardError => e
|
||||
Rails.logger.error "[Captain::OpenAiMessageBuilderService] Failed to extract audio transcription: #{e.message}"
|
||||
''
|
||||
end.join
|
||||
end
|
||||
end
|
||||
|
||||
@ -31,12 +31,19 @@ module Enterprise::MessageTemplates::HookExecutionService
|
||||
|
||||
def schedule_captain_response
|
||||
job_args = [conversation, conversation.inbox.captain_assistant, message]
|
||||
base_wait = conversation.inbox.typing_delay.to_i.seconds
|
||||
|
||||
if message.attachments.blank?
|
||||
Captain::Conversation::ResponseBuilderJob.perform_later(*job_args)
|
||||
total_wait = base_wait
|
||||
if total_wait.positive?
|
||||
Captain::Conversation::ResponseBuilderJob.set(wait: total_wait).perform_later(*job_args)
|
||||
else
|
||||
Captain::Conversation::ResponseBuilderJob.perform_later(*job_args)
|
||||
end
|
||||
else
|
||||
wait_time = calculate_attachment_wait_time
|
||||
Captain::Conversation::ResponseBuilderJob.set(wait: wait_time).perform_later(*job_args)
|
||||
attachment_wait = calculate_attachment_wait_time
|
||||
total_wait = base_wait + attachment_wait
|
||||
Captain::Conversation::ResponseBuilderJob.set(wait: total_wait).perform_later(*job_args)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
37
progresso/solucao_debounce_concorrencia.md
Normal file
37
progresso/solucao_debounce_concorrencia.md
Normal file
@ -0,0 +1,37 @@
|
||||
# Solução: Debounce Dinâmico + Trava de Concorrência da IA
|
||||
|
||||
## Objetivo
|
||||
Resolver chamadas duplicadas para o LLM e estabilizar o agrupamento de mensagens (debounce) não-bloqueante na geração de respostas da IA.
|
||||
|
||||
## Contexto
|
||||
O Chatwoot apresentava falhas de agrupamento quando as mensagens chegavam muito perto do final do debounce. O uso do `sleep 10s` direto no Worker amarrava as threads do sistema. E quando uma "condição de corrida" acontecia (uma MSG2 enviava o Job enquanto a MSG1 já estava conectada no OpenAI), o sistema gerava 2 respostas para a mesma conversa.
|
||||
|
||||
## Passos Implementados
|
||||
|
||||
1. **Agendamento no Sidekiq (Debounce Dinâmico)**
|
||||
- Removemos o `sleep(10)` da Thread.
|
||||
- Usamos o `perform_later(wait: X)` dentro do `HookExecutionService.schedule_captain_response`.
|
||||
- Se uma MSG2 cai segundos depois, ela agenda o Job *mais para a frente*. Quando o Job1 acorda no passado, o método `debounce_requested?` nota que não é a mais recente e **aborta** (Early Return), delegando a responsabilidade para a MSG2 agendada.
|
||||
|
||||
2. **Trava Distribuída (Mutex via `Rails.cache`)**
|
||||
- Injetamos um "Cadeado" atômico que só permite a IA responder a uma conversa por vez.
|
||||
- Antes de iniciar a digitação no Chat/WhatsApp e chamar o OpenAI, o Job trava a chave `captain_response_lock_ID` por `60.seconds`.
|
||||
- Se outro Job (como o da MSG2) passar pelo relógio enquanto a primeira requisição anda na rede, ele baterá na trava e descartará rodar a IA 2 vezes, garantindo segurança de processamento.
|
||||
- O `ensure` no final apaga a trava obrigatoriamente independente do desfecho do código.
|
||||
|
||||
## Principais Arquivos Alterados
|
||||
|
||||
- `enterprise/app/services/enterprise/message_templates/hook_execution_service.rb`
|
||||
- `enterprise/app/jobs/captain/conversation/response_builder_job.rb`
|
||||
|
||||
## Callbacks ou APIs Utilizadas
|
||||
- `Rails.cache.write(unless_exist: true)`: Cache atômico que serve de Mutex.
|
||||
- `Sidekiq::Worker.set(wait: X)`: ActiveJob Delay Queue.
|
||||
|
||||
## Como Validar
|
||||
Enviar N mensagens em uma mesma conversa pelo Whatsapp/Inbox em um curto período (Ex: Oi / Tudo bem? / Quanto custa?) dentro da janela de `typing_delay` de 10 segundos.
|
||||
Apenas UMA requisição (a mais demorada) processará a IA após 10 segundos agrupando todo o histórico de uma vez, mantendo uma única e coesa mensagem final.
|
||||
|
||||
## Como Reverter
|
||||
- No `response_builder_job.rb`, remover o bloco envolto com `lock_key = "captain_response_loc..."` re-adicionando `sleep` hardcoded.
|
||||
- E no `HookExecutionService` voltar a usar o `.perform_later` direto (sem `.set(wait)`).
|
||||
Loading…
Reference in New Issue
Block a user