fix: resolve race condition on slow networks by re-checking message source_id after acquiring contact lock (#210)
This commit is contained in:
parent
42b2530a53
commit
c24eba9180
@ -1,6 +1,6 @@
|
||||
module BaileysHelper
|
||||
CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY = 'BAILEYS::CHANNEL_LOCK_ON_OUTGOING_MESSAGE::%<channel_id>s'.freeze
|
||||
CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 15.seconds
|
||||
CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 60.seconds
|
||||
|
||||
def baileys_extract_message_timestamp(timestamp)
|
||||
# NOTE: Timestamp might be in this format {"low"=>1748003165, "high"=>0, "unsigned"=>true}
|
||||
@ -18,14 +18,21 @@ module BaileysHelper
|
||||
raise ArgumentError, 'A block is required for with_baileys_channel_lock_on_outgoing_message' unless block_given?
|
||||
|
||||
start_time = Time.now.to_i
|
||||
lock_acquired = false
|
||||
|
||||
# NOTE: On timeout, we ignore the lock and proceed with the block execution
|
||||
# NOTE: On timeout, we log a warning and proceed with the block execution.
|
||||
# The re-check inside the contact lock handles potential duplicates.
|
||||
while (Time.now.to_i - start_time) < timeout
|
||||
break if baileys_lock_channel_on_outgoing_message(channel_id, timeout)
|
||||
if baileys_lock_channel_on_outgoing_message(channel_id, timeout)
|
||||
lock_acquired = true
|
||||
break
|
||||
end
|
||||
|
||||
sleep(0.1)
|
||||
end
|
||||
|
||||
Rails.logger.warn "Baileys channel lock timeout for channel #{channel_id} after #{timeout}s - proceeding anyway" unless lock_acquired
|
||||
|
||||
yield
|
||||
ensure
|
||||
baileys_clear_channel_lock_on_outgoing_message(channel_id)
|
||||
|
||||
@ -113,7 +113,7 @@ class Conversation < ApplicationRecord
|
||||
has_many :notifications, as: :primary_actor, dependent: :destroy_async
|
||||
has_many :attachments, through: :messages
|
||||
has_many :reporting_events, dependent: :destroy_async
|
||||
has_many :scheduled_messages, dependent: :destroy_async
|
||||
has_many :scheduled_messages, dependent: :destroy
|
||||
|
||||
before_save :ensure_snooze_until_reset
|
||||
before_create :determine_conversation_status
|
||||
|
||||
@ -20,7 +20,7 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul
|
||||
end
|
||||
end
|
||||
|
||||
def handle_message # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
|
||||
def handle_message # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity,Metrics/MethodLength
|
||||
@lock_acquired = false
|
||||
|
||||
return unless %w[lid user].include?(jid_type)
|
||||
@ -35,6 +35,13 @@ module Whatsapp::BaileysHandlers::MessagesUpsert # rubocop:disable Metrics/Modul
|
||||
# from the same contact arrive simultaneously (e.g., WhatsApp albums).
|
||||
contact_phone = extract_from_jid(type: 'pn') || extract_from_jid(type: 'lid')
|
||||
with_contact_lock(contact_phone) do
|
||||
# Re-check after acquiring lock to handle race conditions where:
|
||||
# 1. An agent sends a message from Chatwoot (slow API call)
|
||||
# 2. WhatsApp sends webhook before source_id is saved
|
||||
# 3. Webhook handler times out waiting for channel lock and proceeds
|
||||
# 4. By now, source_id should be set, so we can find the message
|
||||
return if find_message_by_source_id(raw_message_id)
|
||||
|
||||
set_contact
|
||||
|
||||
unless @contact
|
||||
|
||||
@ -37,6 +37,10 @@ class Whatsapp::IncomingMessageBaseService
|
||||
# from the same contact arrive simultaneously (e.g., WhatsApp albums).
|
||||
contact_phone = @processed_params[:messages].first[:from]
|
||||
with_contact_lock(contact_phone) do
|
||||
# Re-check after acquiring lock to handle race conditions where an outgoing message
|
||||
# was sent from Chatwoot and the webhook arrived before source_id was saved
|
||||
return if find_message_by_source_id(@processed_params[:messages].first[:id])
|
||||
|
||||
set_contact
|
||||
return unless @contact
|
||||
|
||||
|
||||
@ -451,6 +451,38 @@ describe Whatsapp::IncomingMessageBaileysService do
|
||||
expect(messages).to eq([message])
|
||||
end
|
||||
|
||||
it 'does not create duplicate message when source_id is set after contact lock is acquired' do
  # Scenario under test (outgoing-message race):
  #   - an agent sends a message from Chatwoot, so the record exists without a source_id yet
  #   - the webhook for that same message arrives before the source_id is saved
  #   - the handler gives up waiting on the channel lock and carries on
  #   - the lookup repeated inside the contact lock now finds the message,
  #     so no duplicate is created

  # Records the incoming payload is expected to resolve to.
  lid_contact = create(:contact, account: inbox.account, identifier: '12345678@lid', phone_number: '+5511912345678')
  lid_contact_inbox = create(:contact_inbox, inbox: inbox, contact: lid_contact, source_id: '12345678')
  open_conversation = create(:conversation, inbox: inbox, contact_inbox: lid_contact_inbox, contact: lid_contact)

  # The message already sent from Chatwoot; it only becomes discoverable on the second lookup.
  outgoing_message = create(:message, inbox: inbox, conversation: open_conversation)

  service = described_class.new(inbox: inbox, params: params)

  # Lookup #1 yields nil (source_id not stored yet); every later lookup,
  # including the one made inside the contact lock, yields the message.
  allow(service).to receive(:find_message_by_source_id).and_return(nil, outgoing_message)

  service.perform

  # Nothing new may have been persisted: still one conversation and one message.
  expect(inbox.conversations.count).to eq(1)
  expect(inbox.messages.count).to eq(1)
end
|
||||
|
||||
it 'does not create a message if it is already being processed' do
|
||||
# Simulate lock already acquired by returning false from SETNX
|
||||
allow(Redis::Alfred).to receive(:set)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user