release v4.12.0-fazer-ai.47 (#259)
* fix(whatsapp): add idempotent message sending to prevent duplicates on timeout retry When sending media messages via Baileys, Net::ReadTimeout causes Sidekiq to retry the job, potentially sending the same message multiple times. This adds a chatwootMessageId parameter to the Baileys API request, enabling server-side deduplication via Redis. Also increases HTTP timeout to 120s and channel lock to 130s to reduce false timeouts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: address review feedback - Use error.class.name assertions for parallel/reloading safety - Assert reconnect endpoint was not called on 409 (stronger assertion) * fix: address review feedback (round 2) - Only release channel lock in ensure if it was actually acquired (prevents clearing another worker's lock on timeout) - Assert chatwootMessageId in reproduction spec body matcher --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
dba5379d5e
commit
8cf6e8907f
@ -1,6 +1,6 @@
|
||||
module BaileysHelper
|
||||
CHANNEL_LOCK_ON_OUTGOING_MESSAGE_KEY = 'BAILEYS::CHANNEL_LOCK_ON_OUTGOING_MESSAGE::%<channel_id>s'.freeze
|
||||
CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 60.seconds
|
||||
CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT = 130.seconds
|
||||
|
||||
def baileys_extract_message_timestamp(timestamp)
|
||||
# NOTE: Timestamp might be in this format {"low"=>1748003165, "high"=>0, "unsigned"=>true}
|
||||
@ -35,7 +35,7 @@ module BaileysHelper
|
||||
|
||||
yield
|
||||
ensure
|
||||
baileys_clear_channel_lock_on_outgoing_message(channel_id)
|
||||
baileys_clear_channel_lock_on_outgoing_message(channel_id) if lock_acquired
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
@ -4,6 +4,7 @@ class Whatsapp::Providers::WhatsappBaileysService < Whatsapp::Providers::BaseSer
|
||||
class MessageContentTypeNotSupported < StandardError; end
|
||||
class ProviderUnavailableError < StandardError; end
|
||||
class GroupParticipantNotAllowedError < StandardError; end
|
||||
class MessageAlreadyProcessingError < StandardError; end
|
||||
|
||||
DEFAULT_CLIENT_NAME = ENV.fetch('BAILEYS_PROVIDER_DEFAULT_CLIENT_NAME', nil)
|
||||
DEFAULT_URL = ENV.fetch('BAILEYS_PROVIDER_DEFAULT_URL', nil)
|
||||
@ -567,10 +568,13 @@ class Whatsapp::Providers::WhatsappBaileysService < Whatsapp::Providers::BaseSer
|
||||
headers: api_headers,
|
||||
body: {
|
||||
jid: remote_jid,
|
||||
messageContent: @message_content
|
||||
}.to_json
|
||||
messageContent: @message_content,
|
||||
chatwootMessageId: @message.id
|
||||
}.to_json,
|
||||
timeout: 120
|
||||
)
|
||||
|
||||
raise MessageAlreadyProcessingError if response.code == 409
|
||||
raise ProviderUnavailableError unless process_response(response)
|
||||
|
||||
update_external_created_at(response)
|
||||
@ -816,6 +820,8 @@ class Whatsapp::Providers::WhatsappBaileysService < Whatsapp::Providers::BaseSer
|
||||
|
||||
define_method(method_name) do |*args, **kwargs, &block|
|
||||
original_method.bind_call(self, *args, **kwargs, &block)
|
||||
rescue MessageAlreadyProcessingError
|
||||
raise
|
||||
rescue StandardError => e
|
||||
handle_channel_error
|
||||
raise e
|
||||
|
||||
@ -74,7 +74,7 @@ RSpec.describe BaileysHelper do
|
||||
end
|
||||
|
||||
context 'when lock is not acquired within timeout' do
|
||||
it 'still executes the block and clears the lock' do
|
||||
it 'still executes the block but does not clear the lock' do
|
||||
freeze_time
|
||||
allow(Redis::Alfred).to receive(:set).and_return(false)
|
||||
allow(self).to receive(:sleep) { travel_to 1.second.from_now }
|
||||
@ -83,7 +83,7 @@ RSpec.describe BaileysHelper do
|
||||
expect(Redis::Alfred).to have_received(:set)
|
||||
.with(lock_key, 1, nx: true, ex: timeout)
|
||||
.exactly(BaileysHelper::CHANNEL_LOCK_ON_OUTGOING_MESSAGE_TIMEOUT.to_i)
|
||||
expect(Redis::Alfred).to have_received(:delete).once.with(lock_key)
|
||||
expect(Redis::Alfred).not_to have_received(:delete)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -205,10 +205,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { text: 'Please rate us http://example.com/survey' }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { text: 'Please rate us http://example.com/survey' }
|
||||
})
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -240,10 +240,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { fileName: 'image.png', caption: message.content, image: base64_image }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { fileName: 'image.png', caption: message.content, image: base64_image }
|
||||
})
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -261,10 +261,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { fileName: 'image.png', image: base64_image }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { fileName: 'image.png', image: base64_image }
|
||||
})
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -296,10 +296,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { fileName: 'audio.wav', caption: message.content, audio: base64_audio }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { fileName: 'audio.wav', caption: message.content, audio: base64_audio }
|
||||
})
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -318,10 +318,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { fileName: 'audio.wav', caption: message.content, audio: base64_audio, ptt: true }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { fileName: 'audio.wav', caption: message.content, audio: base64_audio, ptt: true }
|
||||
})
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -340,10 +340,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { text: message.content }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { text: message.content }
|
||||
})
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -360,10 +360,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { text: message.content }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { text: message.content }
|
||||
})
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -393,13 +393,13 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { react: { key: { id: message.source_id,
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: true },
|
||||
text: '👍' } }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { react: { key: { id: message.source_id,
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: true },
|
||||
text: '👍' } }
|
||||
}, reaction)
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -419,13 +419,13 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { react: { key: { id: message.source_id,
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: false },
|
||||
text: '👍' } }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { react: { key: { id: message.source_id,
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: false },
|
||||
text: '👍' } }
|
||||
}, reaction)
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -457,20 +457,20 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: {
|
||||
text: 'Reply text',
|
||||
quotedMessage: {
|
||||
key: {
|
||||
id: 'original_msg_123',
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: true
|
||||
},
|
||||
message: { conversation: 'Original text' }
|
||||
}
|
||||
}
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: {
|
||||
text: 'Reply text',
|
||||
quotedMessage: {
|
||||
key: {
|
||||
id: 'original_msg_123',
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: true
|
||||
},
|
||||
message: { conversation: 'Original text' }
|
||||
}
|
||||
}
|
||||
}, reply_message)
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -492,20 +492,20 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: {
|
||||
text: 'Reply to incoming',
|
||||
quotedMessage: {
|
||||
key: {
|
||||
id: 'incoming_msg_456',
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: false
|
||||
},
|
||||
message: { conversation: 'Incoming text' }
|
||||
}
|
||||
}
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: {
|
||||
text: 'Reply to incoming',
|
||||
quotedMessage: {
|
||||
key: {
|
||||
id: 'incoming_msg_456',
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: false
|
||||
},
|
||||
message: { conversation: 'Incoming text' }
|
||||
}
|
||||
}
|
||||
}, reply_message)
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -530,20 +530,20 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: {
|
||||
text: 'Nice image!',
|
||||
quotedMessage: {
|
||||
key: {
|
||||
id: 'image_msg_789',
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: false
|
||||
},
|
||||
message: { imageMessage: { caption: 'Check this image' } }
|
||||
}
|
||||
}
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: {
|
||||
text: 'Nice image!',
|
||||
quotedMessage: {
|
||||
key: {
|
||||
id: 'image_msg_789',
|
||||
remoteJid: test_send_jid,
|
||||
fromMe: false
|
||||
},
|
||||
message: { imageMessage: { caption: 'Check this image' } }
|
||||
}
|
||||
}
|
||||
}, reply_message)
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -562,10 +562,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: test_send_jid,
|
||||
messageContent: { text: 'Regular message' }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: test_send_jid,
|
||||
messageContent: { text: 'Regular message' }
|
||||
}, regular_message)
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -586,10 +586,10 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, request_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: group_jid,
|
||||
messageContent: { text: message.content }
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: group_jid,
|
||||
messageContent: { text: message.content }
|
||||
})
|
||||
)
|
||||
.to_return(
|
||||
status: 200,
|
||||
@ -621,6 +621,23 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
end.to raise_error(Whatsapp::Providers::WhatsappBaileysService::ProviderUnavailableError)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when server returns 409 (message already processing)' do
|
||||
it 'raises MessageAlreadyProcessingError without triggering channel reconnection' do
|
||||
stub_request(:post, request_path)
|
||||
.to_return(status: 409, body: 'Message is already being processed')
|
||||
|
||||
setup_url = "#{whatsapp_channel.provider_config['provider_url']}/connections/#{whatsapp_channel.phone_number}"
|
||||
|
||||
expect do
|
||||
service.send_message(test_send_phone_number, message)
|
||||
end.to(raise_error do |error|
|
||||
expect(error.class.name).to eq('Whatsapp::Providers::WhatsappBaileysService::MessageAlreadyProcessingError')
|
||||
end)
|
||||
|
||||
expect(WebMock).not_to have_requested(:post, setup_url)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#media_url' do
|
||||
@ -1104,16 +1121,16 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, send_message_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: group_jid,
|
||||
messageContent: {
|
||||
text: 'World!',
|
||||
quotedMessage: {
|
||||
key: { id: 'incoming_group_msg', remoteJid: group_jid, fromMe: false, participant: participant_lid },
|
||||
message: { conversation: 'Hello' }
|
||||
}
|
||||
}
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: group_jid,
|
||||
messageContent: {
|
||||
text: 'World!',
|
||||
quotedMessage: {
|
||||
key: { id: 'incoming_group_msg', remoteJid: group_jid, fromMe: false, participant: participant_lid },
|
||||
message: { conversation: 'Hello' }
|
||||
}
|
||||
}
|
||||
}, reply_message)
|
||||
)
|
||||
.to_return(status: 200, headers: { 'Content-Type' => 'application/json' }, body: result_body.to_json)
|
||||
|
||||
@ -1131,16 +1148,16 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, send_message_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: group_jid,
|
||||
messageContent: {
|
||||
text: 'World!',
|
||||
quotedMessage: {
|
||||
key: { id: 'outgoing_group_msg', remoteJid: group_jid, fromMe: true },
|
||||
message: { conversation: 'Hello' }
|
||||
}
|
||||
}
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: group_jid,
|
||||
messageContent: {
|
||||
text: 'World!',
|
||||
quotedMessage: {
|
||||
key: { id: 'outgoing_group_msg', remoteJid: group_jid, fromMe: true },
|
||||
message: { conversation: 'Hello' }
|
||||
}
|
||||
}
|
||||
}, reply_message)
|
||||
)
|
||||
.to_return(status: 200, headers: { 'Content-Type' => 'application/json' }, body: result_body.to_json)
|
||||
|
||||
@ -1157,15 +1174,15 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, send_message_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: group_jid,
|
||||
messageContent: {
|
||||
react: {
|
||||
key: { id: original_message.source_id, remoteJid: group_jid, fromMe: false, participant: participant_lid },
|
||||
text: '👍'
|
||||
}
|
||||
}
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: group_jid,
|
||||
messageContent: {
|
||||
react: {
|
||||
key: { id: original_message.source_id, remoteJid: group_jid, fromMe: false, participant: participant_lid },
|
||||
text: '👍'
|
||||
}
|
||||
}
|
||||
}, reaction)
|
||||
)
|
||||
.to_return(status: 200, headers: { 'Content-Type' => 'application/json' }, body: result_body.to_json)
|
||||
|
||||
@ -1182,15 +1199,15 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
stub_request(:post, send_message_path)
|
||||
.with(
|
||||
headers: stub_headers(whatsapp_channel),
|
||||
body: {
|
||||
jid: group_jid,
|
||||
messageContent: {
|
||||
react: {
|
||||
key: { id: original_message.source_id, remoteJid: group_jid, fromMe: true },
|
||||
text: '❤️'
|
||||
}
|
||||
}
|
||||
}.to_json
|
||||
body: send_message_body({
|
||||
jid: group_jid,
|
||||
messageContent: {
|
||||
react: {
|
||||
key: { id: original_message.source_id, remoteJid: group_jid, fromMe: true },
|
||||
text: '❤️'
|
||||
}
|
||||
}
|
||||
}, reaction)
|
||||
)
|
||||
.to_return(status: 200, headers: { 'Content-Type' => 'application/json' }, body: result_body.to_json)
|
||||
|
||||
@ -1801,4 +1818,8 @@ describe Whatsapp::Providers::WhatsappBaileysService do
|
||||
'x-api-key' => channel.provider_config['api_key']
|
||||
}
|
||||
end
|
||||
|
||||
def send_message_body(hash, msg = message)
|
||||
hash.merge(chatwootMessageId: msg.id).to_json
|
||||
end
|
||||
end
|
||||
|
||||
@ -432,6 +432,40 @@ describe Whatsapp::SendOnWhatsappService do
|
||||
|
||||
expect(message.reload.source_id).to eq('msg_group')
|
||||
end
|
||||
|
||||
describe 'duplicate send on Net::ReadTimeout retry' do
|
||||
let(:send_message_url) do
|
||||
"#{whatsapp_channel.provider_config['provider_url']}/connections/#{whatsapp_channel.phone_number}/send-message"
|
||||
end
|
||||
let(:setup_url) do
|
||||
"#{whatsapp_channel.provider_config['provider_url']}/connections/#{whatsapp_channel.phone_number}"
|
||||
end
|
||||
let(:success_body) { { data: { key: { id: 'wa_msg_123' }, messageTimestamp: '123' } }.to_json }
|
||||
|
||||
before do
|
||||
conversation.contact.update!(phone_number: '+123456789')
|
||||
create(:message, message_type: :incoming, content: 'hi', conversation: conversation)
|
||||
stub_request(:post, setup_url).to_return(status: 200, body: '', headers: {})
|
||||
end
|
||||
|
||||
it 'sends the message twice when first attempt times out and job is retried' do
|
||||
message = create(:message, message_type: :outgoing, content: 'test', conversation: conversation, source_id: nil)
|
||||
|
||||
stub = stub_request(:post, send_message_url)
|
||||
.with { |req| JSON.parse(req.body)['chatwootMessageId'] == message.id }
|
||||
.to_raise(Net::ReadTimeout.new('Net::ReadTimeout'))
|
||||
.then
|
||||
.to_return(status: 200, body: success_body, headers: { 'Content-Type' => 'application/json' })
|
||||
|
||||
expect { SendReplyJob.perform_now(message.id) }.to(raise_error { |e| expect(e.class.name).to eq('Net::ReadTimeout') })
|
||||
expect(message.reload.source_id).to be_nil
|
||||
|
||||
SendReplyJob.perform_now(message.id)
|
||||
expect(message.reload.source_id).to eq('wa_msg_123')
|
||||
|
||||
expect(stub).to have_been_requested.twice
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'when provider is zapi' do
|
||||
|
||||
Loading…
Reference in New Issue
Block a user