Spring Batch - Tasklets vs Chunks

1. Introduktion

Spring Batch giver to forskellige måder at implementere et job på: ved hjælp af tasklets og bidder.

I denne artikel lærer vi at konfigurere og implementere begge metoder ved hjælp af et simpelt eksempel fra virkeligheden.

2. Afhængigheder

Lad os komme i gang med tilføje de krævede afhængigheder:

 org.springframework.batch spring-batch-core 4.2.0.RELEASE org.springframework.batch spring-batch-test 4.2.0.RELEASE test 

For at få den nyeste version af spring-batch-core og spring-batch-test, se Maven Central.

3. Vores brugssag

Lad os overveje en CSV-fil med følgende indhold:

Mae Hodges, 10/22/1972 Gary Potter, 02/22/1953 Betty Wise, 02/17/1968 Wayne Rose, 04/06/1977 Adam Caldwell, 09/27/1995 Lucille Phillips, 05/14/1992

Det første position på hver linje repræsenterer en persons navn, og den anden position repræsenterer hans / hendes fødselsdato.

Vores brugssag er at generere en anden CSV-fil, der indeholder hver persons navn og alder:

Mae Hodges, 45 Gary Potter, 64 Betty Wise, 49 Wayne Rose, 40 Adam Caldwell, 22 Lucille Phillips, 25

Nu hvor vores domæne er klart, lad os gå videre og bygge en løsning ved hjælp af begge tilgange. Vi starter med tasklets.

4. Tasklets tilgang

4.1. Introduktion og design

Tasklets er beregnet til at udføre en enkelt opgave inden for et trin. Vores job vil bestå af flere trin, der udføres efter hinanden. Hvert trin skal kun udføre en defineret opgave.

Vores job består af tre trin:

  1. Læs linjer fra input-CSV-filen.
  2. Beregn alder for hver person i input-CSV-filen.
  3. Skriv navn og alder på hver person til en ny output-CSV-fil.

Nu hvor det store billede er klar, lad os oprette en klasse pr. Trin.

LinesLæser har ansvaret for læsning af data fra inputfilen:

public class LinesReader implementerer Tasklet {// ...}

LinesProcessor beregner alderen for hver person i filen:

offentlig klasse LinesProcessor implementerer Tasklet {// ...}

Langt om længe, LinesWriter har ansvaret for at skrive navne og aldre til en outputfil:

offentlig klasse LinesWriter implementerer Tasklet {// ...}

På dette tidspunkt, alle vores trin implementeres Opgave interface. Det vil tvinge os til at gennemføre dets udføre metode:

@Override public RepeatStatus execute (StepContribution stepContribution, ChunkContext chunkContext) kaster undtagelse {// ...}

Denne metode er, hvor vi tilføjer logikken for hvert trin. Før vi starter med denne kode, lad os konfigurere vores job.

4.2. Konfiguration

Vi er nødt til tilføj nogle konfigurationer til Spring's applikationskontekst. Efter at have tilføjet standard bønneerklæring for de klasser, der blev oprettet i det foregående afsnit, er vi klar til at oprette vores jobdefinition:

@Configuration @EnableBatchProcessing public class TaskletsConfig {@Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory trin; @Bean beskyttet Step readLines () {return step .get ("readLines") .tasklet (linesReader ()) .build (); } @Bean-beskyttet trin processLines () {return trin .get ("processLines"). Opgave (linesProcessor ()) .build (); } @Bean-beskyttet trin writeLines () {return trin .get ("writeLines") .tasklet (linesWriter ()) .build (); } @Bean offentligt jobjob () {returnerer job .get ("taskletsJob") .start (readLines ()) .next (processLines ()) .next (writeLines ()) .build (); } // ...}

Dette betyder, at vores “TaskletsJob” vil bestå af tre trin. Den første (readLines) udfører den tasklet, der er defineret i bønnen linjerLæser og gå til næste trin: processLines. ProcessLines udfører den tasklet, der er defineret i bønnen linjerProcessor og gå til det sidste trin: writeLines.

Vores jobflow er defineret, og vi er klar til at tilføje nogle logik!

4.3. Model og hjælpeprogrammer

Da vi manipulerer linjer i en CSV-fil, opretter vi en klasse Linie:

offentlig klasse Line implementerer Serializable {private String name; privat LocalDate dob; privat Lang alder; // standard konstruktør, getters, setters og implementering af toString}

Bemærk, at Linie redskaber Serialiserbar. Det er fordi Linie fungerer som en DTO til at overføre data mellem trin. Ifølge Spring Batch, objekter, der overføres mellem trin, skal kan serienummereres.

På den anden side kan vi begynde at tænke på at læse og skrive linjer.

Til det gør vi brug af OpenCSV:

 com.opencsv opencsv 4.1 

Se efter den nyeste OpenCSV-version i Maven Central.

Når OpenCSV er inkluderet, vi opretter også en FileUtils klasse. Det giver metoder til læsning og skrivning af CSV-linjer:

offentlig klasse FileUtils {public Line readLine () kaster undtagelse {hvis (CSVReader == null) initReader (); Streng [] linje = CSVReader.readNext (); hvis (line == null) returnerer null; returner ny linje (linje [0], LocalDate.parse (linje [1], DateTimeFormatter.ofPattern ("MM / dd / åååå")); } public void writeLine (Line line) kaster Undtagelse {if (CSVWriter == null) initWriter (); Streng [] lineStr = ny streng [2]; lineStr [0] = line.getName (); lineStr [1] = linje .getAge () .toString (); CSVWriter.writeNext (lineStr); } // ...}

Læg mærke til det readLine fungerer som en indpakning over OpenCSV'er readNext metode og returnerer a Linie objekt.

Samme måde, writeLine indpakker OpenCSV'er skrivNæste modtager en Linie objekt. Fuld implementering af denne klasse kan findes i GitHub-projektet.

På dette tidspunkt er vi klar til at starte med hvert trinimplementering.

4.4. LinesReader

Lad os gå videre og afslutte vores LinesReader klasse:

offentlig klasse LinesReader implementerer Tasklet, StepExecutionListener {private final Logger logger = LoggerFactory .getLogger (LinesReader.class); private Liste linjer; private FileUtils fu; @Override offentligt ugyldigt beforeStep (StepExecution stepExecution) {lines = new ArrayList (); fu = nye FileUtils ("taskletsvschunks / input / tasklets-vs-chunks.csv"); logger.debug ("Linjelæser initialiseret."); } @ Override public RepeatStatus execute (StepContribution stepContribution, ChunkContext chunkContext) kaster Undtagelse {Line line = fu.readLine (); while (line! = null) {lines.add (line); logger.debug ("Læs linje:" + line.toString ()); linje = fu.readLine (); } returner RepeatStatus.FINISHED; } @ Override offentlig ExitStatus afterStep (StepExecution stepExecution) {fu.closeReader (); stepExecution .getJobExecution () .getExecutionContext () .put ("linjer", this.lines); logger.debug ("Linjelæser sluttede."); returner ExitStatus.COMPLETED; }}

LinesReader udfører metode skaber en FileUtils forekomst over inputfilstien. Derefter, tilføjer linjer til en liste, indtil der ikke er flere linjer at læse.

Vores klasse også redskaber StepExecutionListener der giver to ekstra metoder: før trin og efter trin. Vi bruger disse metoder til at initialisere og lukke ting før og efter udføre kører.

Hvis vi kigger på efter trin kode, vil vi bemærke den linje, hvor resultatlisten (linjer) sættes i jobkonteksten for at gøre det tilgængeligt til næste trin:

stepExecution .getJobExecution () .getExecutionContext () .put ("linjer", this.lines);

På dette tidspunkt har vores første skridt allerede opfyldt sit ansvar: indlæse CSV-linjer i en Liste i hukommelsen. Lad os gå til andet trin og behandle dem.

4.5. LinesProcessor

LinesProcessor vil også implementere StepExecutionListener og selvfølgelig, Opgave. Det betyder, at det vil implementere før trin, udføre og efter trin metoder også:

offentlig klasse LinesProcessor implementerer Tasklet, StepExecutionListener {private Logger logger = LoggerFactory.getLogger (LinesProcessor.class); private Liste linjer; @Override offentligt tomrum førStep (StepExecution stepExecution) {ExecutionContext executionContext = stepExecution .getJobExecution () .getExecutionContext (); this.lines = (List) executionContext.get ("linjer"); logger.debug ("Linjeprocessor initialiseret."); } @ Override public RepeatStatus execute (StepContribution stepContribution, ChunkContext chunkContext) kaster Undtagelse {for (Line line: lines) {long age = ChronoUnit.YEARS.between (line.getDob (), LocalDate.now ()); logger.debug ("Beregnet alder" + alder + "for linje" + line.toString ()); line.setAge (alder); } returner RepeatStatus.FINISHED; } @ Override offentlig ExitStatus efter trin (StepExecution stepExecution) {logger.debug ("Linjeprocessor afsluttet."); returner ExitStatus.COMPLETED; }}

Det er ubesværet at forstå det det indlæses linjer liste fra jobkonteksten og beregner hver persons alder.

Det er ikke nødvendigt at sætte en anden resultatliste i sammenhængen, da ændringer sker på det samme objekt, der kommer fra det foregående trin.

Og vi er klar til vores sidste skridt.

4.6. LinesWriter

LinesWriter'S opgave er at gå over linjer liste og skriv navn og alder til outputfilen:

offentlig klasse LinesWriter implementerer Tasklet, StepExecutionListener {private final Logger logger = LoggerFactory .getLogger (LinesWriter.class); private Liste linjer; private FileUtils fu; @Override offentligt tomrum førStep (StepExecution stepExecution) {ExecutionContext executionContext = stepExecution .getJobExecution () .getExecutionContext (); this.lines = (List) executionContext.get ("linjer"); fu = nye FileUtils ("output.csv"); logger.debug ("Lines Writer initialised."); } @ Override public RepeatStatus execute (StepContribution stepContribution, ChunkContext chunkContext) kaster Undtagelse {for (Line line: lines) {fu.writeLine (line); logger.debug ("Skrev linje" + line.toString ()); } returner RepeatStatus.FINISHED; } @ Override offentlig ExitStatus efter trin (StepExecution stepExecution) {fu.closeWriter (); logger.debug ("Lines Writer sluttede."); returner ExitStatus.COMPLETED; }}

Vi er færdige med vores jobs implementering! Lad os oprette en test for at køre den og se resultaterne.

4.7. Kører jobbet

For at køre jobbet opretter vi en test:

@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (classes = TaskletsConfig.class) public class TaskletsTest {@Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test offentlig ugyldighed givenTaskletsJob_whenJobEnds_thenStatusCompleted () kaster undtagelse {JobExecution jobExecution = jobLauncherTestUtils.launchJob (); assertEquals (ExitStatus.COMPLETED, jobExecution.getExitStatus ()); }}

Kontekstkonfiguration annotation peger på Spring-kontekstkonfigurationsklassen, der har vores jobdefinition.

Vi bliver nødt til at tilføje et par ekstra bønner, før vi kører testen:

@Bean offentlige JobLauncherTestUtils jobLauncherTestUtils () {returner nye JobLauncherTestUtils (); } @Bean offentlig JobRepository jobRepository () kaster undtagelse {MapJobRepositoryFactoryBean fabrik = ny MapJobRepositoryFactoryBean (); factory.setTransactionManager (transactionManager ()); returner (JobRepository) fabrik.getObject (); } @Bean offentlig PlatformTransactionManager transactionManager () {returner ny ResourcelessTransactionManager (); } @Bean offentlig JobLauncher jobLauncher () kaster undtagelse {SimpleJobLauncher jobLauncher = ny SimpleJobLauncher (); jobLauncher.setJobRepository (jobRepository ()); return jobLauncher; }

Alting er klar! Fortsæt og kør testen!

Når jobbet er afsluttet, output.csv har det forventede indhold, og logfiler viser eksekveringsflowet:

[main] DEBUG o.b.t.tasklets.LinesReader - Linjelæser initialiseret. [main] DEBUG obttasklets.LinesReader - Læs linje: [Mae Hodges, 10/22/1972] [main] DEBUG obttasklets.LinesReader - Læs line: [Gary Potter, 02/22/1953] [main] DEBUG obttasklets .LinesReader - Læs linje: [Betty Wise, 02/17/1968] [main] DEBUG obttasklets.LinesReader - Read line: [Wayne Rose, 04/06/1977] [main] DEBUG obttasklets.LinesReader - Læs line: [Adam Caldwell, 09/27/1995] [main] DEBUG obttasklets.LinesReader - Læs linje: [Lucille Phillips, 05/14/1992] [main] DEBUG obttasklets.LinesReader - Lines Reader sluttede. [main] DEBUG o.b.t.tasklets.LinesProcessor - Linieprocessor initialiseret. [hoved] DEBUG obttasklets.LinesProcessor - Beregnet alder 45 for linje [Mae Hodges, 10/22/1972] [main] DEBUG obttasklets.LinesProcessor - Beregnet alder 64 for linje [Gary Potter, 02/22/1953] [main ] DEBUG obttasklets.LinesProcessor - Beregnet alder 49 for linje [Betty Wise, 02/17/1968] [main] DEBUG obttasklets.LinesProcessor - Beregnet alder 40 for line [Wayne Rose, 04/06/1977] [main] DEBUG obttasklets.LinesProcessor - Beregnet alder 22 for linje [Adam Caldwell, 09/27/1995] [main] DEBUG obttasklets.LinesProcessor - Beregnet alder 25 for line [Lucille Phillips, 05/14/1992] [main] DEBUG obttasklets .LinesProcessor - Linjeprocessor afsluttet. [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialiseret. [main] DEBUG obttasklets.LinesWriter - Skrev linje [Mae Hodges, 10/22 / 1972,45] [main] DEBUG obttasklets.LinesWriter - Wrote line [Gary Potter, 02/22 / 1953,64] [main] DEBUG obttasklets.LinesWriter - Skrev linje [Betty Wise, 02/17 / 1968,49] [main] DEBUG obttasklets.LinesWriter - Wrote line [Wayne Rose, 04/06 / 1977,40] [main] DEBUG obttasklets.LinesWriter - Skrev linje [Adam Caldwell, 09/27 / 1995,22] [main] DEBUG obttasklets.LinesWriter - Wrote line [Lucille Phillips, 05/14 / 1992,25] [main] DEBUG obttasklets.LinesWriter - Lines Writer sluttede .

Det er det for Tasklets. Nu kan vi gå videre til Chunks-tilgangen.

5. Chunks Approach

5.1. Introduktion og design

Som navnet antyder, denne tilgang udfører handlinger over klumper af data. Det vil sige, i stedet for at læse, behandle og skrive alle linjerne på én gang, vil det læse, behandle og skrive en fast mængde poster (klump) ad gangen.

Derefter gentager det cyklussen, indtil der ikke er flere data i filen.

Som et resultat vil strømmen være lidt anderledes:

  1. Mens der er linjer:
    • Gør for X-antal linjer:
      • Læs en linje
      • Behandl en linje
    • Skriv X antal linjer.

Så vi skal også skabe tre bønner til klumporienteret tilgang:

offentlig klasse LineReader {// ...}
offentlig klasse LineProcessor {// ...}
offentlig klasse LinesWriter {// ...}

Lad os konfigurere vores job inden implementering.

5.2. Konfiguration

Jobdefinitionen vil også se anderledes ud:

@Configuration @EnableBatchProcessing public class ChunksConfig {@Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory trin; @Bean public ItemReader itemReader () {returner ny LineReader (); } @Bean public ItemProcessor itemProcessor () {returner ny LineProcessor (); } @Bean public ItemWriter itemWriter () {returner ny LinesWriter (); } @Bean-beskyttet trin processLines (ItemReader-læser, ItemProcessor-processor, ItemWriter-forfatter) {return steps.get ("processLines"). klump (2) .læser (læser) .processor (processor) .forfatter (forfatter) .build (); } @Bean offentligt jobjob () {returnerer job .get ("chunksJob") .start (processLines (itemReader (), itemProcessor (), itemWriter ())) .build (); }}

I dette tilfælde er der kun et trin, der kun udfører en tasklet.

Imidlertid denne tasklet definerer en læser, en forfatter og en processor, der vil handle over klumper af data.

Bemærk, at Forpligtelsesinterval angiver mængden af ​​data, der skal behandles i et stykke. Vores job vil læse, behandle og skrive to linjer ad gangen.

Nu er vi klar til at tilføje vores klumplogik!

5.3. LineReader

LineReader har ansvaret for at læse en post og returnere en Linie eksempel med dets indhold.

For at blive læser, vores klasse skal implementere ItemReader interface:

offentlig klasse LineReader implementerer ItemReader {@Override public Line read () kaster Undtagelse {Line line = fu.readLine (); hvis (line! = null) logger.debug ("Læs linje:" + line.toString ()); returlinje; }}

Koden er ligetil, den læser bare en linje og returnerer den. Vi implementerer også StepExecutionListener til den endelige version af denne klasse:

offentlig klasse LineReader implementerer ItemReader, StepExecutionListener {private final Logger logger = LoggerFactory .getLogger (LineReader.class); private FileUtils fu; @ Overstyr offentlig tomrum førStep (StepExecution stepExecution) {fu = nye FileUtils ("taskletsvschunks / input / tasklets-vs-chunks.csv"); logger.debug ("Line Reader initialized."); } @ Override public Line read () kaster undtagelse {Line line = fu.readLine (); hvis (line! = null) logger.debug ("Læs linje:" + line.toString ()); returlinje; } @ Override offentlig ExitStatus afterStep (StepExecution stepExecution) {fu.closeReader (); logger.debug ("Line Reader sluttede."); returner ExitStatus.COMPLETED; }}

Det skal bemærkes, at før trin og efter trin udføre henholdsvis før og efter hele trin.

5.4. LineProcessor

LineProcessor følger stort set den samme logik end LineReader.

I dette tilfælde vi implementerer ItemProcessor og dens metode behandle():

offentlig klasse LineProcessor implementerer ItemProcessor {private Logger logger = LoggerFactory.getLogger (LineProcessor.class); @Override public Line-proces (Line line) kaster undtagelse {long age = ChronoUnit.YEARS .between (line.getDob (), LocalDate.now ()); logger.debug ("Beregnet alder" + alder + "for linje" + line.toString ()); line.setAge (alder); returlinje; }}

Det behandle() metode tager en inputlinje, behandler den og returnerer en outputline. Igen implementerer vi også StepExecutionListener:

offentlig klasse LineProcessor implementerer ItemProcessor, StepExecutionListener {private Logger logger = LoggerFactory.getLogger (LineProcessor.class); @ Overstyr offentlig ugyldighed førStep (StepExecution stepExecution) {logger.debug ("Line Processor initialised."); } @Override public Line-proces (Line line) kaster undtagelse {long age = ChronoUnit.YEARS .between (line.getDob (), LocalDate.now ()); logger.debug ("Beregnet alder" + alder + "for linje" + line.toString ()); line.setAge (alder); returlinje; } @ Override offentlig ExitStatus efter trin (StepExecution stepExecution) {logger.debug ("Line Processor sluttede."); returner ExitStatus.COMPLETED; }}

5.5. LinesWriter

I modsætning til læser og processor, LinesWriter vil skrive et helt stykke linier så det modtager en Liste af Linjer:

offentlig klasse LinesWriter implementerer ItemWriter, StepExecutionListener {private final Logger logger = LoggerFactory .getLogger (LinesWriter.class); private FileUtils fu; @ Overstyr offentlig tomrum før trin (StepExecution stepExecution) {fu = nye FileUtils ("output.csv"); logger.debug ("Line Writer initialiseret."); } @ Overstyr offentlig ugyldig skrivning (listelinjer) kaster undtagelse {for (linjelinje: linjer) {fu.writeLine (linje); logger.debug ("Skrev linje" + line.toString ()); }} @ Override offentlig ExitStatus afterStep (StepExecution stepExecution) {fu.closeWriter (); logger.debug ("Line Writer sluttede."); returner ExitStatus.COMPLETED; }}

LinesWriter kode taler for sig selv. Og igen er vi klar til at teste vores job.

5.6. Kører jobbet

Vi opretter en ny test, den samme som den, vi oprettede til tasklets-tilgangen:

@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (classes = ChunksConfig.class) public class ChunksTest {@Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test offentlig ugyldighed givenChunksJob_whenJobEnds_thenStatusCompleted () kaster undtagelse {JobExecution jobExecution = jobLauncherTestUtils.launchJob (); assertEquals (ExitStatus.COMPLETED, jobExecution.getExitStatus ()); }}

Efter konfiguration ChunksConfig som forklaret ovenfor for TaskletsConfig, vi er klar til at køre testen!

Når jobbet er udført, kan vi se det output.csv indeholder det forventede resultat igen, og logfilerne beskriver flowet:

[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialiseret. [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialiseret. [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialiseret. [main] DEBUG obtchunks.LineReader - Læs linje: [Mae Hodges, 10/22/1972] [main] DEBUG obtchunks.LineReader - Læs line: [Gary Potter, 02/22/1953] [main] DEBUG obtchunks .LineProcessor - Beregnet alder 45 for linje [Mae Hodges, 10/22/1972] [main] DEBUG obtchunks.LineProcessor - Beregnet alder 64 for line [Gary Potter, 02/22/1953] [main] DEBUG obtchunks.LinesWriter - Skrev linje [Mae Hodges, 10/22 / 1972,45] [main] DEBUG obtchunks.LinesWriter - Wrote line [Gary Potter, 02/22 / 1953,64] [main] DEBUG obtchunks.LineReader - Læs linje: [Betty Wise, 02/17/1968] [main] DEBUG obtchunks.LineReader - Læs linje: [Wayne Rose, 04/06/1977] [main] DEBUG obtchunks.LineProcessor - Beregnet alder 49 for line [Betty Wise, 02/17/1968] [main] DEBUG obtchunks.LineProcessor - Beregnet alder 40 for linje [Wayne Rose, 04/06/1977] [main] DEBUG obtchunks.LinesWriter - Skrev linje [Betty Wise, 02/17/1968 , 49] [main] DEBUG obtchunks.LinesWriter - Skrev linje [Wayne Rose, 04/06 / 1977,40] [main] DEBUG ob t.chunks.LineReader - Læs linje: [Adam Caldwell, 09/27/1995] [main] DEBUG obtchunks.LineReader - Read line: [Lucille Phillips, 05/14/1992] [main] DEBUG obtchunks.LineProcessor - Beregnet alder 22 for linje [Adam Caldwell, 09/27/1995] [main] DEBUG obtchunks.LineProcessor - Beregnet alder 25 for line [Lucille Phillips, 05/14/1992] [main] DEBUG obtchunks.LinesWriter - Wrote line [Adam Caldwell, 09/27 / 1995,22] [main] DEBUG obtchunks.LinesWriter - Skrev linje [Lucille Phillips, 05/14 / 1992,25] [main] DEBUG obtchunks.LineProcessor - Line Processor sluttede. [main] DEBUG o.b.t. chunks.LinesWriter - Line Writer sluttede. [main] DEBUG o.b.t. chunks.LineReader - Line Reader sluttede.

Vi har det samme resultat og en anden strøm. Logfiler viser, hvordan jobbet udføres efter denne tilgang.

6. Konklusion

Forskellige sammenhænge viser behovet for den ene eller den anden tilgang. Mens Tasklets føles mere naturlige for 'den ene opgave efter den anden' scenarier, giver bidder en enkel løsning til at håndtere paginerede læsninger eller situationer, hvor vi ikke vil beholde en betydelig mængde data i hukommelsen.

Den komplette implementering af dette eksempel kan findes i GitHub-projektet.