Asakusa on Spark リファレンス¶
この文書では、Asakusa on Sparkが提供するGradle PluginやDSLコンパイラの設定、およびバッチアプリケーション実行時の設定などについて説明します。
Asakusa on Spark Gradle Plugin リファレンス¶
Asakusa on Spark Gradle Pluginが提供する機能とインターフェースについて個々に解説します。
プラグイン¶
- asakusafw-spark
- アプリケーションプロジェクトで、Asakusa on Sparkのさまざまな機能を有効にする [1] 。 - このプラグインは - asakusafw-sdkプラグインや- asakusafw-organizerプラグインを拡張するように作られているため、それぞれのプラグインも併せて有効にする必要がある(- apply plugin: 'asakusafw-spark'だけではほとんどの機能を利用できません)。
| [1] | com.asakusafw.spark.gradle.plugins.AsakusafwSparkPlugin | 
タスク¶
- sparkCompileBatchapps
- Asakusa DSL Compiler for Sparkを利用してDSLをコンパイルする [2] 。 - asakusafw-sdkプラグインが有効である場合にのみ利用可能。
- attachComponentSpark
- デプロイメントアーカイブにSpark向けのバッチアプリケーションを実行するためのコンポーネントを追加する。 - asakusafw-organizerプラグインが有効である場合にのみ利用可能。- asakusafwOrganizer.spark.enabledに- trueが指定されている場合、自動的に有効になる。
- attachSparkBatchapps
- デプロイメントアーカイブに - sparkCompileBatchappsでコンパイルした結果を含める。- asakusafw-sdk,- asakusafw-organizerの両プラグインがいずれも有効である場合にのみ利用可能。- asakusafwOrganizer.batchapps.enabledに- trueが指定されている場合、自動的に有効になる。
| [2] | com.asakusafw.gradle.tasks.AsakusaCompileTask | 
タスク拡張¶
- assemble
- デプロイメントアーカイブを生成する。 - asakusafw-sparkと- asakusafw-organizerプラグインがいずれも有効である場合、- sparkCompileBatchappsが依存関係に追加される。
- compileBatchapp
- Asakusa DSLコンパイラを使ってバッチアプリケーションのコンパイルを行い、実行可能モジュールを生成する。 - asakusafw-sparkプラグインが有効である場合、- sparkCompileBatchappsが依存関係に追加される。
- jarBatchapp
- compileBatchappタスクで生成したバッチアプリケーションを含むjarファイルを生成する。- asakusafw-sparkプラグインが有効である場合、- sparkCompileBatchappsタスクの生成物がjarファイルの内容に追加される。
規約プロパティ拡張¶
Batch Application Plugin ( asakusafw ) への拡張¶
Asakusa on Spark Gradle Pluginは Batch Application Plugin に対して Asakusa on Sparkのビルド設定を行うための規約プロパティを追加します。この規約プロパティは、 asakusafw ブロック内の参照名 spark でアクセスできます [3] 。
以下、 build.gradle の設定例です。
asakusafw {
    spark {
        include 'com.example.batch.*'
        option 'operator.logging.level', 'warn'
    }
この規約オブジェクトは以下のプロパティを持ちます。
- spark.version
- Asakusa on Sparkのコンポーネントバージョンを保持する。 - この値は設定による変更は不可。 - 既定値: Asakusa on Spark Gradle Pluginが保持する既定のバージョン 
- spark.outputDirectory
- コンパイラの出力先を指定する。 - 文字列や - java.io.Fileなどで指定し、相対パスが指定された場合にはプロジェクトからの相対パスとして取り扱う。- 既定値: - "$buildDir/spark-batchapps"
- spark.include
- コンパイルの対象に含めるバッチクラス名のパターンを指定する。 - バッチクラス名には - *でワイルドカードを含めることが可能。- また、バッチクラス名のリストを指定した場合、それらのパターンのいずれかにマッチしたバッチクラスのみをコンパイルの対象に含める。 - 既定値: - null(すべて)
- spark.exclude
- コンパイルの対象から除外するバッチクラス名のパターンを指定する。 - バッチクラス名には - *でワイルドカードを含めることが可能。- また、バッチクラス名のリストを指定した場合、それらのパターンのいずれかにマッチしたバッチクラスをコンパイルの対象から除外する。 - includeと- excludeがいずれも指定された場合、- excludeのパターンを優先して取り扱う。- 既定値: - null(除外しない)
- spark.runtimeWorkingDirectory
- 実行時のテンポラリワーキングディレクトリのパスを指定する。 - パスにはURIやカレントワーキングディレクトリからの相対パスを指定可能。 - 未指定の場合、コンパイラの標準設定である「 - target/hadoopwork」を利用する。- 既定値: - null(コンパイラの標準設定を利用する)
- spark.option
- コンパイラプロパティ (コンパイラのオプション設定)を追加する。 - 後述する コンパイラプロパティ を - <key>, <value>の形式で指定する [4] 。- 既定値: (Spark向けのコンパイルに必要な最低限のもの) 
- spark.batchIdPrefix
- Spark向けのバッチアプリケーションに付与するバッチIDの接頭辞を指定する。 - 文字列を設定すると、それぞれのバッチアプリケーションは「 - <接頭辞><本来のバッチID>」というバッチIDに強制的に変更される。- 空文字や - nullを指定した場合、本来のバッチIDをそのまま利用するが、他のコンパイラが生成したバッチアプリケーションと同じバッチIDのバッチアプリケーションを生成した場合、アプリケーションが正しく動作しなくなる。- 既定値: - "spark."
- spark.failOnError
- Spark向けのコンパイルを行う際に、コンパイルエラーが発生したら即座にコンパイルを停止するかどうかを選択する。 - コンパイルエラーが発生した際に、 - trueを指定した場合にはコンパイルをすぐに停止し、- falseを指定した場合には最後までコンパイルを実施する。- 既定値: - true(即座にコンパイルを停止する)
| [3] | これらのプロパティは規約オブジェクト com.asakusafw.gradle.plugins.AsakusafwCompilerExtension が提供します。 | 
| [4] | コンパイラプロパティを指定する方法は他にいくつかの方法があります。詳しくは com.asakusafw.gradle.plugins.AsakusafwCompilerExtension のメソッドの説明を参照してください。 | 
Framework Organizer Plugin ( asakusafwOrganizer ) への拡張¶
Asakusa on Spark Gradle Plugin は Framework Organizer Plugin に対して Asakusa on Sparkのビルド設定を行うための規約プロパティを追加します。この規約プロパティは、 asakusafwOrganizer ブロック内の参照名 spark でアクセスできます [5] 。
この規約オブジェクトは以下のプロパティを持ちます。
- spark.enabled
- デプロイメントアーカイブにSpark向けのバッチアプリケーションを実行するためのコンポーネントを追加するかどうかを指定する (各プロファイルのデフォルト値)。 - trueを指定した場合にはコンポーネントを追加し、- falseを指定した場合には追加しない。- 既定値: - true(コンポーネントを追加する)
- <profile>.spark.enabled
- 対象のプロファイルに対し、デプロイメントアーカイブにSpark向けのバッチアプリケーションを実行するためのコンポーネントを追加するかどうかを指定する。 - 前述の - spark.enabledと同様だが、こちらはプロファイルごとに指定できる。- 既定値: - asakusafwOrganizer.spark.enabled(全体のデフォルト値を利用する)
| [5] | これらのプロパティは規約オブジェクト com.asakusafw.spark.gradle.plugins.AsakusafwOrganizerSparkExtension が提供します。 | 
コマンドラインオプション¶
sparkCompileBatchapps タスクを指定して gradlew コマンドを実行する際に、 sparkCompileBatchapps --update <バッチクラス名> と指定することで、指定したバッチクラス名のみをバッチコンパイルすることができます。
また、バッチクラス名の文字列には * をワイルドカードとして使用することもできます。
以下の例では、パッケージ名に com.example.target.batch を含むバッチクラスのみをバッチコンパイルしてデプロイメントアーカイブを作成しています。
./gradlew sparkCompileBatchapps --update com.example.target.batch.* assemble
そのほか、 sparkCompileBatchapps タスクは gradlew コマンド実行時に以下のコマンドライン引数を指定することができます。
- 
--options<k1=v1[,k2=v2[,...]]>¶
- 追加のコンパイラプロパティを指定する。 - 規約プロパティ - asakusafw.spark.optionで設定したものと同じキーを指定した場合、それらを上書きする。
- 
--batch-id-prefix<prefix.>¶
- 生成するバッチアプリケーションに、指定のバッチID接頭辞を付与する。 - 規約プロパティ - asakusafw.spark.batchIdPrefixの設定を上書きする。
- 
--fail-on-error<"true"|"false">¶
- コンパイルエラー発生時に即座にコンパイル処理を停止するかどうか。 - 規約プロパティ - asakusafw.spark.failOnErrorの設定を上書きする。
- 
--update<batch-class-name-pattern>¶
- 指定のバッチクラスだけをコンパイルする (指定したもの以外はそのまま残る)。 - 規約プロパティ - asakusafw.spark.{in,ex}cludeと同様にワイルドカードを利用可能。- このオプションが設定された場合、規約プロパティ - asakusafw.spark.{in,ex}cludeの設定は無視する。
Asakusa DSL Compiler for Spark リファレンス¶
コンパイラプロパティ¶
Asakusa DSL Compiler for Sparkで利用可能なコンパイラプロパティについて説明します。これらの設定方法については、 Batch Application Plugin ( asakusafw ) への拡張 の spark.option の項を参照してください。
- directio.input.filter.enabled
- Direct I/O input filterを有効にするかどうか。 - trueならば有効にし、- falseならば無効にする。- 既定値: - true
- operator.checkpoint.remove
- DSLで指定した - @Checkpoint演算子をすべて除去するかどうか。- trueならば除去し、- falseならば除去しない。- 既定値: - false
- operator.logging.level
- DSLで指定した - @Logging演算子のうち、どのレベル以上を表示するか。- debug,- info,- warn,- errorのいずれかを指定する。- 既定値: - info
- operator.aggregation.default
- DSLで指定した - @Summarize,- @Fold演算子の- partialAggregateに- PartialAggregation.DEFAULTが指定された場合に、どのように集約を行うか。- totalであれば部分集約を許さず、- partialであれば部分集約を行う。- 既定値: - total
- input.estimator.tiny
- インポーター記述の - getDataSize()に- DataSize.TINYが指定された際、それを何バイトのデータとして見積もるか。- 値にはバイト数か、 - +Inf(無限大)、- NaN(不明) のいずれかを指定する。- 主に、 - @MasterJoin系の演算子でJOINのアルゴリズムを決める際など、データサイズによる最適化の情報として利用される。- 既定値: - 10485760(10MB)
- input.estimator.small
- インポーター記述の - getDataSize()に- DataSize.SMALLが指定された際、それを何バイトのデータとして見積もるか。- その他については - input.estimator.tinyと同様。- 既定値: - 209715200(200MB)
- input.estimator.large
- インポーター記述の - getDataSize()に- DataSize.LARGEが指定された際、それを何バイトのデータとして見積もるか。- その他については - input.estimator.tinyと同様。- 既定値: - +Inf(無限大)
- operator.join.broadcast.limit
- @MasterJoin系の演算子で、broadcast joinアルゴリズムを利用して結合を行うための、マスタ側の最大入力データサイズ。- 基本的には - input.estimator.tinyで指定した値の2倍程度にしておくのがよい。- 既定値: - 20971520(20MB)
- operator.estimator.<演算子注釈名>
- 指定した演算子の入力に対する出力データサイズの割合。 - 「演算子注釈名」には演算子注釈の単純名 ( - Extract,- Foldなど) を指定し、値には割合 (- 1.0,- 2.5など) を指定する。- たとえば、「 - operator.estimator.CoGroup」に- 5.0を指定した場合、すべての- @CoGroup演算子の出力データサイズは、入力データサイズの合計の5倍として見積もられる。- 既定値: operator.estimator.* のデフォルト値 を参照 
- <バッチID>:<オプション名>
- 指定のオプションを、指定のIDのバッチに対してのみ有効にする。 - バッチIDは - spark.などのプレフィックスが付与する まえの ものを指定する必要がある。- 既定値: N/A 
- spark.input.direct
- ジョブフローの入力データを(可能ならば)Sparkから直接読むかどうか。 - これが有効である場合、Direct I/Oではprologueフェーズを省略してSparkから直接ファイルを読み出す。 - WindGateの場合はどちらもSparkからは読み出さず、WindGateのプログラムを利用してファイルシステム上に展開する。 - 既定値: - true
- spark.output.direct
- ジョブフローの出力データを(可能ならば)Sparkから直接書き出すかどうか。 - これが有効である場合、Direct I/Oではepilogueフェーズを省略してSparkから直接ファイルを書き出す。 - WindGateの場合はどちらもSparkからは書き出さず、WindGateのプログラムを利用して外部リソース上に展開する。 - 既定値: - true
Deprecated
Asakusa Framework バージョン 0.10.0 以降、 spark.input.direct および spark.output.direct を false に指定した利用は非推奨となりました。
spark.input.direct , spark.output.direct のいずれかの値を false に設定した場合、
YAESSによるバッチアプリケーション実行にはHadoopコマンドを利用できる環境が必要です。
利用するHadoopコマンドの設定方法や検索方法については、 YAESSユーザーガイド を参照してください。
- spark.parallelism.limit.tiny
- Sparkでシャッフル処理を行う際に、データサイズの合計が指定のバイト数以下であれば分割数を1に制限する。 - データサイズにはバイト数か、 - +Inf(無限大)、- NaN(無効化) のいずれかを指定する。- データサイズは、 - input.estimator.tinyなどで指定した見積もりを利用する。- 既定値: - 20971520(20MB)
- spark.parallelism.limit.small
- Sparkでシャッフル処理を行う際に、データサイズの合計が指定のバイト数以下であれば分割数を規定の - 0.5倍に設定する。- その他については - spark.parallelism.limit.tinyと同様。- 既定値: - NaN(無効化)
- spark.parallelism.limit.regular
- Sparkでシャッフル処理を行う際に、データサイズの合計が指定のバイト数以下であれば分割数を規定の - 1.0倍に設定する。- その他については - spark.parallelism.limit.tinyと同様。- 標準では - +Infが指定されているため、下記の- largeや- hugeを利用したい場合には有限の値を指定する必要がある。- 既定値: - +Inf(無限大)
- spark.parallelism.limit.large
- Sparkでシャッフル処理を行う際に、データサイズの合計が指定のバイト数以下であれば分割数を規定の - 2.0倍に設定する。- その他については - spark.parallelism.limit.tinyと同様。- 既定値: - +Inf(無限大)
- spark.parallelism.limit.huge
- Sparkでシャッフル処理を行う際に、データサイズの合計が指定のバイト数以下であれば分割数を規定の - 4.0倍に設定する。- その他については - spark.parallelism.limit.tinyと同様。- 通常の場合、この設定がもっとも大きなデータサイズを表すため、 - +Infから変更しない方がよい。- 既定値: - +Inf(無限大)
- spark.parallelism.operator.<演算子>
- 指定の演算子を含むSparkのステージに対し、入力データサイズを強制的に指定する。 - データサイズは - tiny,- small,- regular,- large,- hugeのいずれかから指定し、それぞれシャッフル時の分割数が- 1,- 0.5倍,- 1.0倍,- 2.0倍,- 4.0倍に設定される。- 同一のステージに対して複数の演算子のデータサイズが指定された場合、そのうちもっとも大きなものが利用される。 - 既定値: N/A 
- spark.planning.option.unifySubplanIo
- Sparkの等価なステージの入出力を一つにまとめる最適化を有効にするかどうか。 - trueならば有効にし、- falseならば無効にする。- 無効化した場合、ステージの入出力データが増大する場合があるため、特別な理由がなければ有効にするのがよい。 - 既定値: - true
- spark.planning.option.checkpointAfterExternalInputs
- ジョブフローの入力の直後にチェックポイント処理を行うかどうか。 - trueならばチェックポイント処理を行い、- falseならば行わない。- チェックポイント処理を行う場合、入力データの保存が余計に行われるため、特別な理由がなければ無効にするのがよい。 - なお、Direct I/Oのオリジナルデータを2回以上読みたくない場合にチェックポイント処理が有効な場合があるが、その場合には - spark.input.directを無効にした方が多くの場合で効率がよい。- 既定値: - false
operator.estimator.* のデフォルト値¶
| 演算子注釈名 | 計算式 | 
|---|---|
| Checkpoint | 入力の 1.0倍 | 
| Logging | 入力の 1.0倍 | 
| Branch | 入力の 1.0倍 | 
| Project | 入力の 1.0倍 | 
| Extend | 入力の 1.25倍 | 
| Restructure | 入力の 1.25倍 | 
| Split | 入力の 1.0倍 | 
| Update | 入力の 2.0倍 | 
| Convert | 入力の 2.0倍 | 
| Summarize | 入力の 1.0倍 | 
| Fold | 入力の 1.0倍 | 
| MasterJoin | トランザクション入力の 2.0倍 | 
| MasterJoinUpdate | トランザクション入力の 2.0倍 | 
| MasterCheck | トランザクション入力の 1.0倍 | 
| MasterBranch | トランザクション入力の 1.0倍 | 
| Extract | 既定値無し | 
| GroupSort | 既定値無し | 
| CoGroup | 既定値無し | 
既定値がない演算子に対しては、有効なデータサイズの見積もりを行いません。
制限事項¶
ここでは、Asakusa on Spark固有の制限事項について説明します。これらの制限は将来のバージョンで緩和される可能性があります。
