intra-mart Accel Platform 非同期プログラミングガイド 第4版 2021-08-01

タスクキューの管理

タスクキューは、一時的にタスクメッセージを蓄えておく場所です。

タスクキューに保存されたタスクメッセージは、タスク処理サービスによって、以下の条件に合うものから順に取得されて処理されます。

  • タスク実行エンジン では新しい タスクメッセージ を受け付けて タスク を実行する余裕がある。

  • タスクキュー に以下のいずれかの タスクメッセージ が存在する。

    • 並列タスクキュー の先頭に選択待機状態となっている タスクメッセージ が登録されている。
      (ただし、並列タスクキューが有効状態であることが前提)
    • 直列タスクキュー の先頭に選択待機状態となっている タスクメッセージ が登録されていて、同 タスクキュー 内で以下の状態に該当する タスク が存在しない。
      (ただし、直列タスクキューが有効状態であることが前提)
      • 初期状態
      • 受付待機状態
      • 処理実行可能状態
      • 処理実行中状態

上記の条件に一致する タスクメッセージ が複数ある場合、 タスクメッセージ の受信日時が一番古いものが優先されます。 受信日時が同一の タスクメッセージ が複数存在する場合、優先される タスクメッセージ は不定です。

この様子をタスクの選択に図示します。

Task selection

タスクの選択

タスクメッセージ登録処理も参照してください。

本章では、タスクキュー自体の管理方法について説明します。

並列タスクキューの有効化/無効化

並列タスクキューは任意の時点で以下の状態にすることが可能です。

  • 並列タスクキュー内で待機中のタスクメッセージに対する処理の実行が行われている状態です。
  • 並列タスクキュー内で待機中のタスクメッセージに対する処理の実行を一時停止している状態です。

並列タスクキューの状態を変更するAPIとして並列タスクキューの状態を変更するAPIが用意されています。 詳細は非同期処理機能の仕様書を参照してください。

並列タスクキューの状態を変更するAPI
言語 API 備考
Java TaskManager#setParallelizedTaskQueueActive(boolean active)
  • active:
    • true: 有効状態にする
    • false: 無効状態にする
サーバサイドJavaScript WorkManager#setParallelizedTaskQueueActive(active)
  • active:
    • true: 有効状態にする
    • false: 無効状態にする

有効状態

有効状態となっている並列タスクキューでは、先頭で待機中のタスクメッセージは常にタスクとして実行される候補となりえます。

無効状態

無効状態となっている並列タスクキューでは、待機中のタスクメッセージはタスクとして実行される候補から外されます。 ただし、ビジネスロジックを実行中のタスクについてはそのまま処理が続行されます。

注意

並列タスクキューを無効状態にする場合は十分注意してください。

並列タスクキューはテナント毎に1つだけ存在します。 そのため、並列タスクキューを無効状態にすると、並列処理機能として登録されたすべてのタスクメッセージの処理が停止されます。

直列タスクキューの有効化/無効化

直列タスクキューは複数存在します。 直列タスクキューの追加または削除は非同期処理機能の管理者によって行われます。

直列タスクキューは任意の時点で以下の状態にすることが可能です。

  • 直列タスクキュー内で待機中のタスクメッセージに対する処理の実行が行われている状態です。
  • 直列タスクキュー内で待機中のタスクメッセージに対する処理の実行を一時停止している状態です。

直列タスクキューの状態を変更するAPIとして並列タスクキューの状態を変更するAPIが用意されています。 詳細は非同期処理機能の仕様書を参照してください。

並列タスクキューの状態を変更するAPI
言語 API 備考
Java TaskManager#setSerializedTaskQueueActive(String queueId, boolean active)
  • queueId: キューID
  • active:
    • true: 有効状態にする
    • false: 無効状態にする
サーバサイドJavaScript WorkManager#setSerializedTaskQueueActive(queueId, active)
  • queueId: キューID
  • active:
    • true: 有効状態にする
    • false: 無効状態にする

有効状態

有効状態となっている直列タスクキューに実行中のタスクが存在しない場合、その直列タスクキューの先頭で待機中のタスクメッセージは常にタスクとして実行される候補となりえます。

無効状態

無効状態となっている直列タスクキューでは、待機中のタスクメッセージはタスクとして実行される候補から外されます。 ただし、ビジネスロジックを実行中のタスクについてはそのまま処理が続行されます。

直列タスクキューを削除するための準備として直列タスクキューを無効状態にするケースがあります。 上記については直列タスクキュー の削除で改めて説明します。

コラム

直列タスクキューを無効状態にした場合、影響を受ける(処理が停止される)タスクメッセージは該当するキューIDの直列タスクキューのものに限定されます。 他の直列タスクキューに所属するタスクメッセージやタスクには影響ありません。

直列タスクキューの登録

直列タスクキューは非同期処理機能の管理者が用意する必要があります。

初期状態の非同期処理機能では、直列タスクキューは用意されていません。

直列タスクキューを登録するAPIとして直列タスクキューを登録するAPIが用意されています。 詳細は非同期処理機能の仕様書を参照してください。

直列タスクキューを登録するAPI
言語 API 備考
Java TaskManager#addSerializedTaskQueue(String queueId, boolean active)
  • queueId: キューID
  • active:
    • true: 登録直後の状態を有効状態にする
    • false: 登録直後の状態を無効状態にする
サーバサイドJavaScript WorkManager#addSerializedTaskQueue(String queueId, boolean active)
  • queueId: キューID
  • active:
    • true: 登録直後の状態を有効状態にする
    • false: 登録直後の状態を無効状態にする

登録時に該当する直列タスクキューを有効状態または無効状態のいずれかの状態にしておくことが可能です。

  • タスクメッセージを処理する環境が用意されていて、直列タスクキューの登録時した時点で処理を開始したい場合は有効状態として登録してください。
  • 直列タスクキューの登録だけ先に行い、タスクメッセージの処理そのものはまだ実行させたくない場合は無効状態として登録してください。

直列タスクキュー の削除

不要となった直列タスクキューは削除することが可能です。

直列タスクキューを削除するAPIとして直列タスクキューを削除するAPIが用意されています。 詳細は非同期処理機能の仕様書を参照してください。

直列タスクキューを削除するAPI
言語 API 備考
Java TaskManager#removeSerializedTaskQueue(String queueId)
  • queueId: キューID
サーバサイドJavaScript WorkManager#removeSerializedTaskQueue(String queueId)
  • queueId: キューID

直列タスクキューを削除する場合、待機中のタスクメッセージおよびビジネスロジックを実行中のタスクが含まれていないことが前提です。

直列タスクキューの内容を空にする確実な方法はありませんが、以下の様にある程度手順化することは可能です。

  1. 該当する直列タスクキューを無効状態にします。

  2. 該当する直列タスクキューに新しいタスクメッセージが追加されないようにアプリケーション側で制御します。

    注意

    直列タスクキューが無効状態であってもタスクメッセージは登録できます。

  3. 現在処理中のタスクはビジネスロジックが完了するまで待機するか、終了通知を行って管理対象外にします。

  4. 待機中のタスクメッセージをすべて削除します。

この手順の実装例を直列タスクキューの削除(Java)および直列タスクキューの削除(サーバサイドJavaScript)に示します。

直列タスクキューの削除(Java)

package sample.mytest;

import jp.co.intra_mart.foundation.asynchronous.InvalidTaskException;
import jp.co.intra_mart.foundation.asynchronous.TaskControlException;
import jp.co.intra_mart.foundation.asynchronous.TaskIllegalStateException;
import jp.co.intra_mart.foundation.asynchronous.TaskManager;
import jp.co.intra_mart.foundation.asynchronous.TaskQueueIllegalStateException;
import jp.co.intra_mart.foundation.asynchronous.report.RegisteredInfo;
import jp.co.intra_mart.foundation.asynchronous.report.RegisteredSerializedTaskInfo;
import jp.co.intra_mart.foundation.asynchronous.report.RegisteredSerializedTaskQueueInfo;

public class SerializedTaskQueueSample {
   ...

   private void removeQueue(String queueId)
         throws TaskQueueIllegalStateException, TaskControlException {

      // キューを無効化
      TaskManager.setSerializedTaskQueueActive(queueId, false);

      // タスクメッセージが新規追加されないように制御
      ...

      // すべて直列タスクキュー情報を取得
      RegisteredInfo registeredInfo = TaskManager.getRegisteredInfo();
      RegisteredSerializedTaskQueueInfo queueInfo =
            registeredInfo.getSerializedTaskQueuesInfo().get(queueId);

      // 処理実行中のタスクに対して終了通知
      RegisteredSerializedTaskInfo runningTaskInfo = queueInfo.getRunningTaskInfo();
      if (runningTaskInfo != null) {
         String runningMessageId = runningTaskInfo.getMessageId();
         TaskManager.releaseRunningParallelizedTask(runningMessageId, false, true);
      }

      // 待機中のタスクメッセージをすべて削除
      for (RegisteredSerializedTaskInfo task : queueInfo.getWaitingTasksInfo()) {
         String messageId = task.getMessageId();
         try {
            TaskManager.removeSerializedTask(messageId);
         } catch (TaskIllegalStateException e) {
            ...
         } catch (InvalidTaskException e) {
            ...
         }
      }

      TaskManager.removeSerializedTaskQueue(queueId);
   }
   ...

}

コラム

  • 直列タスクキューの削除(Java)ではコードの見やすさを優先しているため、例外処理の詳細については省略しています。

  • intra-mart Accel Platform 2017 Summer(Quadra) 以降をご利用の場合、下記のように、指定のキューのみを取得して直列タスクキューを削除することもできます。

    // 指定した直列タスクキュー情報を取得
    RegisteredSerializedTaskQueueStatus queueInfo = TaskManager.getSerializedTaskQueuesStatusById(queueId);
    
    if (queueInfo != null) {
       // 処理実行中のタスクに対して終了通知
       RegisteredSerializedTaskInfo runningTaskInfo = queueInfo.getRunningTasksInfo();
       if (runningTaskInfo != null) {
          String runningMessageId = runningTaskInfo.getMessageId();
          TaskManager.releaseRunningParallelizedTask(runningMessageId, false, true);
       }
    
       // 待機中のタスクメッセージをすべて削除
       SearchRegisteredSerializedTaskInfo waitingQueueInfo = TaskManager.getSerializedTaskInfo(queueId, null, null, 1, -1);
       for (RegisteredSerializedTaskInfo task : waitingQueueInfo.getTaskInfo()) {
          String messageId = task.getMessageId();
          try {
             TaskManager.removeSerializedTask(messageId);
          } catch (TaskIllegalStateException e) {
             ...
          } catch (InvalidTaskException e) {
             ...
          }
       }
    }
    

直列タスクキューの削除(サーバサイドJavaScript)

function removeQueue(String queueId) {

   let workManager = new WorkManager();

   // キューを無効化
   workManager.setSerializedTaskQueueActive(queueId, false);

   // タスクメッセージが新規追加されないように制御
   ...

   // すべて直列タスクキュー情報を取得
   let registeredInfo = workManager.getRegisteredInfo().data;
   let queueInfo = registeredInfo.serializedTasksInfo[queueId];

   // 処理実行中のタスクに対して終了通知
   let runningTaskInfo = queueInfo.runningTaskInfo;
   if (runningTaskInfo != null) {
      let runningMessageId = runningTask.messageId;
      workManager.releaseRunningParallelizedTask(runningMessageId, false, true);
   }

   // 待機中のタスクメッセージをすべて削除
   for (let task in queueInfo.waitingTasks) {
      let messageId = task.messageId;
      let result = workManager.removeSerializedTask(messageId);
      if (!result.error) {
         ...
      }
   }

   workManager.removeSerializedTaskQueue(queueId);
}
...

コラム

  • 直列タスクキューの削除(サーバサイドJavaScript)ではコードの見やすさを優先しているため、例外処理の詳細については省略しています。

  • intra-mart Accel Platform 2017 Summer(Quadra) 以降をご利用の場合、下記のように、指定のキューのみを取得して直列タスクキューを削除することもできます。

    // 指定した直列タスクキュー情報を取得
    let queueStatus = workManager.getSerializedTaskQueuesStatusById(queueId).data;
    
    // 処理実行中のタスクに対して終了通知
    if (queueStatus != null) {
       if (!isNull(queueStatus.serializedTaskInfo)) {
          let runningMessageId = queueStatus.serializedTaskInfo.messageId;
          workManager.releaseRunningParallelizedTask(runningMessageId, false, true);
       }
    
       // 待機中のタスクメッセージをすべて削除
       let waitingQueueInfo = workManager.getSerializedTaskInfo(queueId, null, null, 1, -1).data;
       let waitingTasksInfo = waitingQueueInfo.serializedTaskInfo;
       for (let i = 0; i < waitingTasksInfo.length; i++) {
          let messageId = waitingTaskInfo[i].messageId;
          let result = workManager.removeSerializedTask(messageId);
          if (!result.error) {
             ...
          }
       }
    
       workManager.removeSerializedTaskQueue(queueId);
    }