שאיבת אימיילים אוטומטית

Email Fetching System - מערכת שאיבה אוטומטית עם Celery Beat

סקירה כללית

מערכת שאיבת האימיילים מאפשרת קבלה אוטומטית של חשבוניות ומסמכים מתיבות דואר אלקטרוני. המערכת פועלת ברקע באמצעות Celery + Celery Beat ומעבדת קבצים מצורפים באופן אוטומטי.

כל 5 דקות

שאיבה אוטומטית

IMAP + Gmail

פרוטוקולים נתמכים

PDF, JPG, PNG

סוגי קבצים

OCR אוטומטי

עיבוד מיידי

סטטוס קונטיינרים:
techlabs-prod-celery-worker
Up & Running

מעבד משימות ברקע

techlabs-prod-celery-beat
Up & Running

מתזמן משימות

ארכיטקטורה

Celery Beat
Scheduler
Redis
Message Broker
Celery Worker
Task Executor
Email Servers
IMAP / Gmail API
OCR Processing
Tesseract / AI
PostgreSQL
Database

הגדרת Celery

קובץ הגדרות: celery_app.py
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

# Create Celery instance
celery = Celery(
    'techlabs',
    broker='redis://techlabs-redis:6379/0',
    backend='redis://techlabs-redis:6379/0'
)

# Configuration
celery.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Jerusalem',
    enable_utc=True,

    # Task settings
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 minutes
    task_soft_time_limit=25 * 60,  # 25 minutes
    task_acks_late=True,
    worker_prefetch_multiplier=1,

    # Beat schedule - see below
    beat_schedule={...}
)
הפעלת Celery:
# Start Celery Worker
celery -A app.celery_app worker --loglevel=info --concurrency=4

# Start Celery Beat (scheduler)
celery -A app.celery_app beat --loglevel=info

# Combined (development only)
celery -A app.celery_app worker --beat --loglevel=info

משימות מתוזמנות

המערכת מריצה משימות ברקע באופן אוטומטי לפי לוח זמנים:

משימה תדירות תיאור קובץ
fetch_all_accounts_task כל 5 דקות שאיבת אימיילים מכל החשבונות הפעילים tasks_email_integration.py
poll_all_mailboxes כל 5 דקות סריקת תיבות דואר (IMAP) tasks/email_tasks.py
process_pending_ocr_jobs כל 2 דקות עיבוד משימות OCR ממתינות tasks/ocr_tasks.py
auto_categorize_orphans כל 10 דקות קטגוריזציה אוטומטית של מסמכים tasks/categorization_tasks.py
cleanup_old_attachments יומי 01:00 מחיקת קבצים ישנים (30+ ימים) tasks_email_integration.py
send_fetch_summary_email יומי 08:00 שליחת דוח סיכום יומי tasks_email_integration.py
cleanup_old_notifications שבועי (ראשון) ניקוי התראות ישנות tasks/email_tasks.py
הגדרת Beat Schedule:
beat_schedule = {
    # Email fetching - every 5 minutes
    'email-integration-fetch-all-accounts': {
        'task': 'app.tasks_email_integration.fetch_all_accounts_task',
        'schedule': timedelta(minutes=5),
        'options': {
            'expires': 240  # 4 minutes
        }
    },

    # OCR processing - every 2 minutes
    'process-pending-ocr-jobs': {
        'task': 'app.tasks.ocr_tasks.process_pending_ocr_jobs',
        'schedule': crontab(minute='*/2')
    },

    # Cleanup - daily at 1:00 AM
    'email-integration-cleanup-attachments': {
        'task': 'app.tasks_email_integration.cleanup_old_attachments_task',
        'schedule': crontab(hour=1, minute=0),
        'kwargs': { 'days_old': 30 }
    }
}

ניהול חשבונות אימייל

חשבונות אימייל נשמרים בטבלת email_accounts ומנוהלים דרך ה-API או ממשק המשתמש.

מבנה הטבלה:
class EmailAccount(db.Model):
    __tablename__ = 'email_accounts'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), nullable=False)
    provider = db.Column(db.String(50))  # 'gmail', 'imap', 'outlook'

    # IMAP Configuration
    imap_server = db.Column(db.String(255))
    imap_port = db.Column(db.Integer, default=993)
    imap_username = db.Column(db.String(255))
    imap_password_encrypted = db.Column(db.Text)  # AES-256 encrypted

    # Gmail OAuth
    gmail_token_encrypted = db.Column(db.Text)
    gmail_refresh_token_encrypted = db.Column(db.Text)

    # Auto-fetch settings
    auto_fetch = db.Column(db.Boolean, default=True)
    fetch_interval = db.Column(db.Integer, default=300)  # seconds
    last_fetch = db.Column(db.DateTime)
    is_active = db.Column(db.Boolean, default=True)

    # Relationships
    company_id = db.Column(db.Integer, db.ForeignKey('scanning_companies.id'))
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
אבטחה: כל הסיסמאות וה-tokens מוצפנים באמצעות AES-256 Fernet. מפתח ההצפנה נשמר ב-environment variable: ENCRYPTION_KEY

הגדרת IMAP

להתחברות לשרתי IMAP סטנדרטיים:

Gmail
Server:imap.gmail.com
Port:993
SSL:Yes
* יש להפעיל App Password או OAuth
Outlook/Office 365
Server:outlook.office365.com
Port:993
SSL:Yes
דוגמת קוד - חיבור IMAP:
import imaplib
import email

def fetch_imap_messages(account):
    # Decrypt password
    password = decrypt(account.imap_password_encrypted)

    # Connect
    imap = imaplib.IMAP4_SSL(
        account.imap_server,
        account.imap_port
    )
    imap.login(account.imap_username, password)

    # Select INBOX
    imap.select('INBOX')

    # Search for unread messages
    _, message_numbers = imap.search(None, 'UNSEEN')

    for num in message_numbers[0].split():
        _, msg_data = imap.fetch(num, '(RFC822)')
        message = email.message_from_bytes(msg_data[0][1])

        # Process attachments
        for part in message.walk():
            if part.get_content_disposition() == 'attachment':
                save_attachment(part)

    imap.close()
    imap.logout()

Gmail OAuth 2.0

התחברות ל-Gmail באמצעות OAuth 2.0 מאפשרת גישה בטוחה יותר ללא צורך בסיסמה.

שלבי ההגדרה:
1 יצירת פרויקט ב-Google Cloud Console

צור פרויקט חדש ב-console.cloud.google.com

2 הפעלת Gmail API

APIs & Services → Enable Gmail API

3 יצירת OAuth 2.0 Credentials

Credentials → Create Credentials → OAuth 2.0 Client ID

4 הגדרת Redirect URI
https://labs.levor.io/api/email-integration/gmail/callback
Environment Variables:
GMAIL_CLIENT_ID=your_client_id.apps.googleusercontent.com
GMAIL_CLIENT_SECRET=your_client_secret
GMAIL_REDIRECT_URI=https://labs.levor.io/api/email-integration/gmail/callback
קוד - שאיבת הודעות Gmail:
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

def fetch_gmail_messages(account):
    # Create credentials from stored tokens
    creds = Credentials(
        token=decrypt(account.gmail_token_encrypted),
        refresh_token=decrypt(account.gmail_refresh_token_encrypted),
        token_uri='https://oauth2.googleapis.com/token',
        client_id=app.config['GMAIL_CLIENT_ID'],
        client_secret=app.config['GMAIL_CLIENT_SECRET']
    )

    # Build Gmail service
    service = build('gmail', 'v1', credentials=creds)

    # Fetch messages with attachments
    results = service.users().messages().list(
        userId='me',
        labelIds=['INBOX'],
        q='has:attachment',
        maxResults=50
    ).execute()

    for msg_meta in results.get('messages', []):
        msg = service.users().messages().get(
            userId='me',
            id=msg_meta['id'],
            format='full'
        ).execute()

        process_gmail_message(account.id, msg, service)

תהליך העיבוד המלא

Celery Beat
כל 5 דקות
fetch_all_accounts
שאיבת מיילים
חילוץ קבצים
OCR Processing
AI Line Items
שמירה ב-DB
התראה למשתמש
תהליך מפורט:
  1. Celery Beat מפעיל את fetch_all_accounts_task כל 5 דקות
  2. EmailFetcherManager מתחבר לכל חשבונות האימייל הפעילים
  3. עבור כל חשבון - שאיבת הודעות חדשות (עד 100 לחשבון)
  4. לכל הודעה עם קובץ מצורף - יצירת process_email_task
  5. AttachmentHandler שומר את הקובץ ויוצר רשומת ScannedDocument
  6. OCR Task מתווסף לתור לעיבוד
  7. LineItemExtractor מחלץ פריטים מהטקסט (Hybrid: AI + Regex)
  8. שמירת כל הנתונים ב-PostgreSQL
  9. יצירת התראה למשתמש על מסמך חדש

הגדרת Docker

docker-compose.yml (חלק רלוונטי):
services:
  # Redis - Message Broker
  redis:
    image: redis:7-alpine
    container_name: techlabs-redis
    restart: unless-stopped
    networks:
      - techlabs_network

  # Celery Worker
  celery-worker:
    build: ./sites/techlabs
    container_name: techlabs-prod-celery-worker
    command: celery -A app.celery_app worker --loglevel=info --concurrency=4
    restart: unless-stopped
    depends_on:
      - redis
      - db
    environment:
      - DATABASE_URL=postgresql://...
      - CELERY_BROKER_URL=redis://techlabs-redis:6379/0
      - CELERY_RESULT_BACKEND=redis://techlabs-redis:6379/0
      - GOOGLE_AI_API_KEY=${GOOGLE_AI_API_KEY}
    networks:
      - techlabs_network
    volumes:
      - ./sites/techlabs:/app

  # Celery Beat (Scheduler)
  celery-beat:
    build: ./sites/techlabs
    container_name: techlabs-prod-celery-beat
    command: celery -A app.celery_app beat --loglevel=info
    restart: unless-stopped
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://techlabs-redis:6379/0
    networks:
      - techlabs_network
    volumes:
      - ./sites/techlabs:/app
פקודות ניהול:
# View container status
docker ps | grep celery

# View worker logs
docker logs -f techlabs-prod-celery-worker

# View beat logs
docker logs -f techlabs-prod-celery-beat

# Restart workers
docker restart techlabs-prod-celery-worker techlabs-prod-celery-beat

# Execute task manually
docker exec techlabs-prod-web python -c "
from app.tasks_email_integration import fetch_all_accounts_task
result = fetch_all_accounts_task.apply()
print(result.get())
"

ניטור ולוגים

צפייה בלוגים:
# Real-time worker logs
docker logs -f techlabs-prod-celery-worker --tail=100

# Search for specific task
docker logs techlabs-prod-celery-worker 2>&1 | grep "fetch_all_accounts"

# View errors only
docker logs techlabs-prod-celery-worker 2>&1 | grep -i error
Flower - Web Monitoring (אופציונלי):
# Install Flower
pip install flower

# Run Flower
celery -A app.celery_app flower --port=5555

# Access at: http://localhost:5555
טיפ: ניתן להוסיף Flower כקונטיינר Docker עם Traefik routing לגישה מ-flower.labs.levor.io

פתרון בעיות

בדיקות:

  1. וודא ש-Celery Beat רץ: docker ps | grep beat
  2. בדוק חיבור ל-Redis: docker exec techlabs-redis redis-cli ping
  3. בדוק לוגים: docker logs techlabs-prod-celery-beat

פתרון:

docker restart techlabs-prod-celery-beat

סיבות אפשריות:

  • אין חשבונות אימייל מוגדרים
  • חשבון לא פעיל (is_active=False)
  • סיסמה/token לא תקינים
  • שגיאת חיבור לשרת IMAP

בדיקה:

docker exec techlabs-prod-db psql -U techlab_user -d techlab_db -c "
SELECT id, email, provider, is_active, last_fetch
FROM email_accounts
WHERE is_active = true;"

פתרונות:

  1. הגדל concurrency: --concurrency=8
  2. בדוק משימות תקועות:
    docker exec techlabs-prod-celery-worker celery -A app.celery_app inspect active
  3. נקה תור משימות:
    docker exec techlabs-redis redis-cli FLUSHALL
  4. הפעל מחדש: docker restart techlabs-prod-celery-worker

שגיאות נפוצות:

  • invalid_grant - Token פג תוקף, יש לחבר מחדש
  • access_denied - המשתמש לא אישר את ההרשאות
  • redirect_uri_mismatch - URI לא תואם ל-GCP Console

פתרון:

  1. היכנס ל-ניהול חשבונות
  2. מחק את החשבון הבעייתי
  3. הוסף מחדש והרשה גישה