diff --git a/app/services/whatsapp/baileys_handlers/messages_upsert.rb b/app/services/whatsapp/baileys_handlers/messages_upsert.rb index 64c2d4dfa..e54b4aff7 100644 --- a/app/services/whatsapp/baileys_handlers/messages_upsert.rb +++ b/app/services/whatsapp/baileys_handlers/messages_upsert.rb @@ -20,26 +20,33 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul end end - def handle_message + def handle_message # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity + @lock_acquired = false + return unless %w[lid user].include?(jid_type) return unless extract_from_jid(type: 'lid') return if ignore_message? return if find_message_by_source_id(raw_message_id) - return unless acquire_message_processing_lock + @lock_acquired = acquire_message_processing_lock + return unless @lock_acquired - set_contact + # Lock by contact phone to prevent race conditions when multiple messages + # from the same contact arrive simultaneously (e.g., WhatsApp albums). + contact_phone = extract_from_jid(type: 'pn') || extract_from_jid(type: 'lid') + with_contact_lock(contact_phone) do + set_contact - unless @contact - clear_message_source_id_from_redis + unless @contact + Rails.logger.warn "Contact not found for message: #{raw_message_id}" + return + end - Rails.logger.warn "Contact not found for message: #{raw_message_id}" - return + set_conversation + handle_create_message end - - set_conversation - handle_create_message - clear_message_source_id_from_redis + ensure + clear_message_source_id_from_redis if @lock_acquired end def set_contact diff --git a/app/services/whatsapp/incoming_message_base_service.rb b/app/services/whatsapp/incoming_message_base_service.rb index 4f60485e7..b0cf46592 100644 --- a/app/services/whatsapp/incoming_message_base_service.rb +++ b/app/services/whatsapp/incoming_message_base_service.rb @@ -19,6 +19,8 @@ class Whatsapp::IncomingMessageBaseService private def process_messages + @lock_acquired = false + # We don't support reactions & ephemeral message now, we need to skip processing the message # if the webhook event is a reaction or an ephermal message or an unsupported message. return if unprocessable_message_type?(message_type) @@ -28,16 +30,25 @@ class Whatsapp::IncomingMessageBaseService # there are no duplicate messages created. return if find_message_by_source_id(@processed_params[:messages].first[:id]) - return unless acquire_message_processing_lock + @lock_acquired = acquire_message_processing_lock + return unless @lock_acquired - set_contact - return unless @contact + # Lock by contact phone to prevent race conditions when multiple messages + # from the same contact arrive simultaneously (e.g., WhatsApp albums). + contact_phone = @processed_params[:messages].first[:from] + with_contact_lock(contact_phone) do + set_contact + return unless @contact - ActiveRecord::Base.transaction do - set_conversation - create_messages - clear_message_source_id_from_redis + ActiveRecord::Base.transaction do + set_conversation + create_messages + end end + ensure + # Clear lock AFTER transaction commits to prevent race conditions where another request + # acquires the lock before this transaction is visible to other connections + clear_message_source_id_from_redis if @lock_acquired end def process_statuses diff --git a/app/services/whatsapp/incoming_message_service_helpers.rb b/app/services/whatsapp/incoming_message_service_helpers.rb index 34d205bd5..dc9d67740 100644 --- a/app/services/whatsapp/incoming_message_service_helpers.rb +++ b/app/services/whatsapp/incoming_message_service_helpers.rb @@ -85,4 +85,31 @@ module Whatsapp::IncomingMessageServiceHelpers key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: "#{inbox.id}_#{@processed_params[:messages].first[:id]}") ::Redis::Alfred.delete(key) end + + # Lock by contact phone to prevent race conditions when multiple messages + # from the same contact arrive simultaneously (e.g., WhatsApp albums). + # Without this, each message could create its own conversation. + def with_contact_lock(phone, timeout: 5.seconds) + raise ArgumentError, 'A block is required for with_contact_lock' unless block_given? + return yield if phone.blank? + + key = "WHATSAPP::CONTACT_LOCK::#{inbox.id}_#{phone}" + start_time = Time.now.to_i + lock_acquired = false + + while (Time.now.to_i - start_time) < timeout + if Redis::Alfred.set(key, 1, nx: true, ex: timeout) + lock_acquired = true + break + end + + sleep(0.1) + end + + raise Timeout::Error, "Timeout acquiring contact lock for #{phone}" unless lock_acquired + + yield + ensure + Redis::Alfred.delete(key) if lock_acquired + end end diff --git a/spec/services/whatsapp/incoming_message_baileys_service_spec.rb b/spec/services/whatsapp/incoming_message_baileys_service_spec.rb index 6dd180e26..d727a1ee3 100644 --- a/spec/services/whatsapp/incoming_message_baileys_service_spec.rb +++ b/spec/services/whatsapp/incoming_message_baileys_service_spec.rb @@ -463,6 +463,11 @@ describe Whatsapp::IncomingMessageBaileysService do end it 'caches and clears message source id in Redis' do + # Allow all Redis::Alfred calls (contact lock uses different keys) + allow(Redis::Alfred).to receive(:set).and_call_original + allow(Redis::Alfred).to receive(:delete).and_call_original + + # Stub message lock specifically allow(Redis::Alfred).to receive(:set) .with(format_message_source_key('msg_123'), true, nx: true, ex: 1.day) .and_return(true) @@ -470,8 +475,29 @@ describe Whatsapp::IncomingMessageBaileysService do described_class.new(inbox: inbox, params: params).perform - expect(Redis::Alfred).to have_received(:set) - expect(Redis::Alfred).to have_received(:delete) + expect(Redis::Alfred).to have_received(:set).with(format_message_source_key('msg_123'), true, nx: true, ex: 1.day) + expect(Redis::Alfred).to have_received(:delete).with(format_message_source_key('msg_123')) + end + + it 'clears lock even when an exception occurs after acquiring it' do + # Bug: no ensure block meant exceptions left lock stuck forever + # Fix: use ensure block to always clear lock when acquired + # Allow all Redis::Alfred calls (contact lock uses different keys) + allow(Redis::Alfred).to receive(:set).and_call_original + allow(Redis::Alfred).to receive(:delete).and_call_original + + # Stub message lock specifically + allow(Redis::Alfred).to receive(:set) + .with(format_message_source_key('msg_123'), true, nx: true, ex: 1.day) + .and_return(true) + allow(Redis::Alfred).to receive(:delete).with(format_message_source_key('msg_123')) + + service = described_class.new(inbox: inbox, params: params) + allow(service).to receive(:handle_create_message).and_raise(StandardError, 'simulated error') + + expect { service.perform }.to raise_error(StandardError, 'simulated error') + + expect(Redis::Alfred).to have_received(:delete).with(format_message_source_key('msg_123')) end end diff --git a/spec/services/whatsapp/incoming_message_service_spec.rb b/spec/services/whatsapp/incoming_message_service_spec.rb index 30fd9a489..a0821f308 100644 --- a/spec/services/whatsapp/incoming_message_service_spec.rb +++ b/spec/services/whatsapp/incoming_message_service_spec.rb @@ -144,49 +144,64 @@ describe Whatsapp::IncomingMessageService do expect(whatsapp_channel.inbox.messages.count).to eq(1) end - it 'prevents duplicate when both requests pass the message_under_process? check before cache is set' do - # This test explicitly simulates the race condition timing: - # 1. Request A calls message_under_process? -> returns nil (no lock) - # 2. Request B calls message_under_process? -> returns nil (no lock yet!) - # 3. Request A calls cache_message_source_id_in_redis - # 4. Request B calls cache_message_source_id_in_redis - # 5. Both requests pass find_message_by_source_id (message not yet committed) - # 6. Both create messages -> DUPLICATE! - # - # With atomic lock (SETNX), only one can succeed. - - # Key is scoped by inbox.id to prevent cross-inbox lock collisions + it 'only allows one of two concurrent requests to process the same message' do + # Simulates atomic SETNX: first caller gets the lock, second is rejected message_source_key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: "#{whatsapp_channel.inbox.id}_#{params[:messages].first[:id]}") lock_acquired = false - # Simulate atomic SETNX: only the first call returns true + # Allow all Redis::Alfred calls (contact lock uses different keys) + allow(Redis::Alfred).to receive(:set).and_call_original + allow(Redis::Alfred).to receive(:delete).and_call_original + + # Stub message lock specifically allow(Redis::Alfred).to receive(:set).with(message_source_key, true, nx: true, ex: 1.day) do - if lock_acquired - false # Second caller fails to acquire lock - else - lock_acquired = true - true # First caller acquires lock - end + result = !lock_acquired + lock_acquired = true + result end - allow(Redis::Alfred).to receive(:delete).with(message_source_key) - - # Run two services "simultaneously" (both pass the check before either sets the lock) service1 = described_class.new(inbox: whatsapp_channel.inbox, params: params) service2 = described_class.new(inbox: whatsapp_channel.inbox, params: params) - # Mock find_message_by_source_id on specific instances (simulating uncommitted transaction) + # Both bypass find_message_by_source_id (simulating race before DB commit) allow(service1).to receive(:find_message_by_source_id).and_return(nil) allow(service2).to receive(:find_message_by_source_id).and_return(nil) service1.perform service2.perform - # With atomic lock, only ONE should create a message expect(whatsapp_channel.inbox.messages.count).to eq(1) expect(whatsapp_channel.inbox.conversations.count).to eq(1) end + it 'clears lock outside transaction to prevent race conditions' do + # Bug: lock was cleared INSIDE transaction before commit, allowing: + # 1. Request A creates message, clears lock (transaction uncommitted) + # 2. Request B acquires lock, can't see A's uncommitted message + # 3. Both create duplicates + # + # Fix: clear lock in ensure block AFTER transaction commits + message_source_key = format(Redis::RedisKeys::MESSAGE_SOURCE_KEY, id: "#{whatsapp_channel.inbox.id}_#{params[:messages].first[:id]}") + lock_cleared_at_depth = nil + + # Allow all Redis::Alfred calls (contact lock uses different keys) + allow(Redis::Alfred).to receive(:set).and_call_original + allow(Redis::Alfred).to receive(:delete).and_call_original + + # Stub message lock specifically + allow(Redis::Alfred).to receive(:set).with(message_source_key, true, nx: true, ex: 1.day).and_return(true) + allow(Redis::Alfred).to receive(:delete).with(message_source_key) do + lock_cleared_at_depth = ActiveRecord::Base.connection.open_transactions + end + + described_class.new(inbox: whatsapp_channel.inbox, params: params).perform + + # Depth 1 = only RSpec's wrapper transaction, meaning our transaction completed + expect(lock_cleared_at_depth).to eq(1), + "Lock cleared at depth #{lock_cleared_at_depth}, expected 1. " \ + 'Lock must be cleared AFTER transaction commits to prevent duplicates.' + end + it 'creates message in second inbox when same source_id exists in different inbox' do # Create a message with same source_id in a different inbox other_whatsapp_channel = create(:channel_whatsapp, sync_templates: false) @@ -201,6 +216,21 @@ describe Whatsapp::IncomingMessageService do expect(whatsapp_channel.inbox.conversations.count).to eq(1) expect(whatsapp_channel.inbox.messages.count).to eq(1) end + + it 'acquires contact-level lock to prevent album race conditions' do + # When multiple messages from same contact arrive simultaneously (e.g., album), + # the contact lock prevents race conditions in conversation creation. + # This test verifies the lock is actually being acquired. + phone_number = '2423423243' + contact_lock_key = "WHATSAPP::CONTACT_LOCK::#{whatsapp_channel.inbox.id}_#{phone_number}" + + allow(Redis::Alfred).to receive(:set).and_call_original + allow(Redis::Alfred).to receive(:delete).and_call_original + + described_class.new(inbox: whatsapp_channel.inbox, params: params).perform + + expect(Redis::Alfred).to have_received(:set).with(contact_lock_key, 1, nx: true, ex: 5.seconds) + end end context 'when unsupported message types' do