From c24eba9180726c14889538ac6aa0702b936ca0cb Mon Sep 17 00:00:00 2001 From: Gabriel Jablonski Date: Thu, 5 Feb 2026 15:33:10 -0300 Subject: [PATCH] fix: resolve race condition on slow networks by re-checking message source_id after acquiring contact lock (#210) --- app/helpers/baileys_helper.rb | 13 ++++++-- app/models/conversation.rb | 2 +- .../baileys_handlers/messages_upsert.rb | 9 +++++- .../whatsapp/incoming_message_base_service.rb | 4 +++ .../incoming_message_baileys_service_spec.rb | 32 +++++++++++++++++++ 5 files changed, 55 insertions(+), 5 deletions(-) diff --git a/app/helpers/baileys_helper.rb b/app/helpers/baileys_helper.rb index 009b048d8..61fcf08b1 100644 --- a/app/helpers/baileys_helper.rb +++ b/app/helpers/baileys_helper.rb @@ -1,6 +1,6 @@ module BaileysHelper CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY = 'BAILEYS::CHANNEL_LOCK_ON_OUTGOING_MESSAGE::%s'.freeze - CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 15.seconds + CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 60.seconds def baileys_extract_message_timestamp(timestamp) # NOTE: Timestamp might be in this format {"low"=>1748003165, "high"=>0, "unsigned"=>true} @@ -18,14 +18,21 @@ module BaileysHelper raise ArgumentError, 'A block is required for with_baileys_channel_lock_on_outgoing_message' unless block_given? start_time = Time.now.to_i + lock_acquired = false - # NOTE: On timeout, we ignore the lock and proceed with the block execution + # NOTE: On timeout, we log a warning and proceed with the block execution. + # The re-check inside the contact lock handles potential duplicates. while (Time.now.to_i - start_time) < timeout - break if baileys_lock_channel_on_outgoing_message(channel_id, timeout) + if baileys_lock_channel_on_outgoing_message(channel_id, timeout) + lock_acquired = true + break + end sleep(0.1) end + Rails.logger.warn "Baileys channel lock timeout for channel #{channel_id} after #{timeout}s - proceeding anyway" unless lock_acquired + yield ensure baileys_clear_channel_lock_on_outgoing_message(channel_id) diff --git a/app/models/conversation.rb b/app/models/conversation.rb index 4014f0c5e..c4642d7dc 100644 --- a/app/models/conversation.rb +++ b/app/models/conversation.rb @@ -113,7 +113,7 @@ class Conversation < ApplicationRecord has_many :notifications, as: :primary_actor, dependent: :destroy_async has_many :attachments, through: :messages has_many :reporting_events, dependent: :destroy_async - has_many :scheduled_messages, dependent: :destroy_async + has_many :scheduled_messages, dependent: :destroy before_save :ensure_snooze_until_reset before_create :determine_conversation_status diff --git a/app/services/whatsapp/baileys_handlers/messages_upsert.rb b/app/services/whatsapp/baileys_handlers/messages_upsert.rb index 96f5ecab9..a7e6ccf99 100644 --- a/app/services/whatsapp/baileys_handlers/messages_upsert.rb +++ b/app/services/whatsapp/baileys_handlers/messages_upsert.rb @@ -20,7 +20,7 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul end end - def handle_message # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity + def handle_message # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity,Metrics/MethodLength @lock_acquired = false return unless %w[lid user].include?(jid_type) @@ -35,6 +35,13 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul # from the same contact arrive simultaneously (e.g., WhatsApp albums). contact_phone = extract_from_jid(type: 'pn') || extract_from_jid(type: 'lid') with_contact_lock(contact_phone) do + # Re-check after acquiring lock to handle race conditions where: + # 1. An agent sends a message from Chatwoot (slow API call) + # 2. WhatsApp sends webhook before source_id is saved + # 3. Webhook handler times out waiting for channel lock and proceeds + # 4. By now, source_id should be set, so we can find the message + return if find_message_by_source_id(raw_message_id) + set_contact unless @contact diff --git a/app/services/whatsapp/incoming_message_base_service.rb b/app/services/whatsapp/incoming_message_base_service.rb index 5629342ca..a79b3c43b 100644 --- a/app/services/whatsapp/incoming_message_base_service.rb +++ b/app/services/whatsapp/incoming_message_base_service.rb @@ -37,6 +37,10 @@ class Whatsapp::IncomingMessageBaseService # from the same contact arrive simultaneously (e.g., WhatsApp albums). contact_phone = @processed_params[:messages].first[:from] with_contact_lock(contact_phone) do + # Re-check after acquiring lock to handle race conditions where an outgoing message + # was sent from Chatwoot and the webhook arrived before source_id was saved + return if find_message_by_source_id(@processed_params[:messages].first[:id]) + set_contact return unless @contact diff --git a/spec/services/whatsapp/incoming_message_baileys_service_spec.rb b/spec/services/whatsapp/incoming_message_baileys_service_spec.rb index d727a1ee3..fff1f7b52 100644 --- a/spec/services/whatsapp/incoming_message_baileys_service_spec.rb +++ b/spec/services/whatsapp/incoming_message_baileys_service_spec.rb @@ -451,6 +451,38 @@ describe Whatsapp::IncomingMessageBaileysService do expect(messages).to eq([message]) end + it 'does not create duplicate message when source_id is set after contact lock is acquired' do + # This tests the race condition fix: + # 1. Agent sends message from Chatwoot (message created without source_id) + # 2. Webhook arrives before source_id is saved + # 3. Webhook handler times out on channel lock and proceeds + # 4. Inside contact lock, we re-check for message by source_id + # 5. By then, source_id should be set, so duplicate is prevented + + # Create contact and conversation that will be found + contact = create(:contact, account: inbox.account, identifier: '12345678@lid', phone_number: '+5511912345678') + contact_inbox = create(:contact_inbox, inbox: inbox, contact: contact, source_id: '12345678') + conversation = create(:conversation, inbox: inbox, contact_inbox: contact_inbox, contact: contact) + + # Simulate the race: message exists but will only be found on the re-check inside contact lock + existing_message = create(:message, inbox: inbox, conversation: conversation) + + # First call returns nil (simulating message not having source_id yet) + # Second call (inside contact lock) returns the message + service = described_class.new(inbox: inbox, params: params) + call_count = 0 + allow(service).to receive(:find_message_by_source_id).and_wrap_original do |_method, _source_id| + call_count += 1 + call_count == 1 ? nil : existing_message + end + + service.perform + + # Should not create a new conversation or message + expect(inbox.conversations.count).to eq(1) + expect(inbox.messages.count).to eq(1) + end + it 'does not create a message if it is already being processed' do # Simulate lock already acquired by returning false from SETNX allow(Redis::Alfred).to receive(:set)