ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Batch 기초 정리
    Java & Spring/Spring Batch 2021. 6. 22. 17:44
    반응형

    Spring Batch

    Batch Application

    • 데이터에 대한 일괄 처리 작업을 사람 개입 없이 자동으로 지원해주는 Application
      • Batch : 일괄처리
      • Scheduler : 일정 시간마다 실행시키는 역할
      • Quartz? Spring Batch?
        • Quartz : Job Scheduling Library
          • Quartz는 스케쥴러의 역할이지 Batch와 같이 대용량 데이터 배치 처리에 대한 기능 지원 X
          • Quartz외에도 Spring Scheduler, Jenkins, Cron 등이 존재
        • Spring Batch : Bach Framework
          • Spring Batch는 스케쥴링 프레임워크가 아님
          • Batch는 Quartz의 스케쥴링 기능 지원 X
        • 보통 Quartz + Spring Batch로 사용하면서 데이터 일괄처리를 자동으로 진행
          • 요즘에는 Jenkins + Spring Batch로 사용
      • 간단한 일정 주기 반복성 작업 ~> Spring Batch를 사용하지 않고 Scheduler만 사용해서 처리하는 것도 좋음
      • Batch Application VS Web Application
        • Web : 실시간 처리 / 상대적인 속도 / QA 용이
        • Batch: 후속 처리 / 절대적인 속도 / QA 복잡성 > 테스트코드 중요

    The Domain Language of Batch

    • Batch Stereotypes
    • Domain Language of Batch
    • Job
      • 배치 처리과정을 하나의 단위로 만들어 표현한 객체
      • 하나의 Job에는 1 : N으로 Step을 가짐
      • Step 인스턴스의 컨테이너 역할
    • JobInstance
      • Job이 실제 실행되는 단위
      • JobInstance는 여러개의 JobExecution을 가질 수 있음
    • JobParameters
      • 배치 작업을 시작하는데 사용되는 파라미터를 보유
      • 식별자로 사용되거나 실행 중 참조될 수 있음
    • JobExecution
      • JobInstance에 대한 한 번의 실행을 나타내는 객체
      • JobExcution은 JobInstance, 배치 실행 상태, 시작 시간, 끝난 시간, 실패했을 때 메시지 등의 정보를 담고 있음
      • 실행이 성공적으로 완료되지 않는 한 지정된 실행에 해당하는 JobInstance는 완료 간주X
    • Step
      • 실질적인 배치 처리(모든 정보를 담고 있음)
      • 모든 Job에는 1개 이상의 Step이 존재해야함
      • Step 안에 Tasklet 혹은 Reader & Processor & Writer 묶음이 존재
      • 배치 Job의 독립적이고 순차적인 단계를 캡슐화하는 도메인 객체
    • StepExecution
      • JobExecution과 유사하며, Step의 실제 정보를 담는 객체
      • 이전 step이 실패하면 다음 step 진행을 하지 않음
    • JobRepository
      • 위의 모든 절차에 대한 지속적인 매커니즘
      • JobRepository에서 JobExecution을 관리
        • 첫 Job이 실행되면 JobRepository로 부터 JobExecution을 가져와서 실행하는 동안 step 및 job execution을 JobRepository로 전달하며 유지
      • JobLauncher, Job, Step에 대한 CRUD를 제공
    • JobLauncher
      • JobParameters와 함께 실행되는 Job에 대한 간단한 인터페이스
    • Tasklet
      • Step 안에서 단일로 수행될 커스텀한 기능들을 선언할 때 사용
    • ItemReader
      • step에서 배치 데이터를 읽어오는 역할
    • ItemWriter
      • step의 결과인 배치 데이터를 저장하는 역할
        • 일반적으로 DB나 파일에 저장
    • ItemProcessor
      • 배치 데이터를 가공 & 변환하는 역할 (필수 X)
        • 비즈니스 로직의 분리 ~> ItemWriter는 저장 수행, ItemProcessor는 로직 처리만 수행
    • Tasklet || ItemReader + ItemWriter + ItemProcessor로 사용

    Spring Batch Job Flow

    • next()

      • 순차적으로 Step을 연결
    • .on()

      • 캐치할 ExitStatus 지정
      • * 일 경우 모든 ExitStatus가 지정됨
    • to()

      • 다음으로 이동할 Step 지정
    • from()

      • 일종의 이벤트 리스너 역할
      • 상태값을 보고 일치하는 상태라면 to()에 포함된 step을 호출
      • Step의 이벤트 캐치가 FAILED로 되어있는 상태에서 추가로 이벤트를 캐치하려면 from을 써야만 함
    • decide

      • 지정된 ExitStatus외의 분기를 컨트롤할 경우, ExitStatus를 커스텀해야하기 때문에 번거로움
      • Step들의 Flow 속에서 분기만 담당하는 타입
    • StepJob 예시

      • @Slf4j
        @Configuration
        @RequiredArgsConstructor
        public class StepJobConfig {
            private final JobBuilderFactory jobBuilderFactory;
            private final StepBuilderFactory stepBuilderFactory;
        
            @Bean
            public Job stepNextJob() {
                return jobBuilderFactory.get("stepJob")
                    .start(step1())
                    .next(step2())
                    .next(conditionalJobStep1())
                        .on("FAILED") // ExitStatus가 FAILED인 경우
                        .to(conditionalJobStep3())
                        .on("*")
                        .end()
                    .from(conditionalJobStep1())
                        .on("*") // ExitStatus가 FAILED가 아닌 경우
                        .to(conditionalJobStep2())
                        .next(conditionalJobStep3())
                        .end()
                    .end()
                    .build();
            }
        
            @Bean
            public Step step1() {
                return stepBuilderFactory.get("step1")
                    .tasklet((contribution, chunkContext) -> {
                        log.info(">>>>> This is Step1");
                    }).build();
            }
        
            @Bean
            public Step step2() {
                return stepBuilderFacotry.get("step2")
                    .tasklet((contribution, chunkContext) -> {
                        log.info(">>>>> This is Step2");
                    }).build();
            }
        
            @Bean
            public Step conditionalJobStep1() {
                return stepBuilderFactory.get("step1")
                    .tasklet((contribution, chunkContext) -> {
                        log.info(">>>>> This is stepNextConditionalJob Step1");
        
                        /**
                            ExitStatus를 FAILED로 지정한다.
                            해당 status를 보고 flow가 진행된다.
                            **/
                        contribution.setExitStatus(ExitStatus.FAILED);
        
                        return RepeatStatus.FINISHED;
                    }).build();
            }
        
            @Bean
            public Step conditionalJobStep2() {
                return stepBuilderFactory.get("conditionalJobStep2")
                    .tasklet((contribution, chunkContext) -> {
                        log.info(">>>>> This is stepNextConditionalJob Step2");
                        return RepeatStatus.FINISHED;
                    }).build();
            }
        
            @Bean
            public Step conditionalJobStep3() {
                return stepBuilderFactory.get("conditionalJobStep3")
                    .tasklet((contribution, chunkContext) -> {
                        log.info(">>>>> This is stepNextConditionalJob Step3");
                        return RepeatStatus.FINISHED;
                    }).build();
            }
        }
    • DecideJob Example

      • @Slf4j
        @Configuration
        @RequiredArgsConstructor
        public class DecideJobConfig {
            private final JobBuilderFactory jobBuilderfactory;
            private final StepBuilderFactory stepBuilderFactory;
        
            @Bean
            public Job deciderJob() {
                return jobBuilderFactory.get("decideJob")
                    .start(startStep())
                    .next(decider())
                        .on("ODD")
                        .to(oddStep())
                    .from(decider())
                        .on("EVEN")
                        .to(evenStep())
                    .end()
                    .build();
            }
        
            @Bean
            public Step startStep() {
                return stepBuilderFactory.get("startStep")
                    .tasklet((contribution, chunkContext) -> {
                        log.info(">>>>> Start!");
                        return RepeatStatus.FINISHED;
                    }).build();
            }
        
            @Bean
            public Step evenStep() {
                return stepBuilderFactory.get("evenStep")
                    .tasklet((contribution, chunkContext) -> {
                        log.info(">>>>> 짝수입니다.");
                        return RepeatStatus.FINISHED;
                    }).build();
            }
        
            @Bean
            public Step oddStep() {
                return stepBuilderFactory.get("oddStep")
                    .tasklet((contirubtion, chunkContext) -> {
                        log.info(">>>>> 홀수입니다.");
                        return RepeatStatus.FINISHED;
                    }).bulid();
            }
        
            @Bean
            public JobExcutionDecider decider() {
                return new OddDecider();
            }
        
            public static class OddDecider implements JobExecutionDecider {
                @Override
                public FlowExcutionStatus decide(JobExcecution jobExecution, StepExecution stepExecution) {
                    Random rand = new Random();
                    int randomNumber = rand.nextInt(50) + 1;
                    log.info("랜덤숫자: {}", randomNumber);
                    if (randomNumber % 2 == 0) {
                        return new FlowExecutionStatus("EVEN");
                    } else {
                        return new FlowExecutionStatus("ODD");
                    }
                }
            }
        }

    Chunk-oriented Processing

    Chunk Oriented Processing

    • Spring Batch에서의 Chunk
      • 데이터 덩어리 == 작업할 때 각 커밋 사이에 처리되는 row 수
    • Spring Batch는 Chunk 지향 처리
      • 데이터를 읽어 처리 & 가공 ~> Chunk 덩어리 생성 ~> Chunk 단위로 트랜잭션을 다룸
      • Chunk 단위로 트랜잭션을 수행하기 때문에 실패한 경우에는 해당 Chunk 만큼만 롤백, 이전 커밋 트랜잭션 범위까지 반영

    Chunk Oriented Processing Concepts

    • 어플리케이션과 데이터베이스 간에 데이터를 주고 받는 회수를 최소화 ~> 성능 향상
    • PageSize와 ChunkSize는 의미하는 바가 다름
      • Chunk Size
        • 한번에 처리될 트랜잭션 단위
      • Page Size
        • 한번에 조회할 Item 양
      • EX) Page Size가 10이고 Chunk Size가 50인 경우
        • Page 조회 5번이 일어나면, 1번의 트랜잭션이 발생 ~> Chunk 처리
    • Spring Batch의 PagingItemReader에는 클래스 상단에 아래와 같은 주석이 존재
      • Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.
        (상당히 큰 페이지 크기를 설정하고 페이지 크기와 일치하는 커밋 간격을 사용하면 성능이 향상됩니다.)ㅅ
    • 성능 이외에도 JPA를 사용할 경우 영속성 컨텍스트가 깨지는 문제 존재
      • 특별한 이유가 없다면 PageSize와 ChinkSize를 일치시키는 것을 지향(링크)

    ItemRedaer & Processor & Writer

    • ItemReader
      • Cursor 기반 ItemReader 구현체
        • JdbcCursorItemReader
        • HibernateCursorItemReader
        • StoredProcedureItemReader
      • Paging 기반 ItemReader 구현체
        • JdbcPagingItemReader
        • HibernatePagingItemReader
        • JpaPagingItemReader
    • JdbcCursorItemReader
      • chunk
        • <T, T> 제네릭에서 첫번째는 Reader에서 반환할 타입, 두번쨰는 Writer에 파라메터로 넘겨줄 타입
        • chunkSize로 인자값을 넣는 경우는 Reader & Writer가 묶일 Chunk 트랜잭션 범위
      • fetchSize
        • Database에서 한번에 가져올 데이터 양
        • Paging은 실제 쿼리를 limit, offset을 이용해서 분할처리하지만, Cursor는 쿼리 분할 처리 없이 실행되거나 내부적으로 가져오는 데이터를 FetchSize만큼 가져와서 read()를 통해 하나씩 가져옴
      • dataSource
        • Database에 접근하기 위해 사용할 DataSource 객체를 할당
      • rowmapper
        • 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper
        • 커스텀하게 생성해서 사용할 수 있지만, 매번 Mapper 클래스를 생성해야해서 보편적으로는 Spring에서 공식적으로 지원하는 BeanPropertyRowMapper.class를 많이 이용
      • sql
        • Reader로 사용할 쿼리
      • name
        • reader의 이름 지정
        • Bean 이름이 아닌 Spring Batch의 ExecutionContext에 저장될 이름
      • CursorItemReader
        • Database와 SocketTimeout을 충분히 큰 값으로 설정
        • Cursor는 하나의 Connection으로 Batch가 끝날 때까지 사용되기 때문에 Batch가 끝나기 전에 DB와 Application이 먼저 끊어질 수 있음
          • Batch 수행 기간이 오래 걸리는 경우에는 PagingItemReader를 사용하는 것이 좋음
          • Paging은 한 페이지를 읽을 때마다 Connection을 맺고 끝기 때문에 성능적으로 안정적

    JdbcCursorItemReader Example

    • JdbcPagingItemReader
      • Cursor와 사용 방식이 유사
      • JdbcCursorItemReader를 사용할 때는 String 타입으로 쿼리를 생성하지만, PagingItemReader는 PagingQueryProvider를 통해 쿼리 생성
      • Spring batch는 SqlPagingQueryProviderFactoryBean을 통해 DataSource 설정 값을 확인하고 db에 맞는 Provider를 선택하여 적절한 페이지 전략 사용

    JdbcPagingItemReader Example

    • ItemWriter
      • JdbcBatchItemWriter
      • HibernateItemWriter
      • JpaItemWrite
    • JdbcBatchItemWriter
      • columnMapped
        • key, value 기반의 Insert SQL의 values를 매핑 (ex : Map<String, Object>)
      • beanMapped
        • Pojo 기반의 Insert SQL의 values를 매핑

    JdbcBatchItemWriter Example

    • ItemProcessor

      • 필수 X

      • package org.springframework.batch.item;
        
        public interface ItemProcessor<I, O> {
            O process(I item) throws Exception;
        }
      • I

        • ItemReader에서 받을 데이터 타입
      • O

        • ItemWriter에 보낼 데이터 타입
      • 일반적으로 변환이나 필터 처리 용도로 사용


    ETC

    • 멱등성

      • 연산을 여러번 적용해도 결과가 달라지지 않는 성질
        • 멱등성이 깨지는 경우 ~> 제어할 수 없는 코드를 직접 실행할 때
          • ex) LocalDate.now() > 이전 데이터를 다시 수행해야할 경우, 해당 코드가 존재하면 문제의 여지 존재
          • JobParameter로 입력받도록 처리
    • 무중단 배포

      • readlink를 활용
      • 배포 젠킨스와 배치 젠킨스를 따로, nDeploy가 배포 젠킨스의 역할로 사용됨
    • Job 중복실행 방지 에러

      • A job instance already exists and is complete for parameters={requestDate=20180805}. If you want to run this job again, change the parameters.
      • 동일한 Job이 Job Parameter가 달라지면 그때마다 BATCH_JOB_INSTANCE가 생성
      • 동일한 Job Parameter는 여러개 존재 가능
      • 실패한 Job의 경우, 동일한 Job Parameter라도 동일 실행 가능
      • Spring Batch는 동일한 Job Parameter로 성공한 기록이 있을 때만 재수행이 안됨
    • ConditionalOnProperty

      • 지정된 값으로 구성 등록정보가 있는 경우에만 Bean을 등록하여 사용, 특정 Bean만 활성화 ~> 성능 향상

      • @Slf4j
        @Configuration
        @RequiredArgsContructor
        @ConditionalOnProperty(name = "spring.batch.job.names", havingValue = "simpleJob")
        public class SimpleJobConfig {
            private final JobBuilderFactory jobBuilderFactory;
            private final StepBuilderFacotry stepBuilderFactory;
        }
      • ConditionalOnProperty 테스트 수행 시, 테스틏 수행 속도 문제 발생

        • 스프링은 전체 테스트 수행시 Environment가 변경될 때마다 Spring Context 재시작
      • 테스트 속도 문제가 존재 ~> ConditionalOnPropert 점검

    Reference

    반응형

    댓글

Designed by Tistory.