-
Spring Batch 기초 정리Java & Spring/Spring Batch 2021. 6. 22. 17:44반응형
Spring Batch
Batch Application
- 데이터에 대한 일괄 처리 작업을 사람 개입 없이 자동으로 지원해주는 Application
- Batch : 일괄처리
- Scheduler : 일정 시간마다 실행시키는 역할
- Quartz? Spring Batch?
- Quartz : Job Scheduling Library
- Quartz는 스케쥴러의 역할이지 Batch와 같이 대용량 데이터 배치 처리에 대한 기능 지원 X
- Quartz외에도 Spring Scheduler, Jenkins, Cron 등이 존재
- Spring Batch : Bach Framework
- Spring Batch는 스케쥴링 프레임워크가 아님
- Batch는 Quartz의 스케쥴링 기능 지원 X
- 보통 Quartz + Spring Batch로 사용하면서 데이터 일괄처리를 자동으로 진행
- 요즘에는 Jenkins + Spring Batch로 사용
- Quartz : Job Scheduling Library
- 간단한 일정 주기 반복성 작업 ~> Spring Batch를 사용하지 않고 Scheduler만 사용해서 처리하는 것도 좋음
Batch Application
VSWeb Application
- Web : 실시간 처리 / 상대적인 속도 / QA 용이
- Batch: 후속 처리 / 절대적인 속도 / QA 복잡성 > 테스트코드 중요
The Domain Language of Batch
- Batch Stereotypes
- Job
- 배치 처리과정을 하나의 단위로 만들어 표현한 객체
- 하나의 Job에는 1 : N으로 Step을 가짐
- Step 인스턴스의 컨테이너 역할
- JobInstance
- Job이 실제 실행되는 단위
- JobInstance는 여러개의 JobExecution을 가질 수 있음
- JobParameters
- 배치 작업을 시작하는데 사용되는 파라미터를 보유
- 식별자로 사용되거나 실행 중 참조될 수 있음
- JobExecution
- JobInstance에 대한 한 번의 실행을 나타내는 객체
- JobExcution은 JobInstance, 배치 실행 상태, 시작 시간, 끝난 시간, 실패했을 때 메시지 등의 정보를 담고 있음
- 실행이 성공적으로 완료되지 않는 한 지정된 실행에 해당하는 JobInstance는 완료 간주X
- Step
- 실질적인 배치 처리(모든 정보를 담고 있음)
- 모든 Job에는 1개 이상의 Step이 존재해야함
- Step 안에 Tasklet 혹은 Reader & Processor & Writer 묶음이 존재
- 배치 Job의 독립적이고 순차적인 단계를 캡슐화하는 도메인 객체
- StepExecution
- JobExecution과 유사하며, Step의 실제 정보를 담는 객체
- 이전 step이 실패하면 다음 step 진행을 하지 않음
- JobRepository
- 위의 모든 절차에 대한 지속적인 매커니즘
- JobRepository에서 JobExecution을 관리
- 첫 Job이 실행되면 JobRepository로 부터 JobExecution을 가져와서 실행하는 동안 step 및 job execution을 JobRepository로 전달하며 유지
- JobLauncher, Job, Step에 대한 CRUD를 제공
- JobLauncher
- JobParameters와 함께 실행되는 Job에 대한 간단한 인터페이스
- Tasklet
- Step 안에서 단일로 수행될 커스텀한 기능들을 선언할 때 사용
- ItemReader
- step에서 배치 데이터를 읽어오는 역할
- ItemWriter
- step의 결과인 배치 데이터를 저장하는 역할
- 일반적으로 DB나 파일에 저장
- step의 결과인 배치 데이터를 저장하는 역할
- ItemProcessor
- 배치 데이터를 가공 & 변환하는 역할 (필수 X)
- 비즈니스 로직의 분리 ~> ItemWriter는 저장 수행, ItemProcessor는 로직 처리만 수행
- 배치 데이터를 가공 & 변환하는 역할 (필수 X)
- Tasklet || ItemReader + ItemWriter + ItemProcessor로 사용
Spring Batch Job Flow
next()
- 순차적으로 Step을 연결
.on()
- 캐치할 ExitStatus 지정
*
일 경우 모든 ExitStatus가 지정됨
to()
- 다음으로 이동할 Step 지정
from()
- 일종의 이벤트 리스너 역할
- 상태값을 보고 일치하는 상태라면 to()에 포함된 step을 호출
- Step의 이벤트 캐치가 FAILED로 되어있는 상태에서 추가로 이벤트를 캐치하려면 from을 써야만 함
decide
- 지정된 ExitStatus외의 분기를 컨트롤할 경우, ExitStatus를 커스텀해야하기 때문에 번거로움
- Step들의 Flow 속에서 분기만 담당하는 타입
StepJob 예시
@Slf4j @Configuration @RequiredArgsConstructor public class StepJobConfig { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job stepNextJob() { return jobBuilderFactory.get("stepJob") .start(step1()) .next(step2()) .next(conditionalJobStep1()) .on("FAILED") // ExitStatus가 FAILED인 경우 .to(conditionalJobStep3()) .on("*") .end() .from(conditionalJobStep1()) .on("*") // ExitStatus가 FAILED가 아닌 경우 .to(conditionalJobStep2()) .next(conditionalJobStep3()) .end() .end() .build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") .tasklet((contribution, chunkContext) -> { log.info(">>>>> This is Step1"); }).build(); } @Bean public Step step2() { return stepBuilderFacotry.get("step2") .tasklet((contribution, chunkContext) -> { log.info(">>>>> This is Step2"); }).build(); } @Bean public Step conditionalJobStep1() { return stepBuilderFactory.get("step1") .tasklet((contribution, chunkContext) -> { log.info(">>>>> This is stepNextConditionalJob Step1"); /** ExitStatus를 FAILED로 지정한다. 해당 status를 보고 flow가 진행된다. **/ contribution.setExitStatus(ExitStatus.FAILED); return RepeatStatus.FINISHED; }).build(); } @Bean public Step conditionalJobStep2() { return stepBuilderFactory.get("conditionalJobStep2") .tasklet((contribution, chunkContext) -> { log.info(">>>>> This is stepNextConditionalJob Step2"); return RepeatStatus.FINISHED; }).build(); } @Bean public Step conditionalJobStep3() { return stepBuilderFactory.get("conditionalJobStep3") .tasklet((contribution, chunkContext) -> { log.info(">>>>> This is stepNextConditionalJob Step3"); return RepeatStatus.FINISHED; }).build(); } }
DecideJob Example
@Slf4j @Configuration @RequiredArgsConstructor public class DecideJobConfig { private final JobBuilderFactory jobBuilderfactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job deciderJob() { return jobBuilderFactory.get("decideJob") .start(startStep()) .next(decider()) .on("ODD") .to(oddStep()) .from(decider()) .on("EVEN") .to(evenStep()) .end() .build(); } @Bean public Step startStep() { return stepBuilderFactory.get("startStep") .tasklet((contribution, chunkContext) -> { log.info(">>>>> Start!"); return RepeatStatus.FINISHED; }).build(); } @Bean public Step evenStep() { return stepBuilderFactory.get("evenStep") .tasklet((contribution, chunkContext) -> { log.info(">>>>> 짝수입니다."); return RepeatStatus.FINISHED; }).build(); } @Bean public Step oddStep() { return stepBuilderFactory.get("oddStep") .tasklet((contirubtion, chunkContext) -> { log.info(">>>>> 홀수입니다."); return RepeatStatus.FINISHED; }).bulid(); } @Bean public JobExcutionDecider decider() { return new OddDecider(); } public static class OddDecider implements JobExecutionDecider { @Override public FlowExcutionStatus decide(JobExcecution jobExecution, StepExecution stepExecution) { Random rand = new Random(); int randomNumber = rand.nextInt(50) + 1; log.info("랜덤숫자: {}", randomNumber); if (randomNumber % 2 == 0) { return new FlowExecutionStatus("EVEN"); } else { return new FlowExecutionStatus("ODD"); } } } }
Chunk-oriented Processing
- Spring Batch에서의 Chunk
- 데이터 덩어리 == 작업할 때 각 커밋 사이에 처리되는 row 수
- Spring Batch는 Chunk 지향 처리
- 데이터를 읽어 처리 & 가공 ~> Chunk 덩어리 생성 ~> Chunk 단위로 트랜잭션을 다룸
- Chunk 단위로 트랜잭션을 수행하기 때문에 실패한 경우에는 해당 Chunk 만큼만 롤백, 이전 커밋 트랜잭션 범위까지 반영
Chunk Oriented Processing Concepts
- 어플리케이션과 데이터베이스 간에 데이터를 주고 받는 회수를 최소화 ~> 성능 향상
- PageSize와 ChunkSize는 의미하는 바가 다름
- Chunk Size
- 한번에 처리될 트랜잭션 단위
- Page Size
- 한번에 조회할 Item 양
- EX) Page Size가 10이고 Chunk Size가 50인 경우
- Page 조회 5번이 일어나면, 1번의 트랜잭션이 발생 ~> Chunk 처리
- Chunk Size
- Spring Batch의 PagingItemReader에는 클래스 상단에 아래와 같은 주석이 존재
- Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.
(상당히 큰 페이지 크기를 설정하고 페이지 크기와 일치하는 커밋 간격을 사용하면 성능이 향상됩니다.)ㅅ
- Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.
- 성능 이외에도 JPA를 사용할 경우 영속성 컨텍스트가 깨지는 문제 존재
- 특별한 이유가 없다면 PageSize와 ChinkSize를 일치시키는 것을 지향(링크)
ItemRedaer & Processor & Writer
- ItemReader
- Cursor 기반 ItemReader 구현체
- JdbcCursorItemReader
- HibernateCursorItemReader
- StoredProcedureItemReader
- Paging 기반 ItemReader 구현체
- JdbcPagingItemReader
- HibernatePagingItemReader
- JpaPagingItemReader
- Cursor 기반 ItemReader 구현체
- JdbcCursorItemReader
- chunk
- <T, T> 제네릭에서 첫번째는 Reader에서 반환할 타입, 두번쨰는 Writer에 파라메터로 넘겨줄 타입
- chunkSize로 인자값을 넣는 경우는 Reader & Writer가 묶일 Chunk 트랜잭션 범위
- fetchSize
- Database에서 한번에 가져올 데이터 양
- Paging은 실제 쿼리를 limit, offset을 이용해서 분할처리하지만, Cursor는 쿼리 분할 처리 없이 실행되거나 내부적으로 가져오는 데이터를 FetchSize만큼 가져와서 read()를 통해 하나씩 가져옴
- dataSource
- Database에 접근하기 위해 사용할 DataSource 객체를 할당
- rowmapper
- 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper
- 커스텀하게 생성해서 사용할 수 있지만, 매번 Mapper 클래스를 생성해야해서 보편적으로는 Spring에서 공식적으로 지원하는 BeanPropertyRowMapper.class를 많이 이용
- sql
- Reader로 사용할 쿼리
- name
- reader의 이름 지정
- Bean 이름이 아닌 Spring Batch의 ExecutionContext에 저장될 이름
- CursorItemReader
- Database와 SocketTimeout을 충분히 큰 값으로 설정
- Cursor는 하나의 Connection으로 Batch가 끝날 때까지 사용되기 때문에 Batch가 끝나기 전에 DB와 Application이 먼저 끊어질 수 있음
- Batch 수행 기간이 오래 걸리는 경우에는 PagingItemReader를 사용하는 것이 좋음
- Paging은 한 페이지를 읽을 때마다 Connection을 맺고 끝기 때문에 성능적으로 안정적
- chunk
JdbcCursorItemReader Example
- JdbcPagingItemReader
- Cursor와 사용 방식이 유사
- JdbcCursorItemReader를 사용할 때는 String 타입으로 쿼리를 생성하지만, PagingItemReader는 PagingQueryProvider를 통해 쿼리 생성
- Spring batch는 SqlPagingQueryProviderFactoryBean을 통해 DataSource 설정 값을 확인하고 db에 맞는 Provider를 선택하여 적절한 페이지 전략 사용
JdbcPagingItemReader Example
- ItemWriter
- JdbcBatchItemWriter
- HibernateItemWriter
- JpaItemWrite
- JdbcBatchItemWriter
- columnMapped
- key, value 기반의 Insert SQL의 values를 매핑 (ex : Map<String, Object>)
- beanMapped
- Pojo 기반의 Insert SQL의 values를 매핑
- columnMapped
JdbcBatchItemWriter Example
ItemProcessor
필수 X
package org.springframework.batch.item; public interface ItemProcessor<I, O> { O process(I item) throws Exception; }
I
- ItemReader에서 받을 데이터 타입
O
- ItemWriter에 보낼 데이터 타입
일반적으로 변환이나 필터 처리 용도로 사용
ETC
멱등성
- 연산을 여러번 적용해도 결과가 달라지지 않는 성질
- 멱등성이 깨지는 경우 ~> 제어할 수 없는 코드를 직접 실행할 때
- ex) LocalDate.now() > 이전 데이터를 다시 수행해야할 경우, 해당 코드가 존재하면 문제의 여지 존재
- JobParameter로 입력받도록 처리
- 멱등성이 깨지는 경우 ~> 제어할 수 없는 코드를 직접 실행할 때
- 연산을 여러번 적용해도 결과가 달라지지 않는 성질
무중단 배포
- readlink를 활용
- 배포 젠킨스와 배치 젠킨스를 따로, nDeploy가 배포 젠킨스의 역할로 사용됨
Job 중복실행 방지 에러
A job instance already exists and is complete for parameters={requestDate=20180805}. If you want to run this job again, change the parameters.
- 동일한 Job이 Job Parameter가 달라지면 그때마다 BATCH_JOB_INSTANCE가 생성
- 동일한 Job Parameter는 여러개 존재 가능
- 실패한 Job의 경우, 동일한 Job Parameter라도 동일 실행 가능
- Spring Batch는 동일한 Job Parameter로 성공한 기록이 있을 때만 재수행이 안됨
ConditionalOnProperty
지정된 값으로 구성 등록정보가 있는 경우에만 Bean을 등록하여 사용, 특정 Bean만 활성화 ~> 성능 향상
@Slf4j @Configuration @RequiredArgsContructor @ConditionalOnProperty(name = "spring.batch.job.names", havingValue = "simpleJob") public class SimpleJobConfig { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFacotry stepBuilderFactory; }
ConditionalOnProperty 테스트 수행 시, 테스틏 수행 속도 문제 발생
- 스프링은 전체 테스트 수행시 Environment가 변경될 때마다 Spring Context 재시작
테스트 속도 문제가 존재 ~> ConditionalOnPropert 점검
Reference
반응형 - 데이터에 대한 일괄 처리 작업을 사람 개입 없이 자동으로 지원해주는 Application