From 8cf6e8907f26bc120be34b2109e7cfa657c0de08 Mon Sep 17 00:00:00 2001 From: Gabriel Jablonski Date: Wed, 8 Apr 2026 20:52:26 -0300 Subject: [PATCH] release v4.12.0-fazer-ai.47 (#259) * fix(whatsapp): add idempotent message sending to prevent duplicates on timeout retry When sending media messages via Baileys, Net::ReadTimeout causes Sidekiq to retry the job, potentially sending the same message multiple times. This adds a chatwootMessageId parameter to the Baileys API request, enabling server-side deduplication via Redis. Also increases HTTP timeout to 120s and channel lock to 130s to reduce false timeouts. Co-Authored-By: Claude Opus 4.6 (1M context) * fix: address review feedback - Use error.class.name assertions for parallel/reloading safety - Assert reconnect endpoint was not called on 409 (stronger assertion) * fix: address review feedback (round 2) - Only release channel lock in ensure if it was actually acquired (prevents clearing another worker's lock on timeout) - Assert chatwootMessageId in reproduction spec body matcher --------- Co-authored-by: Claude Opus 4.6 (1M context) --- app/helpers/baileys_helper.rb | 4 +- .../providers/whatsapp_baileys_service.rb | 10 +- spec/helpers/baileys_helper_spec.rb | 4 +- .../whatsapp_baileys_service_spec.rb | 281 ++++++++++-------- .../whatsapp/send_on_whatsapp_service_spec.rb | 34 +++ 5 files changed, 197 insertions(+), 136 deletions(-) diff --git a/app/helpers/baileys_helper.rb b/app/helpers/baileys_helper.rb index 61fcf08b1..f3b5b1b76 100644 --- a/app/helpers/baileys_helper.rb +++ b/app/helpers/baileys_helper.rb @@ -1,6 +1,6 @@ module BaileysHelper CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY = 'BAILEYS::CHANNEL_LOCK_ON_OUTGOING_MESSAGE::%s'.freeze - CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 60.seconds + CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 130.seconds def baileys_extract_message_timestamp(timestamp) # NOTE: Timestamp might be in this format {"low"=>1748003165, "high"=>0, "unsigned"=>true} @@ -35,7 +35,7 @@ module BaileysHelper yield ensure - baileys_clear_channel_lock_on_outgoing_message(channel_id) + baileys_clear_channel_lock_on_outgoing_message(channel_id) if lock_acquired end private diff --git a/app/services/whatsapp/providers/whatsapp_baileys_service.rb b/app/services/whatsapp/providers/whatsapp_baileys_service.rb index 88fe3f82b..1505113fc 100644 --- a/app/services/whatsapp/providers/whatsapp_baileys_service.rb +++ b/app/services/whatsapp/providers/whatsapp_baileys_service.rb @@ -4,6 +4,7 @@ class Whatsapp::Providers::WhatsappBaileysService < Whatsapp::Providers::BaseSer class MessageContentTypeNotSupported < StandardError; end class ProviderUnavailableError < StandardError; end class GroupParticipantNotAllowedError < StandardError; end + class MessageAlreadyProcessingError < StandardError; end DEFAULT_CLIENT_NAME = ENV.fetch('BAILEYS_PROVIDER_DEFAULT_CLIENT_NAME', nil) DEFAULT_URL = ENV.fetch('BAILEYS_PROVIDER_DEFAULT_URL', nil) @@ -567,10 +568,13 @@ class Whatsapp::Providers::WhatsappBaileysService < Whatsapp::Providers::BaseSer headers: api_headers, body: { jid: remote_jid, - messageContent: @message_content - }.to_json + messageContent: @message_content, + chatwootMessageId: @message.id + }.to_json, + timeout: 120 ) + raise MessageAlreadyProcessingError if response.code == 409 raise ProviderUnavailableError unless process_response(response) update_external_created_at(response) @@ -816,6 +820,8 @@ class Whatsapp::Providers::WhatsappBaileysService < Whatsapp::Providers::BaseSer define_method(method_name) do |*args, **kwargs, &block| original_method.bind_call(self, *args, **kwargs, &block) + rescue MessageAlreadyProcessingError + raise rescue StandardError => e handle_channel_error raise e diff --git a/spec/helpers/baileys_helper_spec.rb b/spec/helpers/baileys_helper_spec.rb index 36cce07ef..6f4634df1 100644 --- a/spec/helpers/baileys_helper_spec.rb +++ b/spec/helpers/baileys_helper_spec.rb @@ -74,7 +74,7 @@ RSpec.describe BaileysHelper do end context 'when lock is not acquired within timeout' do - it 'still executes the block and clears the lock' do + it 'still executes the block but does not clear the lock' do freeze_time allow(Redis::Alfred).to receive(:set).and_return(false) allow(self).to receive(:sleep) { travel_to 1.second.from_now } @@ -83,7 +83,7 @@ RSpec.describe BaileysHelper do expect(Redis::Alfred).to have_received(:set) .with(lock_key, 1, nx: true, ex: timeout) .exactly(BaileysHelper::CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT.to_i) - expect(Redis::Alfred).to have_received(:delete).once.with(lock_key) + expect(Redis::Alfred).not_to have_received(:delete) end end end diff --git a/spec/services/whatsapp/providers/whatsapp_baileys_service_spec.rb b/spec/services/whatsapp/providers/whatsapp_baileys_service_spec.rb index 97a2006e6..ed235c3c5 100644 --- a/spec/services/whatsapp/providers/whatsapp_baileys_service_spec.rb +++ b/spec/services/whatsapp/providers/whatsapp_baileys_service_spec.rb @@ -205,10 +205,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { text: 'Please rate us http://example.com/survey' } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { text: 'Please rate us http://example.com/survey' } + }) ) .to_return( status: 200, @@ -240,10 +240,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { fileName: 'image.png', caption: message.content, image: base64_image } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { fileName: 'image.png', caption: message.content, image: base64_image } + }) ) .to_return( status: 200, @@ -261,10 +261,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { fileName: 'image.png', image: base64_image } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { fileName: 'image.png', image: base64_image } + }) ) .to_return( status: 200, @@ -296,10 +296,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { fileName: 'audio.wav', caption: message.content, audio: base64_audio } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { fileName: 'audio.wav', caption: message.content, audio: base64_audio } + }) ) .to_return( status: 200, @@ -318,10 +318,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { fileName: 'audio.wav', caption: message.content, audio: base64_audio, ptt: true } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { fileName: 'audio.wav', caption: message.content, audio: base64_audio, ptt: true } + }) ) .to_return( status: 200, @@ -340,10 +340,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { text: message.content } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { text: message.content } + }) ) .to_return( status: 200, @@ -360,10 +360,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { text: message.content } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { text: message.content } + }) ) .to_return( status: 200, @@ -393,13 +393,13 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { react: { key: { id: message.source_id, - remoteJid: test_send_jid, - fromMe: true }, - text: '👍' } } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { react: { key: { id: message.source_id, + remoteJid: test_send_jid, + fromMe: true }, + text: '👍' } } + }, reaction) ) .to_return( status: 200, @@ -419,13 +419,13 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { react: { key: { id: message.source_id, - remoteJid: test_send_jid, - fromMe: false }, - text: '👍' } } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { react: { key: { id: message.source_id, + remoteJid: test_send_jid, + fromMe: false }, + text: '👍' } } + }, reaction) ) .to_return( status: 200, @@ -457,20 +457,20 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { - text: 'Reply text', - quotedMessage: { - key: { - id: 'original_msg_123', - remoteJid: test_send_jid, - fromMe: true - }, - message: { conversation: 'Original text' } - } - } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { + text: 'Reply text', + quotedMessage: { + key: { + id: 'original_msg_123', + remoteJid: test_send_jid, + fromMe: true + }, + message: { conversation: 'Original text' } + } + } + }, reply_message) ) .to_return( status: 200, @@ -492,20 +492,20 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { - text: 'Reply to incoming', - quotedMessage: { - key: { - id: 'incoming_msg_456', - remoteJid: test_send_jid, - fromMe: false - }, - message: { conversation: 'Incoming text' } - } - } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { + text: 'Reply to incoming', + quotedMessage: { + key: { + id: 'incoming_msg_456', + remoteJid: test_send_jid, + fromMe: false + }, + message: { conversation: 'Incoming text' } + } + } + }, reply_message) ) .to_return( status: 200, @@ -530,20 +530,20 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { - text: 'Nice image!', - quotedMessage: { - key: { - id: 'image_msg_789', - remoteJid: test_send_jid, - fromMe: false - }, - message: { imageMessage: { caption: 'Check this image' } } - } - } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { + text: 'Nice image!', + quotedMessage: { + key: { + id: 'image_msg_789', + remoteJid: test_send_jid, + fromMe: false + }, + message: { imageMessage: { caption: 'Check this image' } } + } + } + }, reply_message) ) .to_return( status: 200, @@ -562,10 +562,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: test_send_jid, - messageContent: { text: 'Regular message' } - }.to_json + body: send_message_body({ + jid: test_send_jid, + messageContent: { text: 'Regular message' } + }, regular_message) ) .to_return( status: 200, @@ -586,10 +586,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, request_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: group_jid, - messageContent: { text: message.content } - }.to_json + body: send_message_body({ + jid: group_jid, + messageContent: { text: message.content } + }) ) .to_return( status: 200, @@ -621,6 +621,23 @@ describe Whatsapp::Providers::WhatsappBaileysService do end.to raise_error(Whatsapp::Providers::WhatsappBaileysService::ProviderUnavailableError) end end + + context 'when server returns 409 (message already processing)' do + it 'raises MessageAlreadyProcessingError without triggering channel reconnection' do + stub_request(:post, request_path) + .to_return(status: 409, body: 'Message is already being processed') + + setup_url = "#{whatsapp_channel.provider_config['provider_url']}/connections/#{whatsapp_channel.phone_number}" + + expect do + service.send_message(test_send_phone_number, message) + end.to(raise_error do |error| + expect(error.class.name).to eq('Whatsapp::Providers::WhatsappBaileysService::MessageAlreadyProcessingError') + end) + + expect(WebMock).not_to have_requested(:post, setup_url) + end + end end describe '#media_url' do @@ -1104,16 +1121,16 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, send_message_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: group_jid, - messageContent: { - text: 'World!', - quotedMessage: { - key: { id: 'incoming_group_msg', remoteJid: group_jid, fromMe: false, participant: participant_lid }, - message: { conversation: 'Hello' } - } - } - }.to_json + body: send_message_body({ + jid: group_jid, + messageContent: { + text: 'World!', + quotedMessage: { + key: { id: 'incoming_group_msg', remoteJid: group_jid, fromMe: false, participant: participant_lid }, + message: { conversation: 'Hello' } + } + } + }, reply_message) ) .to_return(status: 200, headers: { 'Content-Type' => 'application/json' }, body: result_body.to_json) @@ -1131,16 +1148,16 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, send_message_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: group_jid, - messageContent: { - text: 'World!', - quotedMessage: { - key: { id: 'outgoing_group_msg', remoteJid: group_jid, fromMe: true }, - message: { conversation: 'Hello' } - } - } - }.to_json + body: send_message_body({ + jid: group_jid, + messageContent: { + text: 'World!', + quotedMessage: { + key: { id: 'outgoing_group_msg', remoteJid: group_jid, fromMe: true }, + message: { conversation: 'Hello' } + } + } + }, reply_message) ) .to_return(status: 200, headers: { 'Content-Type' => 'application/json' }, body: result_body.to_json) @@ -1157,15 +1174,15 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, send_message_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: group_jid, - messageContent: { - react: { - key: { id: original_message.source_id, remoteJid: group_jid, fromMe: false, participant: participant_lid }, - text: '👍' - } - } - }.to_json + body: send_message_body({ + jid: group_jid, + messageContent: { + react: { + key: { id: original_message.source_id, remoteJid: group_jid, fromMe: false, participant: participant_lid }, + text: '👍' + } + } + }, reaction) ) .to_return(status: 200, headers: { 'Content-Type' => 'application/json' }, body: result_body.to_json) @@ -1182,15 +1199,15 @@ describe Whatsapp::Providers::WhatsappBaileysService do stub_request(:post, send_message_path) .with( headers: stub_headers(whatsapp_channel), - body: { - jid: group_jid, - messageContent: { - react: { - key: { id: original_message.source_id, remoteJid: group_jid, fromMe: true }, - text: '❤️' - } - } - }.to_json + body: send_message_body({ + jid: group_jid, + messageContent: { + react: { + key: { id: original_message.source_id, remoteJid: group_jid, fromMe: true }, + text: '❤️' + } + } + }, reaction) ) .to_return(status: 200, headers: { 'Content-Type' => 'application/json' }, body: result_body.to_json) @@ -1801,4 +1818,8 @@ describe Whatsapp::Providers::WhatsappBaileysService do 'x-api-key' => channel.provider_config['api_key'] } end + + def send_message_body(hash, msg = message) + hash.merge(chatwootMessageId: msg.id).to_json + end end diff --git a/spec/services/whatsapp/send_on_whatsapp_service_spec.rb b/spec/services/whatsapp/send_on_whatsapp_service_spec.rb index 8a41a97a3..5d0d826fc 100644 --- a/spec/services/whatsapp/send_on_whatsapp_service_spec.rb +++ b/spec/services/whatsapp/send_on_whatsapp_service_spec.rb @@ -432,6 +432,40 @@ describe Whatsapp::SendOnWhatsappService do expect(message.reload.source_id).to eq('msg_group') end + + describe 'duplicate send on Net::ReadTimeout retry' do + let(:send_message_url) do + "#{whatsapp_channel.provider_config['provider_url']}/connections/#{whatsapp_channel.phone_number}/send-message" + end + let(:setup_url) do + "#{whatsapp_channel.provider_config['provider_url']}/connections/#{whatsapp_channel.phone_number}" + end + let(:success_body) { { data: { key: { id: 'wa_msg_123' }, messageTimestamp: '123' } }.to_json } + + before do + conversation.contact.update!(phone_number: '+123456789') + create(:message, message_type: :incoming, content: 'hi', conversation: conversation) + stub_request(:post, setup_url).to_return(status: 200, body: '', headers: {}) + end + + it 'sends the message twice when first attempt times out and job is retried' do + message = create(:message, message_type: :outgoing, content: 'test', conversation: conversation, source_id: nil) + + stub = stub_request(:post, send_message_url) + .with { |req| JSON.parse(req.body)['chatwootMessageId'] == message.id } + .to_raise(Net::ReadTimeout.new('Net::ReadTimeout')) + .then + .to_return(status: 200, body: success_body, headers: { 'Content-Type' => 'application/json' }) + + expect { SendReplyJob.perform_now(message.id) }.to(raise_error { |e| expect(e.class.name).to eq('Net::ReadTimeout') }) + expect(message.reload.source_id).to be_nil + + SendReplyJob.perform_now(message.id) + expect(message.reload.source_id).to eq('wa_msg_123') + + expect(stub).to have_been_requested.twice + end + end end context 'when provider is zapi' do