最終更新: a few seconds agoRemove Netlify-related code from main (grafted, HEAD)

「タスク」の実装

概要

CIRCUS-API が実行する処理の中には、実行するのにかなりの時間がかかるものがある。

  • シリーズの ZIP ファイルをアップロードした後にそれを MongoDB に取り込む処理
  • Case のエクスポート(DICOM 画像を匿名化して ZIP ファイルに固める処理)

現時点ではこのような時間がかかる処理について進行状況(プログレス)を表示する仕組みがない。例えばシリーズを POST /api/series でインポートする際は、ネットワーク経由のアップロード自体のプログレスは XHR で表示できるが、アップロードが終わった後のインポートの経過(つまりサーバ側での処理の経過)は表示されず、POST リクエストがレスポンスを返さずに止まったままになる。

一般論として、特殊な場合を除き HTTP リクエストはすぐに終了すべきである。処理が 10 秒程度で終わるならまだ良いが、数分かかるような処理の場合はリクエストがタイムアウトする可能性すらある。

これを改善するため、このような時間がかかる処理を「タスク」と呼び、一元的に綺麗に処理するための仕組みを構築する。

  • 数秒以上の時間がかかるかもしれない処理が始まった時、taskId 付きの「タスク」が発行される。
  • 時間がかかる処理を行う API は、本当の処理が終わる数秒~数分後に 200 で { status: 'OK' } のようなものを返すのではなく、代わりに処理の開始直後に { taskId: '2zj32083' } のような形のレスポンスを返し、そこで一旦HTTP リクエストを終了する。
  • タスクには、終了時にその結果をダウンロードできるもの(例:Case のエクスポート)と、そのような結果のダウンロードが必要ないもの(例:シリーズのインポート)がある。
  • タスクには以下の情報が含まれる。
    • taskId (ULID)
    • userEmail: タスクを開始したユーザ(タスク一覧で検索するときにも使う)
    • name: タスク表示名(例 "Importing DICOM files")
    • status: 正常終了 ("finished")、異常終了 ("error")、実行中 ("processing") の別
    • downloadable: タスク完了後にその結果をダウンロードできる場合、その MIME type。ダウンロードできない場合は null。
    • createdAt, updatedAt
    • ★ 実行中の場合はプログレスバーを表示するに足るだけの進行状況(数字 "12/150" とテキスト "Importing" の両方)
  • このうち ★ で示した実行状況は MongoDB 内に保持せず、実行中の circus-api のメモリ内にのみ保持される。その他の状況は MongoDB 内の tasks というコレクションに保持することにする。
  • タスクの現在の状況は、処理をリクエストしたのとは別の API エンドポイント /api/tasks/:taskId を用いて通知してもらう。これは server-sent events を実装して行う。

TaskManager について

上記を実現するために TaskManager と呼ばれるタスク処理のためのサービスを FunctionService を使って構築する。これは以下の 3 つのメソッドを持つサービス。

  • register: エンドポイントから ctx を受け取って以下の処理を行う。
    • 新たな taskId を ULID で生成して、MongoDB にその情報を保存する ({ status: 'processing' })。
    • このタスクの進行状況を API エンドポイント側から通知してもらうための EventEmitter を作って返す。
    • downloadable の場合は追加でそのダウンロードファイルを書き込むための WriteStream も作って返す。
    • ctx に taskId を出力して、リクエストを正常終了させる。
  • report: エンドポイントから ctx を受け取って処理の状況を server-sent events (SSE) を使って通知する。
  • isTaskInProgress: このタスクマネージャが把握している進行中のタスクがあるかどうかを返す。これはタスクの実行途中に circus-api が s 強制終了した場合などに必要な処理。後述。
interface TaskManager {
  register: (
    ctx: koa.Context,
    options: {
      name: string;
      userEmail: string;
      downloadable?: string; // mime type if downloadable
    }
  ) => Promise<{ emitter: EventEmitter; downloadFile?: WriteStream }>;
  report: (ctx: koa.Context, taskId: string) => void;
  isTaskInProgress: (taskId: string) => boolean;
}

interface Task {
  finished: number;
  total: number;
  message: string;
  emitter: EventEmitter;
}

interface Options {
  timeoutMs: number;
}

const createTaskManager: FunctionService<TaskManager> = (
  options: Options,
  deps: { models: Models }
) => {
  // In-memory storage of ongoing tasks
  const tasks = new Map<string, Task>(); // maps taskId to Task
  return {
    /* ... */
  };
};

createTaskManager.dependencies = ['models'];

タスクを実装する API 側の taskManager 使用例はこちら(ダウンロードファイルがある場合)。

// API側の使用例(Koaのミドルウェア内)
async (ctx, next) => {
  const { emitter, downloadFile } = await taskManager.register(ctx, {
    name: 'Case export',
    userEmail: ctx.user.userEmail,
    downloadable: 'text/plain'
  });

  async(() => {
    // ここから下の処理はHTTPリクエストが既に終了した状態で実行される
    emitter.emit('processing', 5, 10);
    downloadFile.write('This is the ');
    emitter.emit('processing', 10, 10);
    downloadable.end('downloadable content');
    emitter.emit('finish');
  })();
};

タスク処理シーケンス

sequenceDiagram participant Browser participant API as APIエンドポイント participant TaskManager participant DB as MongoDB Browser ->> API: 時間のかかる処理を要求 API ->> TaskManager: registerでctxの委譲 TaskManager ->> DB: タスク情報の書き込み TaskManager ->> API: EventEmitterを渡す activate API TaskManager ->> Browser: taskIdを通知してリクエスト終了 Note over API: タスク本体処理実行 Note over API: 必要に応じて
ダウンロード用
ファイル生成 loop ループ API ->> TaskManager: タスク進行状況を通知 end API ->> TaskManager: タスク終了/失敗を通知 deactivate API TaskManager ->> DB: 終了/失敗情報を書き込み opt 既存タスク検索 Browser ->> API: タスク一覧検索 API ->> DB: クエリ DB ->> API: 結果返却 API ->> TaskManager: isTaskInProgress で本当に実行中か確認 opt タスク中断の場合の処理 API ->> DB: 実行中でないなら error を書き込み end API ->> Browser: taskId 検索結果返却 Note over Browser: ブラウザ画面に
検索結果表示 end opt タスク処理状況表示 Browser ->> API: taskId で状況問い合わせ API ->> TaskManager: reportでctxを委譲 alt TaskManagerが実行中と知っているIDの場合 TaskManager ->> Browser: SSEで状況通知 else TaskManagerが実行中と知らないIDの場合 TaskManager ->> DB: タスク情報問い合わせ DB ->> TaskManager: 結果返却 opt タスク強制中断の場合 TaskManager ->> DB: エラーを書き込み end TaskManager ->> Browser: 終了/エラーの別を通知 end Note over Browser: ブラウザ画面に
状況表示 end

ちなみに「タスク」は「ジョブ」と呼んでもいいのだが、CIRCUS CS の方のプラグインジョブと名前が被るので「タスク」と呼ぶ。

EventEmitter を使ったイベント通知

API エンドポイントと TaskManager との間でのイベント進行状況の通知は以下の EventEmitter を使って行う。

strict-event-emitter-types (か、類似のもの)を利用する。

import StrictEventEmitter from 'strict-event-emitter-types';

interface Events {
  // 実行状況の通知。例えば 50% 完了なら .emit('progress', 'Importing', 12, 24)
  progress: (message: string, value?: number, max?: number) => void;
  // エラーの通知。エラーメッセージ付き。
  error: (message: string) => void;
  // 終了の通知。終了時メッセージ付き。
  finish: (message: string) => void;
}

const emitter: StrictEventEmitter<Events> = new EventEmitter();

エラー処理について

以下の 2 種類のエラーを正しく処理すること。

API エンドポイント側が出したエラー

例えばシリーズのインポート途中におかしな DICOM ファイルがありそれ以上インポート処理が進められなくなった場合。これは EventEmitter に 'error' イベントを emit し、それを TaskManager が補足することで処理する。TaskManager は DB に { status: 'error' } を書き込んで、処理中リストから忘れる。

circus-api の強制終了などにともなうエラー

実行中のタスクは常に TaskManager がメモリ内で把握している。もし例えばシリーズのインポート真っ最中に Ctrl + C が押されるなどして circus-api が強制終了した場合、MongoDB には { status: 'processing' } の情報が残るが、タスクは中断されてしまっており、次に circus-api が起動した際に再開することもできない。このような場合に対処するために isTaskInProgress のメソッドがある。

タスク一覧検索時には MongoDB 内のタスク情報が { status: 'processing' } であっても本当は処理が中断されてしまっている可能性があるため、それぞれの taskId に対して isTaskInProgress でタスクが本当に実行中かどうかを確認すること。

タスク状況の report 時にも、タスクが本当に実行中であればメモリ内にその状況が保持されているはず。そうでない場合、「正常終了後」「異常終了後」のほかに、「データベースには processing と書いてあるけど本当は中断されて終わっている」という状況があることを考慮して実装すること。

追加で実装するエンドポイント

circus-api/src/api/tasks 内に以下のエンドポイントを実装する。適宜テストも書く。

  • GET /api/tasks/:taskId: 自分が発行したタスクであるかどうかを userEmail ベースで確認した後に、指定されたタスクの詳細を返す。

  • GET /api/tasks: 自分が発行したタスクの一覧を performSearch の形式で返す。

  • GET /api/tasks/:taskId/download: 自分が発行したタスクであり、かつ処理が完了しており、かつダウンロードできるファイルがある場合は、そのファイルをダウンロードさせる。

スロットル・デバウンスについて

EventEmitter は 'progress' イベントを短時間に何度も呼ぶ可能性があるが、そのたびにネットワークに SSE でイベントを発行していたら負荷が高くなりすぎる。このためにスロットルという技術を使って、SSE でのイベントを適宜間引きするようにする。ひとまず間隔は 300ms 秒に 1 回くらいの固定で良い。

覚えること

  • 改めてクロージャやストリームについての深い理解
  • EventEmitter の使い方、特に TypeScript での型の付け方
  • debounce, throttle という概念について

できればこれも

  • 💪 クライアント側の実装もやってテストをしてみる。Webpack とかは要らないので非常に簡単な React アプリを作って実際にこれがどう動くかを確かめると良い。