From 9a157a260a45b7ff0b7b8f834740b47bfcd69994 Mon Sep 17 00:00:00 2001 From: Gabriel Jablonski Date: Wed, 11 Mar 2026 22:43:51 -0300 Subject: [PATCH] feat: add IMAP historical email import rake task (#234) * feat: add IMAP historical email import rake task Adds `imap:import` rake task to bulk-import emails from IMAP into a Chatwoot inbox with parallel workers, progress tracking, and proper backdating of message/conversation timestamps. Co-Authored-By: Claude Opus 4.6 * fix: address PR review feedback for IMAP import task - Use IMAP UIDs instead of sequence numbers for stable cross-session refs - Respect channel.imap_enable_ssl instead of hardcoding ssl: true - Add ensure block to guarantee IMAP logout on worker errors - Add open_timeout to prevent hanging connections Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- lib/tasks/imap_import.rake | 225 +++++++++++++++++++++++++++++++++++++ 1 file changed, 225 insertions(+) create mode 100644 lib/tasks/imap_import.rake diff --git a/lib/tasks/imap_import.rake b/lib/tasks/imap_import.rake new file mode 100644 index 000000000..addc85453 --- /dev/null +++ b/lib/tasks/imap_import.rake @@ -0,0 +1,225 @@ +# frozen_string_literal: true + +require 'net/imap' +require 'concurrent' + +def connect_imap(channel, folder) + imap = Net::IMAP.new(channel.imap_address, port: channel.imap_port, ssl: channel.imap_enable_ssl, open_timeout: 30) + + if channel.google? + token = Google::RefreshOauthTokenService.new(channel: channel).access_token + imap.authenticate('XOAUTH2', channel.imap_login, token) + elsif channel.microsoft? + token = Microsoft::RefreshOauthTokenService.new(channel: channel).access_token + imap.authenticate('XOAUTH2', channel.imap_login, token) + else + imap.authenticate('PLAIN', channel.imap_login, channel.imap_password) + end + + imap.select(folder) + imap +end + +def format_duration(seconds) + seconds = seconds.to_i + if seconds < 60 + "#{seconds}s" + elsif seconds < 3600 + "#{seconds / 60}m#{seconds % 60}s" + else + "#{seconds / 3600}h#{(seconds % 3600) / 60}m" + end +end + +def validate_inbox(inbox_id) + inbox = Inbox.find_by(id: inbox_id) + abort "ERROR: Inbox #{inbox_id} not found." unless inbox + + channel = inbox.channel + abort "ERROR: Inbox #{inbox_id} is not an email channel." unless channel.is_a?(Channel::Email) + abort "ERROR: IMAP is not enabled for inbox #{inbox_id}." unless channel.imap_enabled? + + [inbox, channel] +end + +def scan_new_email_uids(imap, channel, uids) + new_uids = [] + skipped = 0 + + uids.each_slice(100) do |batch| + headers = imap.uid_fetch(batch, 'BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)]') + next if headers.blank? + + headers.each do |data| + msg_id = Mail.read_from_string(data.attr['BODY[HEADER.FIELDS (MESSAGE-ID)]']).message_id + if msg_id.blank? || channel.inbox.messages.exists?(source_id: msg_id) + skipped += 1 + else + new_uids << data.attr['UID'] + end + end + + print "\r Scanning headers... #{new_uids.size} new, #{skipped} skipped (#{new_uids.size + skipped}/#{uids.size}) " + end + + puts '' + [new_uids, skipped] +end + +def import_single_email(imap, channel, uid) + mail_data = imap.uid_fetch(uid, 'RFC822') + return false if mail_data.blank? + + mail_str = mail_data[0].attr['RFC822'] + return false if mail_str.blank? + + inbound_mail = Mail.read_from_string(mail_str) + Imap::ImapMailbox.new.process(inbound_mail, channel) + + # Backdate message created_at to the original email date + backdate_message(channel, inbound_mail) if inbound_mail.date.present? + true +end + +def backdate_message(channel, inbound_mail) + message = channel.inbox.messages.find_by(source_id: inbound_mail.message_id) + return unless message + + original_date = inbound_mail.date.to_datetime + message.update_columns(created_at: original_date) # rubocop:disable Rails/SkipsModelValidations +end + +# Fixes conversation timestamps after all messages are imported. +# Must run after import to avoid after_create_commit callbacks overwriting values. +def backdate_conversations(inbox) + puts 'Backdating conversation timestamps...' + count = 0 + + inbox.conversations.find_each do |conversation| + oldest = conversation.messages.minimum(:created_at) + newest = conversation.messages.maximum(:created_at) + next unless oldest + + conversation.update_columns( # rubocop:disable Rails/SkipsModelValidations + created_at: oldest, + last_activity_at: newest, + agent_last_seen_at: newest + ) + count += 1 + end + + puts " Updated #{count} conversations." +end + +def import_worker(channel, folder, uid_batch, progress) # rubocop:disable Metrics/MethodLength + ActiveRecord::Base.connection_pool.with_connection do + imap = connect_imap(channel, folder) + begin + uid_batch.each do |uid| + import_single_email(imap, channel, uid) + + progress[:mutex].synchronize do + progress[:imported] += 1 + print_import_progress(progress) + end + rescue StandardError => e + progress[:mutex].synchronize do + progress[:errors] += 1 + print_import_progress(progress) + end + warn "\n [ERROR] uid #{uid}: #{e.message}" + end + ensure + imap&.logout + end + end +rescue StandardError => e + warn "\n [WORKER ERROR] #{e.message}" +end + +def print_import_progress(progress) + processed = progress[:imported] + progress[:skipped] + progress[:errors] + total = progress[:total] + pct = ((processed.to_f / total) * 100).round(1) + elapsed = Time.zone.now - progress[:started_at] + rate = processed.positive? ? (elapsed / processed) : 0 + eta = rate.positive? ? ((total - processed) * rate) : 0 + + print "\r [#{pct}%] #{processed}/#{total} | imported: #{progress[:imported]} | " \ + "skipped: #{progress[:skipped]} | errors: #{progress[:errors]} | " \ + "elapsed: #{format_duration(elapsed)} | ETA: #{format_duration(eta)} " +end + +namespace :imap do # rubocop:disable Metrics/BlockLength + desc 'Import all historical emails from IMAP into a Chatwoot inbox' + task :import, %i[inbox_id days folder workers] => :environment do |_task, args| # rubocop:disable Metrics/BlockLength + inbox_id = args[:inbox_id] + days = (args[:days] || 7).to_i + folder = args[:folder] || 'INBOX' + workers = (args[:workers] || 4).to_i + + if inbox_id.blank? + puts 'Usage: rails imap:import[,,,]' + puts ' days: how far back to look (default: 7)' + puts ' folder: IMAP folder (default: INBOX)' + puts ' workers: parallel connections (default: 4)' + puts '' + puts 'Tip: set RAILS_MAX_THREADS above worker count to avoid DB pool exhaustion:' + puts ' RAILS_MAX_THREADS=10 bundle exec rails imap:import[81,3650,INBOX,8]' + next + end + + inbox, channel = validate_inbox(inbox_id) + + puts "Inbox: #{inbox.name} (ID: #{inbox.id})" + puts "Email: #{channel.email}" + puts "Server: #{channel.imap_address}:#{channel.imap_port}" + puts "Folder: #{folder}" + puts "Lookback: #{days} days" + puts "Workers: #{workers}" + puts '-' * 60 + + # Phase 1: scan headers with a single connection to find new emails + imap = connect_imap(channel, folder) + since_date = (Time.zone.today - days).strftime('%d-%b-%Y') + + puts "Searching emails since #{since_date}..." + uids = imap.uid_search(['SINCE', since_date]) + puts "Found #{uids.length} emails in #{folder}." + + new_uids, skipped = scan_new_email_uids(imap, channel, uids) + imap.logout + + if new_uids.empty? + puts 'Nothing new to import.' + next + end + + puts "Importing #{new_uids.size} emails with #{workers} parallel workers..." + progress = { + imported: 0, skipped: skipped, errors: 0, + total: new_uids.size + skipped, mutex: Mutex.new, started_at: Time.zone.now + } + + # Phase 2: split work across N threads, each with its own IMAP connection + chunks = new_uids.each_slice((new_uids.size.to_f / workers).ceil).to_a + threads = chunks.map do |chunk| + Thread.new { import_worker(channel, folder, chunk, progress) } + end + threads.each(&:join) + + # Phase 3: fix conversation timestamps after all callbacks have settled + backdate_conversations(inbox) + + elapsed = Time.zone.now - progress[:started_at] + + puts '' + puts '=' * 60 + puts 'Import complete!' + puts " Imported: #{progress[:imported]}" + puts " Skipped: #{progress[:skipped]} (already present)" + puts " Errors: #{progress[:errors]}" + puts " Time: #{format_duration(elapsed)}" + puts '=' * 60 + end +end