暇な日々にスパイスを

学んだ技術の備忘録

Spring Batch チャンク管理された一連処理

Spring Batch 前回からの続きです。

今回は、公式に乗っていたチャンク管理された一連処理についてです。

参考

  1. 公式
    Getting Started · Creating a Batch Service
  2. Spring BootではなくSpring Bacthの解説
    01.Spring Batchの基本概念(ステップ) - soracane

2つめは、2010年と古い情報ですが、Spring Batch の流れについて分かりやすかったです。

一連処理

  1. Read(データ読込み)
  2. Processor(データ加工)
  3. Write(データ書込み)

この時、データ加工が一定数貯まると、データ書込みの処理が行われます。
また、リスナーを設定し、処理の結果によるログ出し等ができます。
ただし、この処理はReadがNullかExceptionを返すまで永遠と繰り返されます。

ソース

基本的には公式の写経ですが、DB接続やCSV読込みまで行う気は無かったため、適当に置き換えています。

ジョブ登録

package com.hello.batch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.hello.batch.listener.JobCompletionNotificationListener;
import com.hello.batch.processor.GetProcessor;
import com.hello.batch.reader.GetReader;
import com.hello.batch.writer.GetWriter;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

  @Autowired
  private JobBuilderFactory jobs;

  @Autowired
  private StepBuilderFactory steps;

  @Bean
  public GetReader reader() {
    return new GetReader();
  }

  @Bean
  public GetProcessor processor() {
    return new GetProcessor();
  }

  @Bean
  public GetWriter writer() {
    return new GetWriter();
  }

  @Bean
  public JobExecutionListener listener() {
    return new JobCompletionNotificationListener();
  }

  @Bean
  public Job getJob(Step step1) {
    return jobs.get("getJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(step1())
        .end().build();
  }

  @Bean(name = "step1")
  public Step step1() {
    return steps.get("step1").<String, String>chunk(1).reader(reader()).processor(processor())
        .writer(writer()).build();
  }
}
  1. Stepのgetに対して<String, String>でデータの流れ(in(Read)とout(Write))を設定
  2. chunk(データ数)でトランザクションの単位を設定

リスナー

package com.hello.batch.listener;

import org.springframework.batch.core.listener.JobExecutionListenerSupport;

public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

}

データ読込み処理

package com.hello.batch.reader;

import org.springframework.batch.item.ItemReader;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GetReader implements ItemReader<String> {
  @Override
  public String read() throws Exception {
    String out = "hello ";
    log.info("read (" + out + ")");

    return out;
  }
}

@Slf4jは関係ないですが、lombokによるロガーの自動宣言です。

データ加工

package com.hello.batch.processor;

import org.springframework.batch.item.ItemProcessor;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GetProcessor implements ItemProcessor<String, String> {

  @Override
  public String process(final String in) throws Exception {
    String out = in + ", world!";
    log.info("process (" + in + ") ,(" + out + ")");

    return out;
  }

}

データ書込み

package com.hello.batch.writer;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GetWriter implements ItemWriter<String> {

  @Override
  public void write(List<? extends String> items) throws Exception {
    for (String string : items) {
      log.info("write (" + string + ")");
    }
  }
}

チャンクにより複数件を処理するため、リストの形式で引数に取ります。
List<? extends String> の記述は初めて見ましたが、 Stringか,Stringを継承しているクラスのリスト という意味です。

なお、上記のコードでは無限ループしてしまいます。(Readがnullを返すことがないため。)
公式のサンプルにあるように、CSVから読込み、加工結果をDBへ登録という一連の処理を行う必要がある場合は、便利に扱えそうです。
しかし、今回私が望んでいたのは、前回の方法で実現するようなバッチ処理だと思いましたので、前回の手法に対して条件分岐の方法を調査します。