[Spring Batch] partitioner

정리


파티셔닝 (Partitioning)의 개념

partitioner01

partitioner02

  • 파티셔닝이란, 병렬로 chunk 단위(각 slave들)을 동시에 수행하는 것
  • partitioner는 paging 역할을 하여 데이터를 병렬적으로 읽게 해줌
  • master도 한 스텝이고, slave도 별도의 스텝
@Component
@Scope("step")
public class TestPartitioner implements Partitioner {
  @Value("#{jobExecutionContext['TOTAL_COUNT']}") //총 개수를 이전 Step에서 jobExecutionContext에 저장
  private Long totalCount;

  @Override
  public Map<String, ExecutionContext> partition(int gridSize) {
    Map<String, ExecutionContext> partitionMap = new HashMap<>();

    //totalCount 에 따른 페이징 처리
    int pageSize = (int) (totalCount / gridSize);
    pageSize = pageSize % gridSize == 0 ? pageSize + 1 : pageSize;
    for (int i = 0; i < gridSize; i++) {
      if (totalCount < gridSize) {
        gridSize = totalCount.intValue();
      }
      ExecutionContext executionContext = new ExecutionContext();
      executionContext.putInt("offset", i * pageSize);  //ExecutionContext에 페이지 정보 저장
      executionContext.putInt("limit", pageSize);
      partitionMap.put("test_partition_" + i, executionContext);
    }
    return partitionMap;
  }
}
@Component
@Scope("step")
public class TestReader extends AbstractItemReader<String> {
  private int cursor = 0;
  private List<String> testList = Lists.newArrayList();

  @Autowired
  private TestRepository testRepository;

  @BeforeStep
  void beforeStep(final StepExecution stepExecution) {
    //executionContext에 저장 된 페이지 정보 꺼냄
    int offset = stepExecution.getExecutionContext().getInt("offset");
    int limit = stepExecution.getExecutionContext().getInt("limit");
    //DB 조회
    testList = testRepository.getList(offset, limit);
  }

  @Override
  protected TestResult doRead() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    if (cursor < testList.size()) {
      return testList.get(cursor++);
    }
    return null;
  }
}

참고