intra-mart Accel Platform 非同期プログラミングガイド 第4版 2021-08-01

タスクの実装

タスクは非同期処理機能において実際にビジネスロジックを実行する部分です。

本章ではいくつかの断片的なサンプルを交えながらタスクの実装方法と注意点について説明します。

各メソッドまたは関数の実装

タスクを実装するためには、さまざまなメソッドや関数を定義する必要があります。

本節ではそれぞれの関数の役割とその実装内容について説明します。

タスクのメソッド/関数の一覧もあわせて参照してください。

タスクのメソッド/関数の一覧
項目 Java (Taskインタフェース) Java (AbstractTask抽象クラス) サーバサイドJavaScript
パラメータ設定 setParameter
setParameter
getParameter
setParameter
ビジネスロジックの実行 run run run
終了通知 release release
処理実行可能状態の通知 taskAccepted taskAccepted taskAccepted
処理実行中状態の通知 taskStarted taskStarted taskStarted
ビジネスロジック終了の通知 taskCompleted taskCompleted taskCompleted
タスクの受付拒否 taskRejected taskRejected taskRejected

メソッド共通事項/関数共通事項

ここでは各メソッドおよび関数の実装において共通することを記載します。

リソース

それぞれのメソッドや関数内で解放が必須となるリソース(ファイルの入出力やデータベースの接続など)を取得した場合、各メソッドおよび関数を終了する時には必ずリソースの解放を行ってください。 その理由のいくつかは以下の通りです。

  • 各メソッドや関数は異なるスレッド上で実行される場合があります。スレッドにリソースが関連付けられている場合、リソース取得時のスレッドとは異なるスレッドでリソースを解放しようとすると不具合が発生することが考えられます。

  • 各イベントに該当するメソッドや関数は、実行状況によってはすべてが必ず呼び出されるとは限りません。
    (例)taskRejectedが呼び出された場合、runtaskCompletedは呼び出されません。

コラム

各メソッドや関数は異なるスレッド上で実行される場合がありますが、Javaの終了通知を除けば、複数のメソッドや関数が同時に実行されることはありません。 終了通知のみが、その他のメソッドと同時に実行される場合があります。

Java

Javaを利用して、ビジネスロジックとして動作するタスクを実現するためには、jp.co.intra_mart.foundation.asynchronous.Taskインタフェースを実装する必要があります。

Task インタフェースの実装方法として、以下の様な方法があります。

  • Taskインタフェースを直接implementsする方法

    jp.co.intra_mart.foundation.asynchronous.Taskインタフェースを実装してタスクを作成する場合、すべてのメソッドを実装する必要があります。

    この方法を採用する場合、ビジネスロジックに不要なメソッドを実装する必要もあるため、特別な理由がない限り、次の方法で実装することを推奨します。

  • AbstractTask抽象クラスをextendsする方法

    jp.co.intra_mart.foundation.asynchronous.AbstractTask抽象クラスを利用すると、必要なメソッドのみをオーバーライドすればタスクを作成することができます。

    この方法を採用した場合、Taskインタフェースを直接実装する方法に比べると以下の様なメリットがあります。

    • 不要なメソッドのオーバーライドをする必要がありません。
    • ビジネスロジック実行時に必要となるパラメータの受け渡しが簡略化できます。

特別な理由がない限り、AbstractTask抽象クラスを拡張してタスクを作成する方法を推奨します。

Javaを利用してタスクを実装するときのメソッドとしてJavaを利用するタスクも参照してください。

Javaを利用するタスク
メソッド Task インタフェース AbstractTask 抽象クラス 備考
setParameter オーバーライド必須 オーバーライド任意
getParameter (存在しない) オーバーライド任意 setParameter をオーバライドした場合、その内容に応じて修正が必要
run オーバーライド必須 オーバーライド任意 ビジネスロジックを実装する場合はオーバーライド必須
release オーバーライド必須 オーバーライド任意 外部から安全に終了させる場合はオーバーライド必須
taskAccepted オーバーライド必須 オーバーライド任意
taskStarted オーバーライド必須 オーバーライド任意
taskCompleted オーバーライド必須 オーバーライド任意
taskRejected オーバーライド必須 オーバーライド任意

サーバサイドJavaScript

サーバサイドJavaScriptを利用して、ビジネスロジックとして動作するタスクを実現するためには、特定の関数群を実装したJSファイルを作成する必要があります。

非同期処理機能から利用される関数は複数ありますが、実際には必要となる関数のみ実装すれば問題ありません。 run関数のみ実装が必須ですが、その他の関数の実装は任意です。

サーバサイドJavaScriptを利用してタスクを実装するときの関数としてサーバサイドJavaScriptを利用するタスクも参照してください。

サーバサイドJavaScriptを利用するタスク
関数 実装の必須/任意 備考
setParameter 任意
run 必須
taskAccepted 任意
taskStarted 任意
taskCompleted 任意
taskRejected 任意

パラメータ設定

タスクメッセージ登録時に、ビジネスロジック内で使用する値を「パラメータ」として設定することができます。

タスクが生成されると、実際のビジネスロジックの実行に先立ってパラメータがタスクに受け渡されます。

パラメータ設定を参照してください。

パラメータ設定
開発モデル メソッド/関数 備考
Java setParameter AbstractTask抽象クラスでは独自の実装がされています。
Java getParameter

AbstractTask抽象クラスに追加されています。setParameterで設定されたパラメータを取得できます。

setParameterメソッドをオーバライドしていない限り、setParameterで設定されたパラメータをこのメソッドで取得することが可能です。

setParameterメソッドが非同期処理機能から呼び出される前にこのメソッドを呼んだ場合、戻り値としてnullが返されます。

サーバサイドJavaScript setParameter

Java

jp.co.intra_mart.foundation.asynchronous.Taskインタフェースを直接実装してタスクを開発する場合、setParameterメソッドを独自に開発する必要があります。

setParameterメソッドに渡されたパラメータの使用方法について、特に制限はありません。 一例として、パラメータから取得できる情報をインスタンス変数に保存し、ビジネスロジック実行時(runメソッド実行時)に利用するなどの方法が考えられます。

package sample.task;

import java.util.Map;
import jp.co.intra_mart.foundation.asynchronous.Task;

public class MySampleTask implements Task {

   private String firstName = null;
   private String lastName = null;
   private int age = 0;

   @Override
   public void setParameter(Map<String, ?> parameter) {
      firstName = (String) parameter.get("FIRST_NAME");
      lastName = (String) parameter.get("LAST_NAME");
      age = ((Number) parameter.get("AGE")).intValue();
   }

   @Override
   public void run() {
      // firstName, lastName, ageを使って処理を実行
      ...
   }

   ...
}

jp.co.intra_mart.foundation.asynchronous.AbstractTask抽象クラスでは、このメソッドは引数に渡されたパラメータを内部で保存します。 この場合、内部に保存されたパラメータは getParameter メソッドで取得することが可能です。

package sample.task;

import java.util.Map;
import jp.co.intra_mart.foundation.asynchronous.AbstractTask;

public class MySampleTask extends AbstractTask {

   @Override
   public void run() {
      Map<String, ?> parameter = getParameter();
      String firstName = (String) parameter.get("FIRST_NAME");
      String lastName = (String) parameter.get("LAST_NAME");
      int age = ((Number) parameter.get("AGE")).intValue();

      // firstName, lastName, ageを使って処理を実行
      ...
   }

   ...
}

サーバサイドJavaScript

setParameter関数に渡されたパラメータの使用方法について、特に制限はありません。 一例として、パラメータから取得できる情報を変数に保存し、ビジネスロジック実行時(run関数実行時)に利用するなどの方法が考えられます。

var firstName;
var lastName;
var age;

function setParameter(parameter) {
   firstName = parameter.firstName;
   lastName = parameter.lastName;
   age = parameter.age;
}

function run() {
   // firstName, lastName, ageを使って処理を実行
   ...
}

ビジネスロジックの実行

実際に非同期で行う処理を記述します。

ビジネスロジックの実行
開発モデル メソッド/関数 備考
Java run AbstractTask抽象クラスではこのメソッドは何もしません。
サーバサイドJavaScript run

runメソッドまたはrun関数の実装例として、サンプルも参照してください。

Java

jp.co.intra_mart.foundation.asynchronous.Taskインタフェースを直接実装してタスクを開発する場合、runメソッドを独自に開発する必要があります。

AbstractTask 抽象クラスでは、このメソッドは何もしません。 必要に応じてこのメソッドをオーバライドし、ビジネスロジックを実装してください。

コラム

releaseメソッドが呼ばれたら、できるだけ速やかにrunメソッドの実行を終了するような実装をしてください。 詳細については非同期処理機能の仕様書を参照してください。

サーバサイドJavaScript

JSファイル上にrun関数を直接定義します。

ビジネスロジックを実装してください。

コラム

サーバサイドJavaScriptでビジネスロジックを実装する場合、終了通知がされてもJSファイルではそれを検知することができません。 終了通知に依存しない方法でビジネスロジックが確実に終了するように実装してください。

詳細については非同期処理機能の仕様書を参照してください。

終了通知

終了通知
開発モデル メソッド/関数 備考
Java release AbstractTask抽象クラスではこのメソッドは何もしません。
サーバサイドJavaScript (該当する関数はありません)

releaseメソッドの実装例として、releaseメソッドの実装も参照してください。

Java

jp.co.intra_mart.foundation.asynchronous.Taskインタフェースを直接実装してタスクを開発する場合、releaseメソッドを独自に開発する必要があります。 実行中のビジネスロジックを外部から停止させることがある場合はこのメソッドをオーバーライドしてください。

ビジネスロジックを停止させる一般的な方法はなく、実装方法は開発者に委ねられます。

注意

このメソッドをオーバーライドする場合、上記の点に注意して実装してください。

  • このメソッドは他のメソッドとは異なるスレッド上から呼び出される場合があります。
  • 非同期処理機能から異なるスレッド上で複数回呼び出される場合もあります。

注意

ビジネスロジック停止時にタスクの再登録やタスクキューの停止を行いたい場合、このメソッドを直接呼び出さないでください。 タスクの再登録やタスクキューの停止はJavaのTaskManager#releaseRunningXXXXTaskメソッドまたはサーバサイドJavaScriptのWorkManager#releaseRunningXXXXTask関数で行います。 この時、 TaskManagerWorkManager は、内部で release を呼び出しています。

このメソッドを直接呼び出してrunメソッドが終了された場合、非同期処理機能からは通常の終了と区別がつかず、タスクの再登録やタスクキューの停止等が行われません。 詳細については非同期処理機能の仕様書を参照してください。

サーバサイドJavaScript

サーバサイドJavaScriptでは終了通知に該当する関数は存在しません。

注意

サーバサイドJavaScriptで実装されたタスクに対してJavaのTaskManager#releaseXXXXTaskメソッドやサーバサイドJavaScriptのWorkManager#releaseXXXXTask関数を呼び出した場合、該当するタスクは非同期処理機能の管理対象外ですが、ビジネスロジックは停止しません。 そのため、タスクメッセージの再登録を行ったりすると、同一のタスクメッセージから生成された複数のタスクのインスタンスのビジネスロジックが同時に重複して実行される場合があります。

処理実行可能状態の通知

タスクが処理実行可能状態になった時に通知されます。

処理実行可能状態の通知
開発モデル メソッド/関数 備考
Java taskAccepted AbstractTask抽象クラスではこのメソッドは何もしません。
サーバサイドJavaScript taskAccepted

Java

jp.co.intra_mart.foundation.asynchronous.Taskインタフェースを直接実装してタスクを開発する場合、taskAcceptedメソッドを独自に開発する必要があります。 タスクが処理実行可能状態になった時に何らかの処理を行いたい場合、このメソッドをオーバーライドしてください。

サーバサイドJavaScript

JSファイル上でタスクを開発する場合、必要に応じてtaskAccepted関数を独自に開発する必要があります。 タスクが処理実行可能状態になった時に何らかの処理を行いたい場合、この関数を定義してください。

処理実行中状態の通知

タスクが処理実行中状態になった時に通知されます。

処理実行中状態の通知
開発モデル メソッド/関数 備考
Java taskStarted AbstractTask抽象クラスではこのメソッドは何もしません。
サーバサイドJavaScript taskStarted

Java

jp.co.intra_mart.foundation.asynchronous.Taskインタフェースを直接実装してタスクを開発する場合、taskStartedメソッドを独自に開発する必要があります。 タスクが処理実行中状態になった時に何らかの処理を行いたい場合、このメソッドをオーバーライドしてください。

サーバサイドJavaScript

JSファイル上でタスクを開発する場合、必要に応じてtaskStarted関数を独自に開発する必要があります。 タスクが処理実行中状態になった時に何らかの処理を行いたい場合、この関数を定義してください。

ビジネスロジック終了の通知

タスクのビジネスロジックが終了した時(通常はrunメソッドやrun関数が終了した時)に通知されます。

処理実行中状態の通知
開発モデル メソッド/関数 備考
Java taskCompleted AbstractTask抽象クラスではこのメソッドは何もしません。
サーバサイドJavaScript taskCompleted

Java

jp.co.intra_mart.foundation.asynchronous.Taskインタフェースを直接実装してタスクを開発する場合、taskCompletedメソッドを独自に開発する必要があります。 タスクのビジネスロジックが正常に終了した時に何らかの処理を行いたい場合、このメソッドをオーバーライドしてください。

サーバサイドJavaScript

JSファイル上でタスクを開発する場合、必要に応じてtaskCompleted関数を独自に開発する必要があります。 タスクのビジネスロジックが正常に終了した時に何らかの処理を行いたい場合、この関数を定義してください。

タスクの受付拒否

タスクの受付が拒否された時に通知されます。

タスクの受付拒否
開発モデル メソッド/関数 備考
Java taskRejected AbstractTask抽象クラスではこのメソッドは何もしません。
サーバサイドJavaScript taskRejected

Java

jp.co.intra_mart.foundation.asynchronous.Taskインタフェースを直接実装してタスクを開発する場合、taskRejectedメソッドを独自に開発する必要があります。 タスクメッセージが何らかの理由で拒否されて実行されなかった時に何らかの処理を行いたい場合、このメソッドをオーバーライドしてください。

サーバサイドJavaScript

JSファイル上でタスクを開発する場合、必要に応じてtaskRejected関数を独自に開発する必要があります。 タスクメッセージが何らかの理由で拒否されて実行されなかった時に何らかの処理を行いたい場合、この関数を定義してください。

タスクを実装する時の注意点

タスクの非同期処理機能による実行は、アプリケーション内からのメソッド呼び出しとは実行場所やタイミングが異なるため、いくつか注意する点があります。

  • 遅延実行

    タスクは即時に実行されず、一度タスクキューに登録された後に実行されます。

  • 非同期による実行

    タスクは非同期で処理が行われます。

  • リモートによる実行

    タスクはリモートで処理が行われる場合があります。

  • 終了通知(Javaで開発する場合)

    任意の時点で終了通知が行われる場合があります。 この場合、タスクメッセージがタスクキューの先頭に再登録される場合があります。

  • 情報の受け渡し

  • run 以外のメソッドまたは関数では、以下の点について注意してください。

    • Java EE に関連するリソース(データベースへのアクセス、JNDIによる検索等)は取得しないでください。
    • コンテキストは取得しないでください。

この他にもパラメータの受け渡しについて考慮する必要があります。パラメータの受け渡しの詳細についてはパラメータで説明します。

遅延実行

非同期処理機能では直接タスクを実行せず、一度タスクメッセージがタスクキューに登録されます。

非同期処理機能は登録されたタスクメッセージのビジネスロジックをできるだけ早く開始しようとしますが、実際に開始される時間については制限が設けられていません。

そのため、タスクがいつ開始されても問題ないような実装をしてください。

非同期による実行

非同期処理機能ではタスクメッセージを登録したスレッドとタスクを実行するスレッドは異なります。 そのため、登録時のスレッドに関連付けられた情報は実行時に引き継がれません。

スレッドに関連付けられた情報としては以下のようなものがあります。

  • (開発言語としてJavaを利用している場合)java.lang.ThreadLocalに保存された値

    登録時にjava.lang.ThreadLocalを利用して何らかの値を登録しても、実行時には取得できません。

  • 登録時のスレッドに関連付けられているトランザクション

    登録時にトランザクションを開始してもタスクにはそのトランザクションは引き継がれないため、登録時と実行時のSQLと同一のトランザクション上で動作させることは出来ません。

タスクメッセージの登録時に何らかの情報を保存し、タスク実行時にそれらの情報を利用したい場合は情報の受け渡しを参照してください。

リモートによる実行

非同期処理機能ではタスクメッセージを登録した時のサーバとタスクを実行するサーバは異なる場合があります。 そのため、登録時のローカル環境に保存された情報は実行時に引き継がれません。

ローカル環境に保存された情報としては以下のようなものがあります。

  • (開発言語としてJavaを利用している場合)クラス変数に保存された値

    登録時のサーバと実行時のサーバが異なる(つまり、両者のVirtual Machineが異なる)場合があるため、登録時にクラス変数に何らかの値を登録しても、実行時には取得できない場合があります。

    この現象を確実に回避する方法はありません。

  • ローカルのファイルに保存された値

    登録時のサーバと実行時のサーバが異なる場合があるため、 ローカルのファイルで値を受け渡すことはできません。

タスクメッセージの登録時に何らかの情報を保存し、タスク実行時にそれらの情報を利用したい場合は情報の受け渡しを参照してください。

サンプル

単純なサンプル

Taskの実装のサンプルを以下に示します。

package sample;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import jp.co.intra_mart.foundation.asynchronous.AbstractTask;
import jp.co.intra_mart.foundation.asynchronous.TaskEvent;

public class SampleTask extends AbstractTask {

   private volatile boolean isActive = true;

   private String label;

   @Override
   public void run() {
      // getParameter を利用して設定済みのパラメータを取得
      Map<String, ?> parameter = getParameter();
      String label = (String) parameter.get("label");
      Number countObj = (Number) parameter.get("count");
      int count = countObj.intValue();

      // リソースの取得
      ...

      try {
         int index = 0;
         while (index < count && this.isActive) { // release()が呼ばれている場合は途中でも終了
            // ビジネスロジック
            businessLogic(label, index);
            index++;
         }
      } catch(MyException e) { // 独自の例外が発生する場合がある
         // 例外発生時の処理
         ...

      } finally {
         // リソースの解放
         ...

      }
   }

   private void businessLogic(String label, int index) throws MyException {
      // ビジネスロジック
      ...

   }

   @Override
   public void release() {
      // ビジネスロジックを途中で終了するためのフラグ
      this.isActive = false;
   }

   @Override
   public void taskAccepted(TaskEvent event) {
      // タスク受付時の処理
      ...

   }

   @Override
   public void taskStarted(TaskEvent event) {
      // タスクが開始された時の処理
      ...

   }

   @Override
   public void taskCompleted(TaskEvent event) {
      final Exception exception = event.getException();
      if (exception == null) {
         // ビジネスロジックが問題なく完了した場合の後処理
         ...

      } else {
         // ビジネスロジック実行時に問題が発生した時の後処理
         ...

      }
   }

   @Override
   public void taskRejected(TaskEvent event) {
      final Exception exception = event.getException();
      if (exception == null) {
         // 例外発生以外の理由によって受付が拒否された場合の処理
         ...

      } else {
         // 例外発生によって受付が拒否された場合の処理
         ...

      }
   }
}

このサンプル には以下のような特徴があります。

  • AbstractTask 抽象クラスを拡張して Task インタフェースを実装している
  • 実際のビジネスロジック(businessLogic)では例外(MyException)が発生しているが、 run メソッド内ではthrowしない

コラム

このサンプルでは、終了通知をした場合releaseメソッドを終了した後でもしばらくrunメソッドが実行されたままである可能性があります。

終了通知を指示する時にタスクメッセージを再登録するよう指定していた場合、同じタスクが同時に動作する可能性があるので注意してください。

終了通知(Javaで開発する場合)

Javaでタスクを実装した場合、ビジネスロジック(runメソッド)が実行された後で、その処理が完了する前であっても、終了通知によってその処理を途中でも終了するように通知されることがあります。

タスクの実装者は終了通知に関連して以下のことを考慮する必要があります。

releaseメソッドの実装

タスクは、runメソッドの実行時にタスク管理アプリケーションからreleaseメソッドを呼び出されることがあります。 タスクの開発者は、releaseメソッドが呼び出されたら、実行中のrunメソッドをできるだけ速やかに終了させるような実装をしてください。

releaseの実装例releaseメソッドの実装例を示します。

releaseの実装例
package sample.mytest;

import java.util.List;
import jp.co.intra_mart.foundation.asynchronous.AbstractTask;

public class MyTask extends AbstractTask {

   private volatile boolean releaseNotified;

   public MyTask() {
      this.releaseNotified = false;
   }

   @Override
   public void run() {
      // 処理用データの取得
      List<String> list = ...;

      for (String item : list) {
         // itemに対する処理
         doSomething(item);

         // releaseが呼ばれていた場合は途中でも終了
         if (this.releaseNotified) {
            break;
         }
      }
   }

   @Override
   public void release() {
      this.releaseNotified = true;
   }

   private void doSomething(String item) {
      ...
   }
}

runメソッドではループ処理を行い、すべてのデータ(item)に対して処理を行うとこのメソッドを終了します。

releaseの実装例では、releaseメソッドが呼ばれるとreleaseNotifiedtrueに設定されます。

runメソッドではreleaseNotifiedを常に観察し、この値がtrueになれば途中でもループから抜けてrunを終了します。

注意

非同期処理機能はreleaseメソッドの呼び出しが完了すると即座に同タスクを管理対象外とします。

一方、タスクのrunメソッドはreleaseメソッドが呼び出されても即座に終了するとは限りません。

これはタスクメッセージを再登録する場合に問題になることがあります。 詳細は再登録時の制御を参照してください。

再登録時の制御

TaskManager#releaseRunningXXXXTaskメソッド(Java)、またはWorkManager#releaseRunningXXXXTask関数(サーバサイドJavaScript)を呼び出すとき、終了通知をしたタスクをタスクキューに再登録するかどうかを第2引数で指定することができます。

タスクメッセージを再登録すると、タイミングによってはタスクのrunが同時に重複して実行される可能性があります。 このケースに該当するシナリオをreleaseの実装例を実行している状況で考えます。 再登録時に重複して実行される場合も参照してください。

  1. releaseの実装例doSomethingで長時間処理をしている間にタスク管理アプリケーションから終了通知を行うよう要求します。この時、タスクメッセージを再登録するものとします。

  2. 該当するタスクのreleaseが呼び出され、releaseNotifiedtrueに設定されます。

    この時、doSomethingが実行中であるため、runメソッドがまだ完了していないものとします。 つまり、一時的に「releaseメソッドの呼び出しが完了してタスクが管理対象外となったが、runメソッドの実行が終了していない」という状態です。

  3. releaseメソッドの呼び出しが完了すると、非同期処理機能は即座に同タスクを管理対象外とします。

    ここでは単に「管理対象外」となるだけであり、runメソッドがまだ完了していないものとします。

  4. タスク管理アプリケーションはreleaseの呼び出しが完了するとタスクメッセージをタスクキューの先頭に再登録します。

    この時点でも先に実行中であったタスクのrunメソッドがまだ完了していないものとします。

  5. タスク実行エンジンがタスクキューの先頭をチェックし、タスクメッセージを取得し、タスクを生成します。

    この場合のタスクは、現在実行中のタスクとは別のインスタンスです。

  6. タスク実行エンジンは新しく生成されたタスクのrunメソッドを非同期で実行開始します。

    この時点で、同じタスクメッセージから生成された2つの異なるタスクのrunメソッドが重複して実行されます。

Reentry sample (no wait)

再登録時に重複して実行される場合

ビジネスロジック(タスクのrunメソッド)が重複して実行されると問題がある場合、上記の点を考慮する必要があります。 この現象を回避する方法としては以下のようなことが考えられます。

  • 終了通知 を行うとき、タスクメッセージの再登録をしない

    タスクキューの状態はTaskManager#releaseRunningXXXXTaskメソッド(Java)、またはWorkManager#releaseRunningXXXXTask関数(サーバサイドJavaScript)を呼び出すときの第2引数を必ずfalseで呼び出すことが前提です。

  • タスクキューを無効状態にする

    タスクキューが無効状態である場合、タスクキューにタスクメッセージが再登録されてもタスク実行エンジンによる実行対象とはなりません。

    タスクキューの状態はTaskManager#releaseRunningXXXXTaskメソッド(Java)、またはWorkManager#releaseRunningXXXXTask関数(サーバサイドJavaScript)を呼び出すときの第3引数で制御できます。 詳細は非同期処理機能の仕様書を参照してください。

  • タスクのreleaseメソッドの実行終了をrunメソッドが終了するまでブロックする

    releaseの内部ではrunメソッドをできるだけ早く終了するような処理を行うだけでなく、実際にrunメソッドが完了するまで待機します。

最後の方法を採用する場合の例をreleaseの実装例2に示します。 release時にビジネスロジックが終了準備ができるまで待機する場合も参照してください。

releaseの実装例2
package sample.mytest;

import java.util.List;
import jp.co.intra_mart.foundation.asynchronous.AbstractTask;

public class MyTask2 extends AbstractTask {

   private volatile boolean releaseNotified;

   private volatile boolean finished;

   public MyTask2() {
      this.releaseNotified = false;
      this.finished = false;
   }

   @Override
   public void run() {
      try {
         // 処理用データの取得
         List<String> list = ...;

         for (String item : list) {
            // itemに対する処理
            doSomething(item);

            // releaseが呼ばれていた場合は途中でも終了
            if (this.releaseNotified) {
               break;
            }
         }
      } finally {
         this.finished = true;
      }
   }

   @Override
   public void release() {
      this.releaseNotified = true;
      while (!this.finished) {
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            //
         }
      }
   }

   private void doSomething(String item) {
      ...
   }
}

releaseの実装例2では、runメソッドの終了時に必ずインスタンス変数finishedtrueにしています。 releaseメソッドではこの値を1秒ごとに監視し、finishedtrueになるまで待機します。 finishedtrueになった場合はrunメソッド内でビジネスロジックが完了したことが確認できたと判断し、releaseメソッドも終了します。

Reentry sample (no wait)

release時にビジネスロジックが終了準備ができるまで待機する場合

コラム

releaseの実装例2ではThread#sleepを使用して待機していますが、実際にはパフォーマンス等を考慮し、java.lang.Object#waitjava.lang.Object#notifyによるロックや、java.util.concurrent.locksパッケージのロック用ユーティリティ等を利用する方法なども考えられます。

コラム

厳密には、release時にビジネスロジックが終了準備ができるまで待機する場合の場合であってもtask01task02runメソッドが同時に実行されている可能性もあります。 しかしながら、task01のビジネスロジックが完了(ここではdoSomethingメソッドがもはや呼び出されない状態)した後にtask02のインスタンスが生成されているので、ビジネスロジックそのものは競合しません。

単一実行の強制(直列タスクキュー)

再登録時の制御でも述べたように、非同期処理機能はreleaseメソッドの呼び出しが完了すると即座に同タスクを管理対象外とします。 この場合、ビジネスロジックの実行状況については無視されるため、タスクメッセージの再登録を指定していない場合は後続のタスクが一時的に同時に実行される場合があります。

タスクメッセージを直列タスクキューに登録している状況でこのような状況を防ぎたい場合は再登録時の制御で述べた内容と同様にreleaseでビジネスロジックの完了までブロックするなどの工夫が必要です。

情報の受け渡し

非同期処理機能ではタスクメッセージの登録時とタスクの実行時では環境が以下のように異なっています。

  • タスク登録アプリケーションはタスクメッセージをタスクキューに登録するが、ビジネスロジックを実行するタスクは直接生成されない(遅延実行を参照)
  • スレッドが異なる(非同期による実行を参照)
  • 実行するサーバが異なる(リモートによる実行を参照)

そのため、通常のメソッド(または関数)呼び出しとは異なり、呼び出し元から呼び出し先へ直接変数情報を渡すことができません。

登録時に決定される情報をタスクに渡す方法はいくつかあります。 以下に代表的なものを記します。

パラメータによる情報の受け渡し

タスクメッセージの登録時に、開発者が定義したパラメータを渡すことができます。

設定できるパラメータの形式には制限があるため、登録時と実行時ではそれぞれ変換が必要となる場合があります。

パラメータを利用するタスクの例およびパラメータを利用するタスクメッセージ登録の例にパラメータを利用して情報を受け渡す例を示します。

パラメータの詳細についてはパラメータを参照してください。

パラメータを利用するタスクの例
package sample.mytest;

import java.util.Map;

import jp.co.intra_mart.foundation.asynchronous.AbstractTask;

public class ParameterTask extends AbstractTask {

   @Override
   public void run() {
      Map<String, ?> parameter = getParameter();
      String name = (String) parameter.get("NAME");
      int age = ((Number) parameter.get("AGE")).intValue();
      businessLogic(name, age);
   }

   private void businessLogic(String name, int age) {
      ...
   }
}
パラメータを利用するタスクメッセージ登録の例
package sample.mytest;

import java.util.HashMap;
import java.util.Map;

import jp.co.intra_mart.foundation.asynchronous.TaskControlException;
import jp.co.intra_mart.foundation.asynchronous.TaskManager;

public class ParameterTaskRegister {
   ...

   private void addTask() {
      Map<String, Object> parameter = new HashMap<String, Object>();
      parameter.put("NAME", "intra-mart");
      parameter.put("AGE", 26);
      try {
         TaskManager.addParallelizedTask("sample.mytest.ParameterTask", parameter);
      } catch (TaskControlException e) {
         ...
      }
   }
   ...
}

パラメータを利用するタスクの例においてParameterTaskクラスはAbstractTask抽象クラスを継承しています。 そのため、非同期処理機能からsetParameterが呼ばれています。 この状態であれば、getParameterメソッドによってパラメータを利用するタスクメッセージ登録の例で登録された値を取得することができます。

注意

パラメータを利用するタスクの例パラメータを利用するタスクメッセージ登録の例で数値型を受け渡していますが、登録時と取得時の方法が異なることに注意してください。 詳細については非同期処理機能の仕様書、またはパラメータを確認してください。

コンテキストによる情報の受け渡し

タスクメッセージの登録時には、その時点におけるコンテキストの情報も同時に保存されます。 保存されたコンテキストはrunメソッド(Java使用時)またはrun関数(サーバサイドJavaScript使用時)に取得および利用することができます。

コンテキストの詳細については関連する仕様書やAPIの説明を参照してください。

コンテキストを利用するタスクの例およびコンテキストを利用するタスクメッセージ登録の例にコンテキストを利用して情報を受け渡す例を示します。

コンテキストを利用するタスクの例
package sample.mytest;

import jp.co.intra_mart.foundation.asynchronous.AbstractTask;
import jp.co.intra_mart.foundation.context.Contexts;
import jp.co.intra_mart.foundation.context.model.AccountContext;
import jp.co.intra_mart.foundation.context.model.UserType;

public class ContextTask extends AbstractTask {

   @Override
   public void run() {
      // 登録時の Context を利用可能
      AccountContext accountContext = Contexts.get(AccountContext.class);
      UserType userType = accountContext.getUserType();
      String userCd = accountContext.getUserCd();
      businessLogic(userCd);
   }

   private void businessLogic(String userCd) {
      ...
   }
}
コンテキストを利用するタスクメッセージ登録の例
package sample.mytest;

import jp.co.intra_mart.foundation.asynchronous.TaskControlException;
import jp.co.intra_mart.foundation.asynchronous.TaskManager;
import jp.co.intra_mart.foundation.context.Contexts;
import jp.co.intra_mart.foundation.context.model.AccountContext;
import jp.co.intra_mart.foundation.context.model.UserType;

public class ContextTaskRegister {

   private void addTask() {
      // Context の情報を明示的に取得や指定しなくても、
      // 現在の Context が自動的にタスクキューに登録される

      // AccountContext accountContext = Contexts.get(AccountContext.class);
      // UserType userType = accountContext.getUserType();
      // String userCd = accountContext.getUserCd();

      try {
         TaskManager.addParallelizedTask("sample.mytest.ContextTask", null);
      } catch (TaskControlException e) {
         ...
      }
   }
}

コンテキストを利用するタスクメッセージ登録の例を実行すると、現在のコンテキストも含んだ状態でタスクメッセージを登録します。

そのため、コンテキストはタスクメッセージ登録時とタスク実行時では同じ物が取得できます。

コンテキストを利用するタスクの例ではAccountContext#getUserCdによってユーザコードを、AccountContext#getUserTypeによってユーザタイプを取得しています。 この場合、コンテキストを利用するタスクメッセージ登録の例でタスクメッセージを登録した時点の値が反映されています。

データベースによる情報の受け渡し

非同期処理機能からタスクのrunメソッドが呼ばれた場合、runメソッド内部ではintra-mart Accel Platformで扱うデータベースにアクセスすることが可能です。 (詳細は非同期処理機能の仕様書を参照してください。)

データベースを利用するタスクの例およびデータベースを利用するタスクメッセージ登録の例にデータベースを利用して情報を受け渡す例を示します。

このサンプルでは、あらかじめテーブル構造に示す構造を持つテーブルがテナントデータベースに登録されているものとします。

テーブル構造
項目 プライマリキー 説明
ID 整数 ID
NAME 文字列型 名前
AGE 整数 年齢
データベースを利用するタスクの例
package sample.mytest;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import jp.co.intra_mart.foundation.asynchronous.AbstractTask;
import jp.co.intra_mart.foundation.database.DBTransaction;
import jp.co.intra_mart.foundation.database.DatabaseUtil;
import jp.co.intra_mart.foundation.database.TenantDatabase;
import jp.co.intra_mart.foundation.database.exception.DatabaseException;

public class DatabaseTask extends AbstractTask {

   @Override
   public void run() {
      DBTransaction transaction = new DBTransaction();
      boolean success = false;
      try {
         transaction.beginTransaction();
         try {
            TenantDatabase database = new TenantDatabase();
            Connection connection = database.getConnection();
            try {
               String sql = "select name, age from my_test";
               PreparedStatement statement = connection.prepareStatement(sql);
               try {
                  ResultSet resultSet = statement.executeQuery();
                  try {
                     while (resultSet.next()) {
                        String name = resultSet.getString("name");
                        int age = resultSet.getInt("age");
                        businessLogic(name, age);
                     }
                  } finally {
                     DatabaseUtil.closeResultSetQuietly(resultSet);
                  }
               } finally {
                  DatabaseUtil.closeStatementQuietly(statement);
               }
            } finally {
               DatabaseUtil.closeConnectionQuietly(connection);
            }
            transaction.commit();
            success = true;
         } finally {
            if (!success) {
               try {
                  transaction.rollback();
               } catch (DatabaseException e) {
                  ...
               }
            }
         }
      } catch (DatabaseException e) {
         ...
      } catch (SQLException e) {
         ...
      }
   }

   private void businessLogic(String name, int age) {
      // ...
   }
}
データベースを利用するタスクメッセージ登録の例
package sample.mytest;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import jp.co.intra_mart.foundation.asynchronous.TaskControlException;
import jp.co.intra_mart.foundation.asynchronous.TaskManager;
import jp.co.intra_mart.foundation.database.DBTransaction;
import jp.co.intra_mart.foundation.database.DatabaseUtil;
import jp.co.intra_mart.foundation.database.TenantDatabase;
import jp.co.intra_mart.foundation.database.exception.DatabaseException;

public class DatabaseTaskRegister {
   ...

   private void addTask() {
      DBTransaction transaction = new DBTransaction();
      boolean success = false;
      try {
         transaction.beginTransaction();
         try {
            TenantDatabase database = new TenantDatabase();
            Connection connection = database.getConnection();
            try {
               for (int index = 0; index < 5; index++) {
                  String sql = "insert into my_test (id, name, age) values (?, ?, ?)";
                  PreparedStatement statementInsert = connection.prepareStatement(sql);
                  try {
                     statementInsert.setInt(1, index);
                     statementInsert.setString(2, "intra-mart " + index);
                     statementInsert.setInt(3, 26 + index);
                     statementInsert.execute();
                  } finally {
                     try {
                        statementInsert.close();
                     } catch (SQLException e) {
                        ...
                     }
                  }
               }
            } finally {
               DatabaseUtil.closeConnectionQuietly(connection);
            }
            transaction.commit();
            success = true;
         } finally {
            if (!success) {
               try {
                  transaction.rollback();
               } catch (DatabaseException e) {
                  ...
               }
            }
         }
      } catch (DatabaseException e) {
         ...
      } catch (SQLException e) {
         ...
      }

      try {
         TaskManager.addParallelizedTask("sample.mytest.DatabaseTask", null);
      } catch (TaskControlException e) {
         ...
      }
   }
   ...
}

ストレージサービスによる情報の受け渡し

非同期処理機能からタスクのrunメソッドが呼ばれた場合、runメソッド内部ではストレージサービスを利用することが可能です。 (詳細は非同期処理機能の仕様書を参照してください。)

ストレージサービスを利用するタスクの例およびストレージサービスを利用するタスクメッセージ登録の例にストレージサービスを利用して情報を受け渡す例を示します。

このサンプルでは、あらかじめ%PUBLIC_STORAGE_PATH%/task/sampleディレクトリが存在しているものとします。

ストレージサービスを利用するタスクの例
package sample.mytest;

import java.io.IOException;
import java.nio.charset.Charset;

import jp.co.intra_mart.foundation.asynchronous.AbstractTask;
import jp.co.intra_mart.foundation.service.client.file.PublicStorage;

public class StorageTask extends AbstractTask {

   @Override
   public void run() {
      PublicStorage storage = new PublicStorage("/task/sample/test.txt");
      String text = null;
      try {
         text = storage.read(Charset.forName("UTF-8"));
      } catch (IOException e) {
         ...
      }
      businessLogic(text);
   }

   private void businessLogic(String text) {
      ...
   }
}
ストレージサービスを利用するタスクメッセージ登録の例
package sample.mytest;

import java.io.IOException;
import java.nio.charset.Charset;

import jp.co.intra_mart.foundation.asynchronous.TaskControlException;
import jp.co.intra_mart.foundation.asynchronous.TaskManager;
import jp.co.intra_mart.foundation.service.client.file.PublicStorage;

public class StorageTaskRegister {
   ...

   private void addTask() {
      String content = "name:intra-mart,age:26";
      PublicStorage storage = new PublicStorage("/task/sample/test.txt");

      try {
         storage.write(content, Charset.forName("UTF-8"));
      } catch (IOException e) {
         ...
      }

      try {
         TaskManager.addParallelizedTask("sample.mytest.StorageTask", null);
      } catch (TaskControlException e) {
         ...
      }
   }
   ...
}