The Context Pipeline Explained — From Webhook to Draft with Grace
NeatMail isn't just another email client that slaps a "Generated by AI" button on a textarea. The AI draft system is the product's crown jewel — a multi-stage, gracefully-failing pipeline that weaves together email classification, vector search, calendar availability, Slack messages, CRM data, GitHub pull requests, Notion pages, and a custom Azure OpenAI model to produce context-aware replies that sound like you.
And the best part? Every single component can fail, timeout, or return nothing — and the system keeps working. That's not an accident. It's by design.
Let's go deep into the architecture.
The Trigger: An Email Arrives
It all starts with a webhook. When a Gmail or Outlook push notification lands on NeatMail's API, the system needs to make a critical decision: does this email need a reply?
The webhook handler (gmail-webhook.ts) does a first pass:
- Deduplication — Each message is checked against a Redis "processed" set. If we've seen it, we skip it. No duplicate processing.
- Gmail Category Pre-filter — Before even thinking about AI, the system checks Gmail's built-in labels.
CATEGORY_PROMOTIONS? Skip.CATEGORY_SOCIAL? Probably skip. This saves us from wasting AI compute on marketing emails and LinkedIn notifications. - Classification via Batched API — If the email passes the pre-filter, it enters the AI classification pipeline.
The Classification Layer: Batching for Throughput
We don't classify emails one-by-one. That would be wasteful and slow. Instead, we use a batched approach (lib/classify-batch.ts) that's worth studying.
When a new email needs classification, bufferClassifyJob() pushes the request onto a Redis list. When that list hits batch size (5), it enqueues a BullMQ job to flush the batch.
export async function bufferClassifyJob(requestId: string, request: ModelRequest) {
const item = { id: requestId, r: request };
const count = await redis.rpush(QUEUE_KEY, JSON.stringify(item));
if (count >= BATCH_SIZE) {
await classifyQueue.add("flush-classify", {});
}
}
The flush worker atomically pops items (using a distributed Redis lock via NX), sends them as a batch POST to CLASSIFICATION_API_URL/classify-batch, and stores each result in Redis with a 120-second TTL. The original caller polls Redis every 100ms for up to 60 seconds waiting for its result.
This batching pattern means:
- Lower API costs — Fewer HTTP round-trips to the external model API.
- Graceful overloading — If the batch API fails, every item in the batch gets an error result stored in Redis, and each caller gets a clear error instead of a hanging promise.
- Distributed-safe — The Redis lock ensures only one server instance flushes at a time, even across multiple deployments.
The classification returns a category, response_required boolean, and optional ai_summary/ai_action. We only proceed to draft generation when the label is "Pending Response" or "Action Needed" and responseRequired === true. Everything else — newsletters, receipts, automated alerts — gets a label but never triggers a draft.
Queuing the Draft: BullMQ with Exponential Backoff
Once classification decides a draft is needed, a job is enqueued on the draft queue:
export const draftQueue = new Queue("draft", {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: { type: "exponential", delay: 1000 },
removeOnComplete: true,
removeOnFail: 100,
},
});
Three retry attempts with exponential backoff (1s → 2s → 4s). The removeOnFail: 100 keeps the last 100 failed jobs for debugging without bloating Redis.
The worker (bullmq/workers/process-draft.ts) runs with concurrency 5 and a 5-minute lock duration — because the full pipeline (fetch body → call draft API → assemble context → call OpenAI → create draft) can take a while.
The Worker's First Gate: Tier Limits
Before doing any real work, the worker checks if the user is allowed to generate a draft:
- Is auto-drafting enabled? Checked from
draft_preferencetable. If disabled, skip immediately. - MAX tier? Unlimited drafts. Go ahead.
- PRO tier? Check the monthly limit (20 drafts/month). If exceeded, return a clear
"skipped"status.
This tier gating happens before any API calls — no wasted compute on users who've hit their limit.
Full Body Fetch: Graceful Degradation
The webhook only provides a snippet. For good drafts, we need the full email body. The worker fetches it from the Gmail API (format "full") or Outlook Graph API, recursively extracting body parts and stripping quoted replies.
try {
fullEmailBody = await getGmailMessageBody(userId, messageId);
} catch (error) {
console.error("Failed to fetch full Gmail body, using snippet fallback", { userId, messageId, error });
fullEmailBody = emailData.bodySnippet;
}
If the full body fetch fails (network error, token expired, API rate limit), the worker gracefully falls back to the snippet. A draft with a snippet is better than no draft at all, and the user never knows something went wrong.
The Draft Context API: Vector Search + Intent Analysis
Before the context pipeline runs, the worker calls an external draft context API (getDraftContext()) that does heavy lifting:
- Vector similarity search — Past emails from this sender are retrieved via embeddings. Up to 8 history items, each capped at 600 characters.
- Thread context — Recent messages in the current thread are fetched. Up to 8 items, each capped at 800 characters.
- Intent classification — The email's intent is determined (scheduling request, task assignment, question, complaint, follow-up, introduction, etc.).
- Keyword extraction — Important entities and dates are pulled from the email body.
- Date parsing — Mentioned dates are extracted as both raw text and ISO 8601 strings (supporting both instants and intervals like "2026-05-31T15:00:00+05:30/2026-05-31T16:00:00+05:30").
This external API has a 240-second timeout (4 minutes) — intentional, because vector databases can be slow on cold queries.
The Context Pipeline: Where the Magic Happens
This is the heart of the system (context-engine/pipeline.ts). It's an extensible provider architecture designed for graceful degradation.
The Provider Model
Every integration implements the ContextProvider interface:
interface ContextProvider {
id: string;
name: string;
relevantIntents: EmailIntent[];
fetchContext(email: IncomingEmail, entities: EmailEntities, userId: string): Promise<ContextCard | null>;
}
The relevantIntents array is key — providers are filtered by intent before any work is done. If the email is a complaint, the calendar provider won't even fire. If it's a scheduling request, the GitHub provider won't fire. This saves API calls and keeps context relevant.
Currently, NeatMail has six providers:
| Provider | Intents | What it does |
|---|---|---|
| Google Calendar | scheduling_request, follow_up | Checks freeBusy + events.list for mentioned dates |
| Outlook Calendar | scheduling_request, follow_up | Same via Microsoft Graph API |
| Slack | question, task_assignment, follow_up, status_update, approval, complaint, general | Searches Slack messages matching sender + keywords |
| HubSpot | question, task_assignment, follow_up, status_update, approval, complaint, introduction, general | Fetches contact info, open deals, tasks, tickets, recent activity |
| Notion | question, task_assignment, follow_up, status_update, approval, general | Searches pages + fetches content snippets and unresolved comments |
| GitHub | question, task_assignment, follow_up, status_update, approval, complaint, introduction, general | Resolves repo, fetches open PRs with CI status, issues, milestones, main branch health |
The Assembler: Promise.allSettled with Timeouts
The ContextAssembler (context-engine/assembler.ts) orchestrates everything:
async assemble(email: IncomingEmail, entities: EmailEntities): Promise<ContextCard[]> {
const relevant = this.providers.filter(p =>
p.relevantIntents.includes(entities.intent)
);
const results = await Promise.allSettled(
relevant.map(p =>
Promise.race([
p.fetchContext(email, entities, email.userId),
new Promise<null>(resolve => setTimeout(() => resolve(null), TIMEOUT_MS))
])
)
);
const cards = results
.filter((r): r is PromiseFulfilledResult<ContextCard> =>
r.status === "fulfilled" && r.value !== null
)
.map(r => r.value)
.sort((a, b) => score(b.relevance) - score(a.relevance));
return cards;
}
Critical design decisions:
- Promise.allSettled — Not
all. If HubSpot's API is down, Slack and GitHub still contribute context. One provider failure never blocks others. - 10-second timeout per provider — Via
Promise.race. A slow provider gets cut off and returns null. The system never waits more than 10 seconds for any single provider. - Graceful nulls — Every provider is designed to return null when it has nothing useful. No data? No card. The prompt builder handles zero cards just fine.
- Relevance sorting — High-relevance cards (calendar availability, matched contacts) appear first in the prompt, giving them more weight in the AI's attention.
Calendar Providers: Cross-Validation Against Busy-Free
The calendar providers deserve special attention. Both Google and Outlook implementations do something interesting: they cross-validate the free/busy API against the events list.
When checking if a time slot is free:
- Call freeBusy/getSchedule — The lightweight API that returns busy intervals.
- Call events.list/calendarView — The full API that returns event objects.
- Cross-validate — If freeBusy says busy but events.list shows no overlapping events, trust freeBusy (it's usually sourced from the server-side calendar data). If freeBusy says free but events.list shows overlapping events, override to BUSY (the events list is more detailed and catches things like recurring events that freeBusy might miss).
This matters when the AI is proposing meeting times. A false "free" means the user gets an awkward reply. A false "busy" means a slightly inconvenient but correct reply. The system errs on the side of "busy."
Additionally, the providers handle timezone offset mismatches gracefully. If a date is specified as "2026-05-31T15:00:00+05:30" but the user's timezone is set to "America/New_York" (UTC-4), the system detects the mismatch and reinterprets the time in the user's timezone using date-fns-tz. This prevents embarrassing drafts that propose meetings at 3 AM.
Prompt Construction: Crafting the AI's Context Window
With context cards assembled and history retrieved, the pipeline builds a system message that is ~200 lines of carefully crafted instructions. Here's what goes into the prompt:
The System Message
The system message sets ground rules:
- Detection rules — When to set
noReplyNeeded. Newsletters, receipts, system summaries:NO_REPLY_NEEDED. Emails containing "action required", "failed", "expires", "critical", "warning": always reply. - Reply generation rules — Five categories with specific guidance: simple acknowledgments (1-2 sentences), action requests (confirm + concrete next steps), information requests (acknowledge + say what you'll do), complaints (empathy + resolution), and a hard limit of 8 sentences.
- No placeholders — Explicit instruction to never use bracket placeholders like
[DATE NEEDED]. The AI must use available information or say it doesn't know. - No greeting or signature — The draft body is just the reply text. Greetings and sign-offs are added by the SMTP envelope builders.
Intent Guidance
Each of the 10 intents has specific, hand-crafted guidance:
Scheduling request: "Propose specific times, reference calendar availability from the connected app context, or ask clarifying questions about timing."
Task assignment: "Take ownership immediately. Confirm the task, restate the deadline if one is given, and outline the concrete next steps you will take. Do NOT ask for prerequisites or data you would naturally gather during the work."
Complaint: "Acknowledge the issue with empathy, take responsibility where appropriate, and provide a specific resolution or next-step timeline. Do not be dismissive."
Style Mirroring
The buildStyleInstruction() function dynamically generates instructions based on whether the user has email history:
- No history → Default to a "professional, neutral tone. Keep sentences concise. Avoid excessive formality, filler phrases, and unnecessary exclamation marks."
- With history → Full style-mirroring mode: "You MUST analyze their writing style and replicate it exactly." The instruction prompts the model to analyze sentence length, vocabulary level, punctuation habits (em-dashes, semicolons, ellipses), emoji usage, capitalization quirks, greeting style, sign-off style, and request approach.
This is what makes drafts feel like you wrote them. A user who writes "hey — can u check this pls?" gets drafts that match that casual tone. A user who writes formal emails gets polished responses.
The User Message
The user message assembles everything into a structured prompt:
<context>
Connected app context:
### Google Calendar
User's availability:
- tomorrow 3pm → BUSY. 2 events.
### Slack
Relevant Slack messages (3 total matches):
...
### GitHub
GitHub context:
...
Previous emails from this sender (history):
[formatted history items]
Current email thread (recent messages):
[formatted thread items]
Mentioned dates: tomorrow (2026-06-09T15:00:00+05:30)
Timezone: Asia/Kolkata
Keywords: budget, approval, Q2
</context>
<input_email>
From: Alice
Subject: Budget approval needed
Body: Can you approve the Q2 budget before tomorrow's meeting?
</input_email>
The AI Call: Structured Output via JSON Schema
The assembled prompt is sent to Azure OpenAI's gpt-5-mini deployment with a strict structured output:
const completion = await openai.chat.completions.create({
model: "gpt-5-mini",
max_completion_tokens: 2048,
reasoning_effort: "low",
response_format: {
type: "json_schema",
json_schema: {
name: "DraftOutput",
strict: true,
schema: {
type: "object",
properties: {
noReplyNeeded: { type: "boolean" },
draft: { type: "string" },
},
required: ["noReplyNeeded", "draft"],
additionalProperties: false,
},
},
},
messages: [
{ role: "system", content: systemMessage },
{ role: "user", content: userMessage },
],
});
Key decisions here:
json_schemamode — Notjson_object. This enforces the exact schema withadditionalProperties: false, ensuring the model can't add extra fields. The output is guaranteed parseable.reasoning_effort: "low"— GPT-5 series supports configurable reasoning depth. "Low" is appropriate for draft generation where speed matters more than deep chain-of-thought.- 2048 max tokens — Generous ceiling, but the economy of the 8-sentence limit means most responses are well under 500 tokens.
Graceful Response Parsing
The pipeline wraps the model response in layered error handling:
if (!rawContent) {
console.error("Empty model response", `finish_reason=${finishReason}`);
return { draft: "", contextSummary: contextBlock };
}
try {
const parsed = JSON.parse(rawContent);
// validate shape
} catch (err) {
console.error("JSON parse failed", rawContent, finishReason, err);
}
Empty response? Returns empty draft. JSON parse fails? Returns empty draft (with detailed logging for debugging). Wrong schema? Returns empty draft. The caller (the BullMQ worker) handles an empty draft by simply not creating one.
The system prompt includes five worked examples (Example A through E) showing the model correct vs. incorrect responses for different scenarios. Example A is particularly important — it teaches the model that the most common email type (someone sharing a document) deserves a 1-sentence acknowledgment, not a 5-paragraph plan.
Draft Creation: MIME Construction
If the pipeline returns a valid draft (not "NO_REPLY_NEEDED" and not empty), the worker creates an actual email draft in the user's mailbox.
Gmail Draft Creation
createGmailDraft() builds a proper MIME message:
MIME-Version: 1.0
From: [email protected]
To: [email protected]
Subject: =?UTF-8?B?UmU6IE91dGxv...?= (base64 encoded "Re: Original Subject")
In-Reply-To: <[email protected]>
References: <[email protected]> <[email protected]>
Content-Type: text/html; charset="UTF-8"
<div style="color: #333333; font-size: 14px;">
The AI-generated draft body here...
</div>
--
User's signature
It extracts the RFC Message-ID and References headers from the original email to ensure proper threading. The subject is base64-encoded for international character support. The user's font color, font size, and signature are applied.
The final call is users.drafts.create with threadId, ensuring the draft appears in the correct thread in Gmail.
Outlook Draft Creation
createOutlookDraft() calls Microsoft Graph's me/messages/{messageId}/createReply endpoint with a subject and HTML body. Same styling logic, same threading behavior.
Telegram Confirmation (Optional)
If the user has Telegram integration enabled with forward_draft_for_confirmation, the worker sends the draft text to their Telegram chat for manual approval via sendDraftNotification(). A telegramPendingDraft record is created to track the interactive confirmation flow. The user can approve, edit, or reject the draft from Telegram — no need to open the email client.
Draft Count Tracking: Monthly Limits
After a successful draft is created, the worker increments the user's monthly draft count:
if (drafted) {
await incrementDraftCount(userId);
}
This upserts a record in the draft_preference table with the current count and the reset date. The PRO tier is limited to 20 drafts per month. The limit works across all drafts — whether triggered by Gmail webhooks, Outlook webhooks, or manually. The check happens before any expensive work, so users who've hit their limit aren't charged for API calls.
The Complete Flow: End-to-End
Here's the full sequence when an email arrives:
Email arrives (Gmail PubSub / Outlook webhook)
│
├─ Deduplication (Redis check)
│
├─ Gmail category pre-filter
│
├─ Buffer classification job (Redis list)
│ └─ Flush batch (5 items) → POST /classify-batch
│ └─ Store result in Redis (TTL 120s)
│
├─ Poll for classification result (up to 60s, every 100ms)
│
├─ Decision: shouldDraft? (Label = "Pending Response" || "Action Needed")
│ └─ No → label + done
│
├─ Yes → Enqueue draft job (BullMQ, 3 retries, exp backoff)
│
├─ Worker starts:
│ ├─ Tier check (MAX? PRO limit?)
│ ├─ Preference check (enabled?)
│ ├─ Full body fetch (Gmail/Outlook API, fallback to snippet)
│ ├─ Call draft context API (vector search + intent + dates)
│ ├─ Context pipeline:
│ │ ├─ Register providers (calendar, Slack, HubSpot, Notion, GitHub)
│ │ ├─ Filter by intent relevance
│ │ ├─ Provider assembly (Promise.allSettled, 10s timeout each)
│ │ └─ Prompt construction (history + thread + context + instructions)
│ ├─ Azure OpenAI call (gpt-5-mini, JSON schema, reasoning_effort: low)
│ ├─ Parse response (graceful: empty/error → no draft)
│ ├─ Create draft (Gmail/Outlook API)
│ ├─ Telegram notification (optional)
│ └─ Increment draft count
│
└─ Draft lands in user's mailbox. User edits and sends.
Throttle Layer: Token-Bucket via Redis Lua
Behind the scenes, all external API calls pass through a Redis-backed token-bucket throttle (lib/throttle.ts). This is a single atomic Lua script that prevents thundering-herd problems across multiple server instances:
-- Token-bucket algorithm
local key = KEYS[1]
local maxTokens = tonumber(ARGV[1])
local intervalMs = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local cost = tonumber(ARGV[4])
local data = redis.call('HMGET', key, 'tokens', 'lastRefill')
-- ... refill tokens, deduct cost, return wait time
Default limits: OpenAI 500 RPM, Google 15,000 RPM (matching the grandfathered quota). Calls that exceed the rate limit are delayed, not dropped — the throttle function sleeps for the required duration and retries, up to a 30-second max wait.
If the wait exceeds 30 seconds, a ThrottleTimeoutError is thrown, which propagates up to BullMQ's retry mechanism. The exponential backoff on the queue handles the rest.
What Makes This Pipeline "Graceful"
Six design principles make this pipeline resilient:
-
Empty is OK — Every provider returns null if it has nothing useful. The prompt builder handles zero cards. The model sometimes returns
noReplyNeeded. The system handles all of these without error. -
Timeouts kill the slow, not the fast — 10 seconds per provider. If HubSpot's API is slow, Slack and GitHub still contribute. If everything is slow, the pipeline completes in ~10 seconds with an empty context block.
-
Fallbacks everywhere — Full body fetch fails? Use the snippet. The model fails to parse? Return empty draft. Empty draft? The worker doesn't create one. The user just doesn't get a draft — no error, no crash, no confusion.
-
Retries with backoff — 3 attempts, 1s → 2s → 4s. Transient network errors are automatically handled.
-
Distributed safety — Redis locks prevent duplicate batch flushes. Redis-processed sets prevent duplicate email processing. The BullMQ lock prevents duplicate draft jobs.
-
Draft API integration follows the Open-Closed Principle — Adding a new provider to the context engine requires: (a) creating a file in
context-engine/providers/, (b) implementing theContextProviderinterface, and (c) registering it inpipeline.ts. That's it. No other file changes.
What's Next
The pipeline was designed for extensibility from day one. The types file even has a comment: // THIS IS THE ONLY CONTRACT EVERY INTEGRATION IMPLEMENTS. Adding Jira, Linear, Figma, or any other integration follows the same pattern as Slack and HubSpot.
The EmailIntent type already has room to grow: scheduling_request, meeting_confirmation, task_assignment, status_update, question, approval, follow_up, introduction, complaint, general. Each new provider declares which intents it's relevant for, and the assembler handles the rest.
As users accumulate more email history, the style mirroring gets better. As more providers are added, the context gets richer. The pipeline is built to scale with both.
Architecture at a Glance
┌──────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Gmail/Outlook│────▶│ Classification │────▶│ Draft Queue │
│ Webhook │ │ (batch + poll) │ │ (BullMQ) │
└──────────────┘ └─────────────────┘ └──────┬───────┘
│
┌────────▼────────┐
│ Draft Worker │
│ (concurrency 5)│
└────────┬────────┘
│
┌────────────────────────────────┼────────────────────────────────┐
│ │ │
┌──────▼──────┐ ┌────────▼────────┐ ┌────────▼────────┐
│ External │ │ Context │ │ Gmail/Outlook │
│ Draft API │ │ Pipeline │ │ API (create) │
│ (vector + │ │ (6 providers) │ └─────────────────┘
│ intent) │ └────────┬────────┘
└─────────────┘ │
┌───────┴────────┐
│ Azure OpenAI │
│ gpt-5-mini │
│ JSON Schema │
└────────────────┘
The entire system is designed around one core insight: LLMs are powerful but unreliable, external APIs are fast but flaky, and users deserve a draft every time — even if it's imperfect. By wrapping every fallible component in layers of graceful degradation, NeatMail delivers AI drafts that just work.
