Spring Batch チャンク管理された一連処理
Spring Batch 前回からの続きです。
今回は、公式に乗っていたチャンク管理された一連処理についてです。
参考
- 公式
Getting Started · Creating a Batch Service - Spring BootではなくSpring Bacthの解説
01.Spring Batchの基本概念(ステップ) - soracane
2つめは、2010年と古い情報ですが、Spring Batch の流れについて分かりやすかったです。
一連処理
- Read(データ読込み)
- Processor(データ加工)
- Write(データ書込み)
この時、データ加工が一定数貯まると、データ書込みの処理が行われます。
また、リスナーを設定し、処理の結果によるログ出し等ができます。
ただし、この処理はReadがNullかExceptionを返すまで永遠と繰り返されます。
ソース
基本的には公式の写経ですが、DB接続やCSV読込みまで行う気は無かったため、適当に置き換えています。
ジョブ登録
package com.hello.batch.config; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.hello.batch.listener.JobCompletionNotificationListener; import com.hello.batch.processor.GetProcessor; import com.hello.batch.reader.GetReader; import com.hello.batch.writer.GetWriter; @Configuration @EnableBatchProcessing public class BatchConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean public GetReader reader() { return new GetReader(); } @Bean public GetProcessor processor() { return new GetProcessor(); } @Bean public GetWriter writer() { return new GetWriter(); } @Bean public JobExecutionListener listener() { return new JobCompletionNotificationListener(); } @Bean public Job getJob(Step step1) { return jobs.get("getJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(step1()) .end().build(); } @Bean(name = "step1") public Step step1() { return steps.get("step1").<String, String>chunk(1).reader(reader()).processor(processor()) .writer(writer()).build(); } }
- Stepのgetに対して<String, String>でデータの流れ(in(Read)とout(Write))を設定
- chunk(データ数)でトランザクションの単位を設定
リスナー
package com.hello.batch.listener; import org.springframework.batch.core.listener.JobExecutionListenerSupport; public class JobCompletionNotificationListener extends JobExecutionListenerSupport { }
データ読込み処理
package com.hello.batch.reader; import org.springframework.batch.item.ItemReader; import lombok.extern.slf4j.Slf4j; @Slf4j public class GetReader implements ItemReader<String> { @Override public String read() throws Exception { String out = "hello "; log.info("read (" + out + ")"); return out; } }
@Slf4jは関係ないですが、lombokによるロガーの自動宣言です。
データ加工
package com.hello.batch.processor; import org.springframework.batch.item.ItemProcessor; import lombok.extern.slf4j.Slf4j; @Slf4j public class GetProcessor implements ItemProcessor<String, String> { @Override public String process(final String in) throws Exception { String out = in + ", world!"; log.info("process (" + in + ") ,(" + out + ")"); return out; } }
データ書込み
package com.hello.batch.writer; import java.util.List; import org.springframework.batch.item.ItemWriter; import lombok.extern.slf4j.Slf4j; @Slf4j public class GetWriter implements ItemWriter<String> { @Override public void write(List<? extends String> items) throws Exception { for (String string : items) { log.info("write (" + string + ")"); } } }
チャンクにより複数件を処理するため、リストの形式で引数に取ります。
List<? extends String> の記述は初めて見ましたが、 Stringか,Stringを継承しているクラスのリスト という意味です。
なお、上記のコードでは無限ループしてしまいます。(Readがnullを返すことがないため。)
公式のサンプルにあるように、CSVから読込み、加工結果をDBへ登録という一連の処理を行う必要がある場合は、便利に扱えそうです。
しかし、今回私が望んでいたのは、前回の方法で実現するようなバッチ処理だと思いましたので、前回の手法に対して条件分岐の方法を調査します。