[Spring Batch] partitioner
정리
파티셔닝 (Partitioning)의 개념
- 파티셔닝이란, 병렬로 chunk 단위(각 slave들)을 동시에 수행하는 것
- partitioner는 paging 역할을 하여 데이터를 병렬적으로 읽게 해줌
- master도 한 스텝이고, slave도 별도의 스텝
@Component
@Scope("step")
public class TestPartitioner implements Partitioner {
@Value("#{jobExecutionContext['TOTAL_COUNT']}") //총 개수를 이전 Step에서 jobExecutionContext에 저장
private Long totalCount;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitionMap = new HashMap<>();
//totalCount 에 따른 페이징 처리
int pageSize = (int) (totalCount / gridSize);
pageSize = pageSize % gridSize == 0 ? pageSize + 1 : pageSize;
for (int i = 0; i < gridSize; i++) {
if (totalCount < gridSize) {
gridSize = totalCount.intValue();
}
ExecutionContext executionContext = new ExecutionContext();
executionContext.putInt("offset", i * pageSize); //ExecutionContext에 페이지 정보 저장
executionContext.putInt("limit", pageSize);
partitionMap.put("test_partition_" + i, executionContext);
}
return partitionMap;
}
}
@Component
@Scope("step")
public class TestReader extends AbstractItemReader<String> {
private int cursor = 0;
private List<String> testList = Lists.newArrayList();
@Autowired
private TestRepository testRepository;
@BeforeStep
void beforeStep(final StepExecution stepExecution) {
//executionContext에 저장 된 페이지 정보 꺼냄
int offset = stepExecution.getExecutionContext().getInt("offset");
int limit = stepExecution.getExecutionContext().getInt("limit");
//DB 조회
testList = testRepository.getList(offset, limit);
}
@Override
protected TestResult doRead() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (cursor < testList.size()) {
return testList.get(cursor++);
}
return null;
}
}