Batchbehandling af Java EE 7

1. Introduktion

Forestil dig, at vi skulle udføre opgaver manuelt som at behandle lønsedler, beregne renter og generere regninger. Det ville blive ret kedeligt, udsat for fejl og en uendelig liste over manuelle opgaver!

I denne tutorial tager vi et kig på Java Batch Processing (JSR 352), en del af Jakarta EE-platformen og en god specifikation til automatisering af opgaver som disse. Det giver applikationsudviklere en model til udvikling af robuste batchbehandlingssystemer, så de kan fokusere på forretningslogikken.

2. Maven-afhængigheder

Da JSR 352 bare er en spec, skal vi medtage dens API og implementering, ligesom jberet:

 javax.batch javax.batch-api 1.0.1 org.jberet jberet-core 1.0.2.Final org.jberet jberet-support 1.0.2.Final org.jberet jberet-se 1.0.2.Final 

Vi tilføjer også en database i hukommelsen, så vi kan se på nogle mere realistiske scenarier.

3. Nøglebegreber

JSR 352 introducerer et par begreber, som vi kan se på denne måde:

Lad os først definere hvert stykke:

  • Fra venstre har vi JobOperator. Det administrerer alle aspekter af jobbehandling såsom start, stop og genstart
  • Dernæst har vi Job. Et job er en logisk samling trin; det indkapsler en hel batchproces
  • Et job vil indeholde mellem 1 og n Trins. Hvert trin er en uafhængig, sekventiel arbejdsenhed. Et trin er sammensat af læsning input, forarbejdning dette input, og skrivning produktion
  • Og sidst, men ikke mindst, har vi Jobopbevaring der gemmer de løbende oplysninger om jobbet. Det hjælper med at holde styr på job, deres tilstand og deres færdiggørelsesresultater

Trin har lidt mere detaljer end dette, så lad os se på det næste. Først skal vi se på Luns trin og derefter kl Batchlets.

4. Oprettelse af et stykke

Som tidligere nævnt er et stykke et slags skridt. Vi bruger ofte en del til at udtrykke en operation, der udføres igen og igen, for eksempel over et sæt emner. Det er ligesom mellemliggende operationer fra Java Streams.

Når vi beskriver en del, er vi nødt til at udtrykke, hvor vi skal tage emner fra, hvordan vi behandler dem, og hvor vi skal sende dem bagefter.

4.1. Læseemner

For at læse emner skal vi implementere ItemReader.

I dette tilfælde opretter vi en læser, der simpelthen udsender tallene 1 til 10:

@Named offentlig klasse SimpleChunkItemReader udvider AbstractItemReader {private heltal [] tokens; privat heltalstælling; @Inject JobContext jobContext; @ Override public Integer readItem () kaster undtagelse {if (count> = tokens.length) {return null; } jobContext.setTransientUserData (count); returnere tokens [count ++]; } @ Overstyr offentlig tomrum åbent (Serialiserbart kontrolpunkt) kaster undtagelse {tokens = nyt heltal [] {1,2,3,4,5,6,7,8,9,10}; tælle = 0; }}

Nu læser vi bare af klassens interne tilstand her. Men selvfølgelig, readItem kunne trække fra en database, fra filsystemet eller en anden ekstern kilde.

Bemærk, at vi gemmer noget af denne interne tilstand ved hjælp af JobContext # setTransientUserData () hvilket vil være nyttigt senere.

Bemærk også kontrolpunkt parameter. Vi henter det også igen.

4.2. Behandler varer

Selvfølgelig er grunden til, at vi klumper, at vi vil udføre en slags operation på vores varer!

Hver gang vi vender tilbage nul fra en vareprocessor slipper vi den vare fra batchen.

Så lad os her sige, at vi kun vil beholde lige tal. Vi kan bruge en ItemProcessor der afviser de ulige ved at vende tilbage nul:

@Named offentlig klasse SimpleChunkItemProcessor implementerer ItemProcessor {@Override public Integer processItem (Object t) {Integer item = (Integer) t; returnere artikel% 2 == 0? vare: null; }}

procesItem vil blive kaldt en gang for hver vare, som vores ItemReader udsender.

4.3. Skrivning af emner

Endelig vil jobbet påberåbe sig ItemWriter så vi kan skrive vores transformerede genstande:

@Named offentlig klasse SimpleChunkWriter udvider AbstractItemWriter {Liste behandlet = ny ArrayList (); @Override public void writeItems (List items) kaster Undtagelse {items.stream (). Map (Integer.class :: cast) .forEach (processed :: add); }} 

Hvor lang er det genstande? I et øjeblik definerer vi en klods størrelse, som bestemmer størrelsen på den liste, der sendes til writeItems.

4.4. Definition af en klump i et job

Nu sætter vi alt dette sammen i en XML-fil ved hjælp af JSL eller Job Specification Language. Bemærk, at vi viser vores læser, processor, chunker og også en klumpstørrelse:

Klumpstørrelsen er, hvor ofte fremskridt i klumpen er forpligtet til jobopbevaringsområdet, som er vigtigt for at garantere færdiggørelse, hvis en del af systemet mislykkes.

Vi bliver nødt til at placere denne fil i META-INF / batch-job til .krukke filer og i WEB-INF / klasser / META-INF / batch-job til .krig filer.

Vi gav vores job id “SimpleChunk”, så lad os prøve det i en enhedstest.

Nu udføres job asynkront, hvilket gør dem vanskelige at teste. I prøven skal du sørge for at tjekke vores BatchTestHelper hvilke afstemninger og venter, indtil jobbet er afsluttet:

@Test offentlig ugyldighed givenChunk_thenBatch_completesWithSuccess () kaster undtagelse {JobOperator jobOperator = BatchRuntime.getJobOperator (); Lang udførelseId = jobOperator.start ("simpleChunk", nye egenskaber ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); } 

Så det er, hvad bidder er. Lad os nu se på batchlets.

5. Oprettelse af en Batchlet

Ikke alt passer pænt ind i en iterativ model. For eksempel kan vi have en opgave, som vi simpelthen har brug for påberåbe sig en gang, kør til afslutning, og returner en exitstatus.

Kontrakten for en batchlet er ret enkel:

@Named offentlig klasse SimpleBatchLet udvider AbstractBatchlet {@Override public String process () kaster undtagelse {returner BatchStatus.COMPLETED.toString (); }}

Som det er JSL:

Og vi kan teste det ved hjælp af den samme tilgang som før:

@Test offentlig ugyldighed givenBatchlet_thenBatch_completeWithSuccess () kaster undtagelse {JobOperator jobOperator = BatchRuntime.getJobOperator (); Lang udførelseId = jobOperator.start ("simpleBatchLet", nye egenskaber ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

Så vi har set på et par forskellige måder at implementere trin på.

Lad os nu se på mekanismer til markering og garanti for fremskridt.

6. Brugerdefineret kontrolpunkt

Fejl vil sandsynligvis ske midt i et job. Skal vi bare starte forfra, eller kan vi på en eller anden måde starte der, hvor vi slap?

Som navnet antyder, kontrolpunkter hjælp os med jævne mellemrum at oprette et bogmærke i tilfælde af fejl.

Som standard er afslutningen på klumpbehandling et naturligt kontrolpunkt.

Vi kan dog tilpasse det med vores egne Kontrolpunkt algoritme:

@Named offentlig klasse CustomCheckPoint udvider AbstractCheckpointAlgorithm {@Inject JobContext jobContext; @Override offentlig boolsk isReadyToCheckpoint () kaster undtagelse {int counterRead = (heltal) jobContext.getTransientUserData (); return counterLæs% 5 == 0; }}

Husker du antallet, vi tidligere har placeret i forbigående data? Her, vi kan trække det ud med JobContext # getTransientUserDatafor at sige, at vi ønsker at forpligte os til hvert 5. nummer, der behandles.

Uden dette ville der ske en forpligtelse i slutningen af ​​hvert stykke eller i vores tilfælde hvert 3. tal.

Og så matcher vi det med checkout-algoritme direktiv i vores XML under vores del:

Lad os teste koden igen og bemærke, at nogle af kedelpladetrinnene er skjult væk i BatchTestHelper:

@Test offentlig ugyldighed givenChunk_whenCustomCheckPoint_thenCommitCountIsThree () kaster undtagelse {// ... start job og vent på færdiggørelse jobOperator.getStepExecutions (executionId) .stream () .map (BatchTestHelper :: getCommitCount) .forEach (count -> 3 assertEquals) .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

Så vi forventer muligvis en forpligtelsestælling på 2, da vi har ti varer og konfigureret forpligtelserne til at være hvert 5. element. Men, rammen forpligter sig endnu en sidste læsning til sidst for at sikre, at alt er behandlet, hvilket bringer os op til 3.

Lad os derefter se på, hvordan man håndterer fejl.

7. Undtagelse Håndtering

Som standard, joboperatøren markerer vores job som MISLYKKEDES i tilfælde af en undtagelse.

Lad os ændre vores artikellæser for at sikre, at den fejler:

@ Override public Integer readItem () kaster undtagelse {if (tokens.hasMoreTokens ()) {String tempTokenize = tokens.nextToken (); smid ny RuntimeException (); } returnere null; }

Og test derefter:

@Test offentlig ugyldig nårChunkError_thenBatch_CompletesWithFailed () kaster Undtagelse {// ... start job og vent på afslutning assertEquals (jobExecution.getBatchStatus (), BatchStatus.FAILED); }

Men vi kan tilsidesætte denne standardadfærd på en række måder:

  • spring-grænse angiver antallet af undtagelser, som dette trin vil ignorere, før det mislykkes
  • prøv igen-grænse angiver antallet af gange, som joboperatøren skal prøve igen, før det mislykkes
  • kan springes over-undtagelse-klasse angiver et sæt undtagelser, som behandling af klumper vil ignorere

Så vi kan redigere vores job, så det ignorerer RuntimeExceptionsåvel som et par andre kun for at illustrere:

Og nu vil vores kode passere:

@Test offentlig ugyldighed givenChunkError_thenErrorSkipped_CompletesWithSuccess () kaster undtagelse {// ... start job og vent på færdiggørelse jobOperator.getStepExecutions (executionId) .stream () .map (BatchTestHelper :: getProcessSkipCount) .forEachert (springCount - skipCount) .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8. Udførelse af flere trin

Vi nævnte tidligere, at et job kan have et hvilket som helst antal trin, så lad os se det nu.

8.1. Fyring af det næste trin

Som standard, hvert trin er det sidste trin i jobbet.

For at udføre det næste trin inden for et batchjob skal vi udtrykkeligt specificere ved hjælp af Næste attribut inden for trindefinitionen:

Hvis vi glemmer denne attribut, bliver det næste trin i rækkefølge ikke udført.

Og vi kan se, hvordan dette ser ud i API:

@Test offentlig ugyldighed givenTwoSteps_thenBatch_CompleteWithSuccess () kaster Undtagelse {// ... start job og vent på færdiggørelse assertEquals (2, jobOperator.getStepExecutions (executId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.2. Strømme

En sekvens af trin kan også indkapsles i en flyde. Når strømmen er færdig, er det hele strømmen, der overgår til udførelseselementet. Også elementer inde i strømmen kan ikke overgå til elementer uden for strømmen.

Vi kan sige, udføre to trin inde i en strøm og derefter få denne overgang til et isoleret trin:

Og vi kan stadig se udførelsen af ​​hvert trin uafhængigt:

@Test offentlig ugyldighed givenFlow_thenBatch_CompleteWithSuccess () kaster undtagelse {// ... start job og vent på færdiggørelse assertEquals (3, jobOperator.getStepExecutions (executId) .størrelse ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.3. Beslutninger

Vi har også if / else support i form af beslutninger. Beslutninger giver en tilpasset måde at bestemme en sekvens på mellem trin, flow og split.

Ligesom trin fungerer det på overgangselementer som f.eks Næste som kan lede eller afslutte jobudførelsen.

Lad os se, hvordan jobbet kan konfigureres:

Nogen afgørelse element skal konfigureres med en klasse, der implementeres Afgør. Dets opgave er at returnere en beslutning som en Snor.

Hver Næste inde afgørelse er som en sag i en kontakt udmelding.

8.4. Opdeler

Opdeler er nyttige, da de giver os mulighed for at udføre strømme samtidigt:

Selvfølgelig, det betyder, at ordren ikke garanteres.

Lad os bekræfte, at de stadig alle kører. Strømningstrinnene udføres i en vilkårlig rækkefølge, men det isolerede trin vil altid være det sidste:

@Test offentlig ugyldighed givenSplit_thenBatch_CompletesWithSuccess () kaster Undtagelse {// ... start job og vent på færdiggørelse List stepExecutions = jobOperator.getStepExecutions (executionId); assertEquals (3, stepExecutions.size ()); assertEquals ("splitJobSequenceStep3", stepExecutions.get (2) .getStepName ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

9. Opdeling af et job

Vi kan også forbruge de batchegenskaber inden for vores Java-kode, som er defineret i vores job.

De kan afgrænses på tre niveauer - jobbet, trin og batch-artefakt.

Lad os se nogle eksempler på, hvordan de forbrugte.

Når vi vil forbruge ejendommene på jobniveau:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties (); ...

Dette kan også indtages på et trinniveau:

@Injicer StepContext stepContext; ... stepProperties = stepContext.getProperties (); ...

Når vi vil forbruge egenskaberne på batch-artefaktniveau:

@Inject @BatchProperty (name = "name") privat String nameString;

Dette kommer godt med skillevægge.

Se med opdelinger kan vi køre strømme samtidigt. Men det kan vi også skillevæg et skridt ind n sæt af emner eller indstille separate indgange, hvilket giver os en anden måde at opdele arbejdet på tværs af flere tråde.

For at forstå det arbejdssegment, som hver partition skal udføre, kan vi kombinere egenskaber med partitioner:

10. Stop og genstart

Nu er det det til at definere job. Lad os nu tale et øjeblik om at styre dem.

Vi har allerede set i vores enhedstest, at vi kan få en forekomst af JobOperator fra BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator ();

Og så kan vi starte jobbet:

Lang udførelseId = jobOperator.start ("simpleBatchlet", nye egenskaber ());

Vi kan dog også stoppe jobbet:

jobOperator.stop (executionId);

Og endelig kan vi genstarte jobbet:

executionId = jobOperator.restart (executionId, nye egenskaber ());

Lad os se, hvordan vi kan stoppe et kørende job:

@Test offentlig ugyldighed givenBatchLetStarted_whenStopped_thenBatchStopped () kaster undtagelse {JobOperator jobOperator = BatchRuntime.getJobOperator (); Lang udførelseId = jobOperator.start ("simpleBatchLet", nye egenskaber ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobOperator.stop (executionId); jobExecution = BatchTestHelper.keepTestStopped (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); }

Og hvis en batch er HOLDT OP, så kan vi genstarte det:

@Test offentlig ugyldighed givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess () {// ... start og stop job assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); executionId = jobOperator.restart (jobExecution.getExecutionId (), nye egenskaber ()); jobExecution = BatchTestHelper.keepTestAlive (jobOperator.getJobExecution (executionId)); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

11. Henter job

Når et batchjob sendes derefter batch runtime opretter en forekomst af Jobudførelse at spore det.

For at få Jobudførelse til et eksekverings-id kan vi bruge JobOperator # getJobExecution (executionId) metode.

Og, StepExecution giver nyttige oplysninger til sporing af et trins udførelse.

For at få StepExecution til et eksekverings-id kan vi bruge JobOperator # getStepExecutions (executionId) metode.

Og ud fra det kan vi få flere målinger om trinnet via StepExecution # getMetrics:

@Test offentlig ugyldighed givenChunk_whenJobStarts_thenStepsHaveMetrics () kaster undtagelse {// ... start job og vent på færdiggørelse assertTrue (jobOperator.getJobNames (). Indeholder ("simpleChunk")); assertTrue (jobOperator.getParameters (executionId) .isEmpty ()); StepExecution stepExecution = jobOperator.getStepExecutions (executionId) .get (0); Kort metricTest = BatchTestHelper.getMetricsMap (stepExecution.getMetrics ()); assertEquals (10L, metricTest.get (Metric.MetricType.READ_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.FILTER_COUNT) .longValue ()); assertEquals (4L, metricTest.get (Metric.MetricType.COMMIT_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.WRITE_COUNT) .longValue ()); // ... og mange flere! }

12. Ulemper

JSR 352 er kraftfuld, selvom den mangler inden for en række områder:

  • Der synes at være mangel på læsere og forfattere, som kan behandle andre formater som JSON
  • Der er ingen understøttelse af generiske lægemidler
  • Partitionering understøtter kun et enkelt trin
  • API'en tilbyder ikke noget til understøttelse af planlægning (selvom J2EE har et separat planlægningsmodul)
  • På grund af sin asynkrone natur kan test være en udfordring
  • API'en er ret detaljeret

13. Konklusion

I denne artikel kiggede vi på JSR 352 og lærte om bidder, batchlets, split, flow og meget mere. Alligevel har vi næppe ridset overfladen.

Som altid kan demo-koden findes på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found