Node.js で排他制御。async-lock を使ってみた

Express サーバのとあるリクエストについて、同時にリクエストがあっても順に処理する必要が出た。すなわち、排他制御をかけ、同時に処理が行われないようにしたかったのだ。

それを実現してくれる async-lock というライブラリを見つけたので、使い方を紹介する。

async-lock をインストールする

今回は TypeScript ベースで Express サーバを作っていて、そこに async-lock を追加するテイで紹介するので、Definitely Typed パッケージもインストールしておく。

$ npm init -y
$ npm install --save express
$ npm install --save-dev typescript ts-node @types/express

# async-lock をインストールする
$ npm install --save async-lock
$ npm install --save-dev @types/async-lock

async-lock を使ってみる

それでは async-lock を使ったリクエスト処理を実装してみよう。

http://localhost:8000/ にリクエストが来たら、1つずつ順にカウンタを回し、その値をテキストファイルに書き出す、という API を作ってみる。この際、テキストファイルへの書き込みが同時に発生しないよう、async-lock を使用して排他制御をかける、というワケ。

import fs from 'fs';
import express from 'express';
import AsyncLock from 'async-lock';

// サンプル用の関数
  // 指定秒数だけ待機する
  const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
  // 現在の分・秒・ミリ秒を返す
  const now = () => new Date().toISOString().substr(14, 9);

// グローバルにカウントしたいデータがあるとする
let counter = 0;

// ロック用変数を定義する・タイムアウトを30秒に設定しておく
const lock = new AsyncLock({ timeout: 1000 * 30 });

// サーバ
const app = express();

app.get('/', async (req, res) => {
  console.log(now(), '[/] Start');
  
  // 排他制御したい処理に 'my-lock' という名前を付ける
  lock.acquire('my-lock', async () => {
    console.log(now(), '  Lock Function Start');
    
    counter++;  // カウンタをインクリメントする
    let text = `${counter}`;
    
    await sleep(3000);  // 適当に待機させる
    console.log(now(), '    Counter : ', counter);
    
    // テキストファイルに追記する
    if(fs.existsSync('./counts.txt')) {
      const original = fs.readFileSync('./counts.txt', 'utf-8');
      text = `${original}\n${text}`;
    }
    fs.writeFileSync('./counts.txt', text, 'utf-8');
    
    console.log(now(), '  Lock Function End');
    return 'Successful';
  }, (error, result) => {
    console.log(now(), '  Lock Result Start');
    if(error) {
      console.log(now(), '    Failure : ', error);
      res.send(`${now()} : NG`);
    }
    else {
      console.log(now(), '    Success : ', result);
      res.send(`${now()} : OK`);
    }
    console.log(now(), '  Lock Result End');
  });
  
  console.log(now(), '[/] End');
});

app.listen(8000, () => {
  console.log('Server Started');
});

通常は res.send() 部分を return res.send() とし、以降の行を処理しないようにすると思うが、今回は動作を把握するため return はせずに、最後の [/] End の行まで実行させてみる。

動作を確認する

それではサーバを起動し、動作を確認してみよう。

$ npx ts-node app.ts
Server Started

この状態で、他に2つほどターミナルタブを開き、この Express サーバに対してリクエストを連続で投げてみる。

# タブ 1 とタブ 2 から、連続して次のような curl を叩く
$ curl http://localhost:8000/

するとサーバ側のログが次のように出力されていく。それぞれ解説コメントを間に入れていく。

# タブ 1 からの curl が処理される
25:57.500 [/] Start
25:57.501   Lock Function Start
25:57.502 [/] End
  # sleep() 関数により後続処理が3秒ほど止まっている

# その間に、タブ 2 からの curl リクエストを受け付ける
25:58.144 [/] Start
25:58.145 [/] End
  # しかし、async-lock 内の関数はまだ実行されていない

# 3秒経過し、タブ 1 からの curl 処理の続きが開始される
26:00.502     Counter :  1
26:00.504   Lock Function End
26:00.504   Lock Result Start
26:00.504     Success :  Successful
26:00.508   Lock Result End
# ↑ ココまででタブ 1 へのレスポンスが完了する

# ↓ その直後、タブ 2 からのリクエストに対する async-lock の処理が動き始める (上のログ行と同じ時刻 = すぐ直後に動作していることが分かる)
26:00.508   Lock Function Start
  # 3秒待機
26:03.509     Counter :  2
26:03.510   Lock Function End
26:03.510   Lock Result Start
26:03.510     Success :  Successful
26:03.510   Lock Result End
# タブ 2 へのレスポンスが完了する

このように、後から来たリクエストに対する処理について、async-lock が制御をかけて、同時に実行されないように制御してくれていることが分かる。

それぞれのターミナルタブのレスポンスは次のようになっている。

# タブ 1 の様子 … リクエストして3秒程度でレスポンスされる
$ curl http://localhost:8000/
26:00.504 : OK

# タブ 2 の様子 … リクエストしてから5・6秒近く待たされている (タブ 1 の処理が完了するまで待っているため)
$ curl http://localhost:8000/
26:03.510 : OK

タブ 1 とタブ 2 とで、1秒以内に連続してリクエストしたが、タブ 2 の方のレスポンスは排他制御している分待たされたことが分かる。

エラーハンドリング

というワケで、async-lock によるロックの掛け方が分かった。

ついでに、ロックしたい処理の中で例外が発生した場合の動作も紹介しておこう。

app.get('/', async (req, res) => {
  lock.acquire('my-lock', async () => {
    // 読み込みたいファイルが存在せず、ココでエラーがスローされるとする
    const text = fs.readFileSync('./NOT-EXISTS.txt', 'utf-8');
    
    // ファイルが存在する場合は、その内容を return するテイ
    return text;
  }, (error, result) => {
    if(error) {
      // エラーがスローされると、変数 error にエラーオブジェクトが格納されるので、ココでハンドリングする
      return res.send('NG');
    }
    
    // 成功時は、上の関数内で return した変数 text の内容が、変数 result として受け取れる
    return res.send(result);
  });
});

lock.acquire() の第2引数に渡している関数が、排他制御したい処理をラップした関数。この中の fs.readFileSync() が失敗するとエラーがスローされるが、try / catch などは用意していない。

その代わりに、lock.acquire() の第3引数に渡した関数に処理が移り、ココの変数 error で、エラーをキャッチできるようになっている。

要するに、Node.js コア API のコールバック関数なんかと同じ作りである。

lock.acquire(lockName, asyncFunction, callbackFunction);

正常終了時は、asyncFunctionreturn した値が、callbackFunction における変数 result となっているので、そちらで res.send() なりを呼んでやると良いだろう。

というワケで、try / catch を書かずに済むので、ネストが少なく済むのが嬉しい。