Asakusa DSLとThunderGateの連携¶
この文書では、ThunderGateと連携してデータベースを操作するバッチアプリケーションを Asakusa DSLで記述する方法について紹介します。
ThunderGateについての情報は、 ThunderGateユーザーガイド を参照して下さい。
基本的な連携の方法¶
Asakusa DSLで定義するジョブフローは、 ThunderGateと連携することで入出力にデータベースのテーブルを利用できます。
Note
単一のジョブフローの中で、インポートとエクスポートの対象にできるデータベースは1つまでです。 ただし、 補助インポータ の機能を利用すると、制限つきで複数のデータベースからインポートできます。
データベースのテーブルからインポートする¶
ThunderGateと連携してデータベースのテーブルからデータをインポートする場合、
DbImporterDescription [1] クラスのサブクラスを作成して必要な情報を記述します。
このクラスでは、下記のメソッドをオーバーライドします。
- String getTargetName()
- インポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。 インポータは実行時に $ASAKUSA_HOME/bulkloader/conf 配下に配置した[ターゲット名]-jdbc.properties に記述されたデータベース接続情報定義ファイルを使用してデータベースに対するアクセスを行います。 このファイルの内容については ThunderGateユーザーガイド を参照して下さい。
- Class<?> getModelType()
- インポータが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。 インポータは実行時にデータモデルクラスを作成する元となったテーブル名に対してインポート処理を行います [2] 。
- LockType getLockType()
- インポータの処理時に行われるロックの種類を戻り値に指定します。
指定可能なロック種別については BulkLoadImporterDescription.LockTypeのAPIリファレンスを参照して下さい [3] 。
- String getWhere()
- インポータが利用する抽出条件をSQLの条件式で指定します。 指定する文字列はMySQL形式の - WHERE以降の文字列である必要があります。- ここには - ${変数名}の形式で、バッチ起動時の引数やあらかじめ宣言された変数を利用できます。 利用可能な変数はコンテキストAPIで参照できるものと同様です。 詳しくは Asakusa DSLユーザーガイド を参照してください。
- DataSize getDataSize()
- インポートするデータのおおよそのサイズを指定します。 コンパイラはこのデータサイズをヒントに実行計画を作成します。 省略された場合にはデータサイズが不明であるという前提で実行計画を作成します。
以下の例では、テーブルから生成したデータモデルクラス Hoge に対応するテーブルの全データをロードして、その際にテーブルロックを取得します。
public class HogeFromDb extends DbImporterDescription {
    public String getTargetName() {
        return "asakusa";
    }
    public Class<?> getModelType() {
        return Hoge.class;
    }
    public LockType getLockType() {
        return LockType.TABLE;
    }
}
以下の例では、バッチ引数で指定した parameter 変数を利用して条件式を指定しています。
public class HogeFromDb extends DbImporterDescription {
    public String getTargetName() {
        return "asakusa";
    }
    public Class<?> getModelType() {
        return Hoge.class;
    }
    public String getWhere() {
        return "NAME = '${parameter}'";
    }
    public LockType getLockType() {
        return LockType.ROW;
    }
}
| [1] | com.asakusafw.vocabulary.bulkloader.DbImporterDescription | 
| [2] | DMDLを直接記述してデータモデルクラスを作成している場合、 DbImporterDescriptionの代わりにBulkLoadImporterDescription[4] を利用して下さい | 
| [3] | com.asakusafw.vocabulary.bulkloader.BulkLoadImporterDescription.LockType | 
| [4] | com.asakusafw.vocabulary.bulkloader.BulkLoadImporterDescription | 
データベースのテーブルにエクスポートする¶
ThunderGateと連携してジョブフローの処理結果をデータベースのテーブルに書き出すには、
DbExporterDescription [5] クラスのサブクラスを作成して必要な情報を記述します。
このクラスでは、下記のメソッドをオーバーライドします。
- String getTargetName()
- エクスポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。
利用方法はインポータの getTargetName()と同様です。
- Class<?> getModelType()
- エクスポータが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。 エクスポータは実行時にデータモデルクラスを作成する元となったテーブル名に対してエクスポート処理を行います [6] 。
以下の例では、テーブルから生成したデータモデルクラス Hoge に対応するテーブルに対して、ジョブフローの処理結果を書き戻します。
public class HogeIntoDb extends DbExporterDescription {
    public Class<?> getModelType() {
        return Hoge.class;
    }
}
| [5] | com.asakusafw.vocabulary.bulkloader.DbExporterDescription | 
| [6] | DMDLを直接記述してデータモデルクラスを作成している場合、 DbExporterDescriptionの代わりにBulkLoadExporterDescription[7] を利用して下さい | 
| [7] | com.asakusafw.vocabulary.bulkloader.BulkLoadExporterDescription | 
補助インポータ¶
補助インポータは、1つのジョブフロー中に通常のインポートやエクスポート処理を行うデータベースとは別の、 データベースからデータをインポートする際に使用するインポータです。
通常のインポータはデータの更新を前提としてロック取得 (排他制御) の指定を行いますが、 補助インポータは指定したテーブルに対してデータを参照のみを行います。 つまり、補助インポータを利用すると、「他のデータベースからマスタデータなどの参照データを読み出せる」ということになります [8] 。
補助インポータを使用してインポート処理を行うには、
SecondaryImporterDescription [9] を継承したクラス(インポート処理記述クラス)を作成し、必要なメソッドをオーバーライドします。
同クラスに指定するメソッドを以下に示します。
- String getTargetName()
- 補助インポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。 通常のインポータとは異なるターゲット名を指定します。補助インポータ実行時にはターゲット名に対応するデータベース接続情報定義ファイルを配置しておく必要があります。 データベース接続情報定義ファイルの定義方法は通常のインポータと同様です。
- Class<?> getModelType()
- 補助インポータが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。 利用方法は通常のインポータと同様です。
- String getWhere()
- 補助インポータが利用する抽出条件をSQLの条件式で指定します。 利用方法は通常のインポータと同様です。
- DataSize getDataSize()
- このインポータが取り込むデータサイズの分類を指定します。 利用方法は通常のインポータと同様です。
以下の例では、テーブルから生成したデータモデルクラス Foo に対応するテーブルの全データをロードします。
また、その時に利用するデータベースは other というターゲット名で指定しています。
以下の例では、テーブルから生成したデータモデルクラス Hoge に対応するテーブルに対して、ジョブフローの処理結果を書き戻します。
/**
 * 補助インポータの動作を定義する。
 */
public class SecondaryImporterExample extends SecondaryImporterDescription {
    @Override
    public String getTargetName() {
        return "other";
    }
    @Override
    public Class<?> getModelType() {
        return Foo.class;
    }
    // 補助インポータはgetLockType()をオーバーライドできない。
}
以下は補助インポータを利用する場合の注意点です。
- 補助インポータでないインポータのターゲットは、ジョブフロー中で1種類までです- DbImporterDescriptionを使う場合、- getTargetName()はジョブフロー中で全て同じものにしてください
- SecondaryImporterDescriptionが、- DbImporterDescriptionと同じターゲット名を指定することは可能です
 
- エクスポータのターゲットは、通常のインポータと同じターゲットにしてください- 通常のインポータでターゲットAを指定し、エクスポータと補助インポータにターゲットBを指定、のようなことはできません
- 通常のインポータを一つも利用しない場合、エクスポータのターゲット名は何を指定してもかまいません
 
| [8] | これとは逆の「補助エクスポータ」のような仕組みは現在提供していません | 
| [9] | com.asakusafw.vocabulary.bulkloader.SecondaryImporterDescription | 
重複チェック機能¶
エクスポータの拡張機能で、新しいレコードをテーブルに追加する際に、特定のカラムが同じデータが既にデータベース上にあるかどうかをチェックできます。 重複データがデータベース上に既に存在する場合には、そのデータを通常のテーブルには追加せずに、かわりにエラー情報のテーブルに追加します。 この機能は既存のレコードに対しては利用できず、 新しいレコードを追加する際にだけ利用できます 。
重複チェックを行う場合、まずは次のようなテーブルが必要です。
- 正常レコードを登録するテーブル- 重複チェック用のカラムがテーブルに存在すること
 
- 重複したレコードを登録するエラーテーブル- エラーコードを格納するカラム(CHAR/VARCHAR型)がテーブルに存在すること
 
この機能の利用方法を、2つのケースに分けて説明します。
正常テーブルよりもエラーテーブルの情報が少ない場合¶
正常テーブルよりもエラーテーブルの情報が少ない場合に、重複チェックを行う方法を紹介します。 このとき、正常テーブルとエラーテーブルは次のような関係であるとします。
- 正常テーブル- 必要な業務情報やシステムカラムを含んでいる
- 重複チェック用のカラムを含んでいる
 
- エラーテーブル- 正常テーブルの一部または全部のカラムが、同じ名前で存在する
- さらに、エラーコードを格納するカラムが存在する (正常テーブルに含まれていなくてよい)
 
つまり、正常テーブルにない情報をエラーテーブルに設定したい場合 [10] には、この方法は利用できません。 この場合には 正常テーブルとエラーテーブルの構造が大きく異なる場合 を参照して下さい。
重複チェックを行うには DbExporterDescription の代わりに DupCheckDbExporterDescription [11] を継承したエクスポータ記述を作成します。
/**
 * 重複チェックつきエクスポータの動作を定義する (正常テーブル中心)。
 */
public class DupCheckExporterExample1 extends DupCheckDbExporterDescription {
    @Override
    public String getTargetName() {
        return "asakusa";
    }
    @Override
    public Class<?> getModelType() {
        return Hoge.class;
    }
    @Override
    protected Class<?> getNormalModelType() {
        return Hoge.class;
    }
    @Override
    protected Class<?> getErrorModelType() {
        return HogeError.class;
    }
    @Override
    protected List<String> getCheckColumnNames() {
        return Arrays.asList("VALUE");
    }
    @Override
    protected String getErrorCodeColumnName() {
        return "ERR_CODE";
    }
    @Override
    protected String getErrorCodeValue() {
        return "999";
    }
}
それぞれのオーバーライドしたメソッドでは、以下のように設定します。
- getTargetName()
- エクスポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。 利用方法は通常のエクスポータやインポータと同様です。
- getModelType()
- 正常テーブルのテーブルモデルクラスを返します。
- getNormalModelType()
- 正常テーブルのテーブルモデルクラスを返します。
- getErrorModelType()
- エラーテーブルのテーブルモデルクラスを返します。
- getCheckColumnNames()
- 重複チェックを行うカラム名の一覧を返します。 この値は、正常テーブルのテーブルモデルに存在するカラムを指定する必要があります。
- getErrorCodeColumnName()
- エラーコードを格納するカラム名を返します。 この値は、エラーテーブルに実際に存在するカラム名である必要があります。
- getErrorCodeValue()
- 重複チェックに失敗した場合に設定されるエラーコードです。 この値は重複チェックに失敗したレコードがエラーテーブルに格納される際に、上記「エラーコードを格納するカラム」に自動的に設定されます。
| [10] | エラーコードを格納するカラムだけは、正常テーブルになくても大丈夫です | 
| [11] | com.asakusafw.vocabulary.bulkloader.DupCheckDbExporterDescription | 
正常テーブルとエラーテーブルの構造が大きく異なる場合¶
正常テーブルよりもエラーテーブルの情報が多い場合や、正常テーブルとエラーテーブルの構造が大きく異なる場合には、 それらの両方のプロパティを持つデータモデルを予め作成する必要があります。 ここでは、そのようなデータモデルを「ユニオンモデル」と仮に呼ぶことにします。
なお、ここで想定する正常テーブルとエラーテーブルは次のような制約があるものとします。
- 正常テーブル- 重複チェック用のカラムを含んでいる
 
- エラーテーブル- エラーコードを格納するカラムが存在する
 
ユニオンモデルは、上記の2つのテーブルの全てのカラムを持つようなデータ構造である必要があります [12] 。
正常テーブルの名前が NORMAL_TABLE, エラーテーブルの名前が ERROR_TABLE である場合、
ユニオンモデルはDMDLで次のように記述できます [13] 。
union_model = normal_table + error_table;
上記の記述によって、 UnionModel という名前のユニオンモデルを作成できます。
また、ジョブフローやフロー部品では、ユニオンテーブルのテーブルモデルを使って処理を行います。
ユニオンテーブルのテーブルモデルをエクスポートする際に、先ほどと同様に DupCheckDbExporterDescription を指定して、次のように書きます。
/**
 * 重複チェックつきエクスポータの動作を定義する (ユニオンモデル)。
 */
public class DupCheckExporterExample2 extends DupCheckDbExporterDescription {
    @Override
    public String getTargetName() {
        return "asakusa";
    }
    @Override
    public Class<?> getModelType() {
        return UnionModel.class;
    }
    @Override
    protected Class<?> getNormalModelType() {
        return NormalTable.class;
    }
    @Override
    protected Class<?> getErrorModelType() {
        return ErrorTable.class;
    }
    @Override
    protected List<String> getCheckColumnNames() {
        return Arrays.asList("VALUE");
    }
    @Override
    protected String getErrorCodeColumnName() {
        return "ERR_CODE";
    }
    @Override
    protected String getErrorCodeValue() {
        return "999";
    }
}
この構造は 正常テーブルよりもエラーテーブルの情報が少ない場合 とほとんど同じですが、メソッド getModelType() の戻り値が異なっています。
ジョブフローの出力もここに指定する型(ユニオンモデル)でなくてはならないことに注意して下さい。
全体としては以下のように設定します。
- getTargetName()
- エクスポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。 利用方法は通常のエクスポータやインポータと同様です。
- getModelType()
- ユニオンモデルクラスを返します。
- getNormalModelType()
- 正常テーブルのテーブルモデルクラスを返します。
- getErrorModelType()
- エラーテーブルのテーブルモデルクラスを返します。
- getCheckColumnNames()
- 重複チェックを行うカラム名の一覧を返します。 この値は、正常テーブルのテーブルモデルに存在するカラムを指定する必要があります。
- getErrorCodeColumnName()
- エラーコードを格納するカラム名を返します。 この値は、エラーテーブルに実際に存在するカラム名である必要があります。
- getErrorCodeValue()
- 重複チェックに失敗した場合に設定されるエラーコードです。 この値は重複チェックに失敗したレコードがエラーテーブルに格納される際に、上記「エラーコードを格納するカラム」に自動的に設定されます。
この機能で想定するユースケースは、「別システムからの取り込みとクレンジング処理のバッチ」です。
- 取込みデータの形式をユニオンモデルで表す
- 正常テーブルは、業務に必要なカラムだけを含める
- エラーテーブルは、エラートラッキングに必要なカラムだけを含める
- 取込みデータをクレンジングして、エラーがあればエラーカラムに情報をセットして、エラーテーブルに情報を書き出す
- クレンジングしたデータは、重複チェック機能を使って正常テーブルに情報を書き出す- 重複チェックに成功した場合には、必要なカラムだけを正常テーブルに書き出す
- 重複チェックに失敗した場合には、エラーカラムに「重複エラー」の情報を設定して、エラーテーブルに情報を書き出す
 
| [12] | より厳密には、「エラーコードカラム」に対応するプロパティはユニオンモデルに不要です | 
| [13] | DMDLの利用方法は、 DMDLユーザーガイド を参照して下さい | 
キャッシュ機能¶
ThunderGateでは、インポート時に前回インポートからの差分のみを転送する「キャッシュ機能」が提供されています。 ただし、キャッシュを利用するには主に以下のような制限があります。
- 条件式 ( getWhere())を利用できない
- ロック ( getLockType())時に行単位のロックを指定できない
- 同一のキャッシュを複数のジョブフローで同時に利用できない
詳しい利用方法や、利用時の注意などは Cache for ThunderGate を参照してください。
