beio Logobeio

【Cloudflare Workers 全端架構師之路 08】非同步篇:使用 Queues 解耦系統與背景任務

發布於

01. 前言:為什麼 API 回應這麼慢?

在設計 API 時,我們常遇到一種場景: 使用者註冊後,系統需要:

  1. 寫入資料庫 (D1) - 100ms
  2. 呼叫第三方 API 發送歡迎信 (Mailgun/SendGrid) - 500ms ~ 2s
  3. 產生預設的大頭貼並上傳 (R2) - 1s
  4. 將數據送到分析平台 - 500ms

如果全部同步執行,使用者按下「註冊」後要等 4 秒鐘才能看到回應。這在使用者體驗 (UX) 上是不及格的。 更糟糕的是,Workers 有 CPU 時間限制。如果第三方 API 卡住,你的 Worker 可能會超時 (Timeout) 並被強制殺掉,導致流程中斷。

解決方案:Cloudflare Queues (訊息佇列) 將任務「丟」進佇列後立刻回應使用者,由另一個背景 Worker 慢慢處理。這就是 非同步 (Asynchronous)解耦 (Decoupling)

02. 架構比較:Queues vs. AWS SQS

特性Cloudflare QueuesAWS SQS + Lambda
整合度極高 (直接在 wrangler.toml 綁定)需設定 Event Source Mapping, IAM Role
批次處理預設支援 (Batching)支援 (但需手動處理 Partial Failure)
計費模式$0.40 / 百萬次操作$0.40 / 百萬次請求
延遲毫秒級視 Polling 設定而定
開發體驗透過 env.QUEUE.send() 直接呼叫需使用 AWS SDK

架構師筆記: Cloudflare Queues 最強大的地方在於它的 Consumer 設定。你不需要像在 AWS 那樣設定 Polling 間隔或 Visibility Timeout,只要在 Worker 裡寫一個 queue() handler,有訊息進來它就會自動執行。

03. 環境設定:生產者與消費者

Queues 的架構分為兩端:

  1. Producer (生產者): 通常是你的 API Worker,負責 send() 訊息。
  2. Consumer (消費者): 專門處理任務的 Worker,負責 ack() 訊息。

(註:Queues 目前僅限付費方案使用)

步驟 1: 建立 Queue

npx wrangler queues create email-queue

步驟 2: 設定 wrangler.toml

我們可以在同一個專案中同時扮演生產者與消費者,也可以拆開。這裡示範在同一個 wrangler.toml 中設定:

# 定義生產者 (Producer)
[[queues.producers]]
queue = "email-queue"
binding = "EMAIL_QUEUE"

# 定義消費者 (Consumer)
[[queues.consumers]]
queue = "email-queue"
max_batch_size = 10 # 一次處理 10 封信
max_batch_timeout = 5 # 或每 5 秒處理一次
max_retries = 3 # 失敗重試次數
dead_letter_queue = "email-dlq" # (選擇性) 失敗多次後丟到這裡

04. 實作:Email 發送任務

定義訊息型別 (Types)

type EmailMessage = {
  type: 'welcome' | 'reset_password';
  email: string;
  userId: string;
  name: string;
}

生產者 (API Worker)

修改 src/index.ts 的 API 部分:

import { Hono } from 'hono'
const app = new Hono<{ Bindings: { EMAIL_QUEUE: Queue<EmailMessage> } }>()

app.post('/auth/register', async (c) => {
  const { email, name } = await c.req.json();

  // 1. 執行核心邏輯 (寫入 DB)
  // await db.insert(users)...

  // 2. 非同步發送 Email (不會阻塞回應)
  await c.env.EMAIL_QUEUE.send({
    type: 'welcome',
    email,
    name,
    userId: 'user_123'
  });

  // 3. 立刻回應前端
  return c.json({ success: true, message: "Registered! Check your inbox." });
})

export default app

消費者 (Background Worker)

在同一個檔案中,匯出 queue handler:

export default {
  // Hono 的 fetch handler
  fetch: app.fetch,

  // Queue 的 handler
  async queue(batch: MessageBatch<EmailMessage>, env: Env): Promise<void> {
    // batch.messages 包含了一批訊息 (依據 max_batch_size 設定)
    
    // 技巧:使用 Promise.allSettled 並行處理,提高吞吐量
    const tasks = batch.messages.map(async (msg) => {
      try {
        const payload = msg.body;
        console.log(`Processing email for ${payload.email}`);

        // 模擬呼叫第三方 API (如 SendGrid)
        // await sendEmail(payload);
        
        // 成功處理:確認訊息 (從 Queue 中移除)
        msg.ack();
        
      } catch (error) {
        console.error(`Failed to send to ${msg.body.email}`, error);
        
        // 失敗處理:重試 (Retry)
        // 這會讓訊息重新回到 Queue 中,等待下一次被抓取
        // 如果重試超過 max_retries,會被丟到 Dead Letter Queue
        msg.retry();
      }
    });

    // 等待所有任務完成
    await Promise.allSettled(tasks);
  }
}

05. 進階模式:批次寫入優化 (Batch Write)

Queues 不只能用來發 Email,它也是 資料庫寫入緩衝 (Write Buffer) 的絕佳工具。

情境:你有一個 IoT 裝置 API,每秒有 1000 個感測器數據進來。 問題:如果你每秒對 D1 做 1000 次 INSERT,資料庫會鎖死。 解法:先 send() 到 Queue,然後 Consumer 一次 INSERT 100 筆。

// Consumer 範例:批次寫入 D1
async queue(batch: MessageBatch<SensorData>, env: Env) {
  const sensors = batch.messages.map(m => m.body);
  
  if (sensors.length > 0) {
    // 將 100 筆資料合併成單一 SQL
    await env.DB.prepare(`
      INSERT INTO sensor_logs (id, value) VALUES 
      ${sensors.map(() => '(?, ?)').join(',')}
    `)
    .bind(...sensors.flatMap(s => [s.id, s.value]))
    .run();
    
    // 成功後再一次全部 ack
    batch.ackAll();
  }
}

06. 最佳實踐:Dead Letter Queues (DLQ)

永遠不要假設任務會 100% 成功。 如果 Email 地址錯誤,或者第三方 API 掛了,你的 Consumer 會一直失敗、一直重試 (Retry Loop)。

Dead Letter Queue (死信佇列) 是這些失敗訊息的墳場。 當訊息重試超過 max_retries (例如 3 次) 後,Cloudflare 會自動把它移到 DLQ。 你應該另外寫一個 Worker 或腳本來監控 DLQ,分析為什麼這些任務會失敗,或者手動修復後重新發送。

07. 小結與下一步

有了 Queues,你的系統具備了:

  1. 高響應性:使用者不用等背景任務。
  2. 高可靠性:失敗會自動重試。
  3. 高吞吐量:透過 Batching 優化資料庫寫入。

這就是微服務架構 (Microservices) 的雛形。你的系統不再是一塊大單體 (Monolith),而是由多個專注的小 Worker 組成的艦隊。

在下一篇 Part 9: 智慧篇,我們將進入最令人興奮的領域 —— AI。 你不需要買昂貴的 GPU,也不需要學 Python。我們將直接在 Worker 中呼叫 Workers AI,免費運行 Llama 3 語言模型與 Stable Diffusion 繪圖模型。


Ken Huang

關於作者

Ken Huang

熱衷於嘗試各類新技術的軟體開發者,現階段主力為 Android / iOS 雙平台開發,同時持續深耕前端與後端技術,以成為全端工程師與軟體架構師為目標。

最廣為人知的代表作為 BePTT。開發哲學是「以做身體健康為前提」,致力於在工作與生活平衡的基礎上,打造出擁有絕佳使用體驗的軟體服務。

這裡是用於紀錄與分享開發經驗的空間,希望能透過我的實戰筆記,幫助開發者解決疑難雜症並提升效率。

Android APP DevelopmentiOS APP DevelopmentBePTT CreatorFull Stack Learner