Spring Batch ved hjælp af Partitioner

1. Oversigt

I vores tidligere introduktion til Spring Batch introducerede vi rammen som et batchbehandlingsværktøj. Vi undersøgte også konfigurationsoplysningerne og implementeringen af ​​en enkelt-trådet, enkelt procesjobudførelse.

For at implementere et job med en vis parallel behandling gives der en række muligheder. På et højere niveau er der to tilstande til parallel behandling:

  1. Single-Process, multi-threaded
  2. Multi-proces

I denne hurtige artikel vil vi diskutere partitionering af Trin, som kan implementeres til både enkeltproces- og flerprocesjob.

2. Opdeling af et trin

Spring Batch med partitionering giver os mulighed for at opdele udførelsen af ​​en Trin:

Partitionering Oversigt

Ovenstående billede viser en implementering af en Job med en opdelt Trin.

Der er en Trin kaldet “Master”, hvis udførelse er opdelt i nogle “Slave” trin. Disse slaver kan tage plads til en mester, og resultatet vil stadig være uændret. Både mester og slave er forekomster af Trin. Slaver kan være fjerntjenester eller bare lokalt udføre tråde.

Hvis det er nødvendigt, kan vi videregive data fra master til slave. Metadataene (dvs. Jobopbevaring), sørger for at hver slave kun udføres en gang i en enkelt udførelse af Job.

Her er sekvensdiagrammet, der viser, hvordan det hele fungerer:

Partitioneringstrin

Som vist er PartitionStep kører henrettelsen. Det PartitionHandler er ansvarlig for at opdele "Mesterens" arbejde i "Slaverne". Den højre Trin er slave.

3. Maven POM

Maven-afhængighederne er de samme som nævnt i vores tidligere artikel. Det vil sige Spring Core, Spring Batch og afhængigheden af ​​databasen (i vores tilfælde SQLite).

4. Konfiguration

I vores indledende artikel så vi et eksempel på konvertering af nogle økonomiske data fra CSV til XML-fil. Lad os udvide det samme eksempel.

Her konverterer vi de økonomiske oplysninger fra 5 CSV-filer til tilsvarende XML-filer ved hjælp af en implementering med flere tråde.

Vi kan opnå dette ved hjælp af en enkelt Job og Trin partitionering. Vi har fem tråde, en til hver af CSV-filerne.

Lad os først og fremmest oprette et job:

@Bean (name = "partitionerJob") offentlig jobpartitionerJob () kaster UnexpectedInputException, MalformedURLException, ParseException {return jobs.get ("partitioningJob") .start (partitionStep ()) .build (); }

Som vi kan se, dette Job starter med Partitionering Trin. Dette er vores mestertrin, som vil blive opdelt i forskellige slave-trin:

@Bean public Step partitionStep () kaster UnexpectedInputException, MalformedURLException, ParseException {return steps.get ("partitionStep") .partitioner ("slaveStep", partitioner ()) .step (slaveStep ()) .taskExecutor (taskExecutor (). (); }

Her opretter vi Partitionering Trin ved hjælp af StepBuilderFactory. Til det er vi nødt til at give oplysningerne om SlaveTrin og Partitioner.

Det Partitioner er en grænseflade, der giver mulighed for at definere et sæt inputværdier for hver af slaverne. Med andre ord går logik til at opdele opgaver i respektive tråde her.

Lad os oprette en implementering af den, kaldet CustomMultiResourcePartitioner, hvor vi lægger input- og outputfilnavne i ExecutionContext at videregive til hvert slave trin:

offentlig klasse CustomMultiResourcePartitioner implementerer Partitioner {@Override public Map partition (int gridSize) {Map map = new HashMap (gridSize); int i = 0, k = 1; for (Ressource ressource: ressourcer) {ExecutionContext context = new ExecutionContext (); Assert.state (resource.exists (), "Ressource findes ikke:" + ressource); context.putString (keyname, resource.getFilename ()); context.putString ("opFileName", "output" + k +++ ". xml"); map.put (PARTITION_KEY + i, kontekst); i ++; } returner kort; }}

Vi opretter også bønnen til denne klasse, hvor vi giver kildekataloget til inputfiler:

@Bean offentlig CustomMultiResourcePartitioner partitioner () {CustomMultiResourcePartitioner partitioner = ny CustomMultiResourcePartitioner (); Ressource [] ressourcer; prøv {resources = resoursePatternResolver .getResources ("fil: src / main / resources / input / *. csv"); } fange (IOException e) {smid nyt RuntimeException ("I / O-problemer ved løsning" + "af inputfilmønsteret.", e); } partitioner.setResources (ressourcer); returnere partitioner; }

Vi definerer slavetrinet, ligesom ethvert andet trin med læseren og forfatteren. Læseren og forfatteren vil være den samme som vi så i vores indledende eksempel, bortset fra at de modtager filnavnparameter fra StepExecutionContext.

Bemærk, at disse bønner skal trinvis scopes, så de er i stand til at modtage stepExecutionContext params, ved hvert trin. Hvis de ikke ville være trinvis, oprettes deres bønner oprindeligt og accepterer ikke filnavnene på trinniveau:

@StepScope @Bean offentlig FlatFileItemReader itemReader (@Value ("# {stepExecutionContext [fileName]}") Strengfilnavn) kaster UnexpectedInputException, ParseException {FlatFileItemReader reader = ny FlatFileItemReader (); DelimitedLineTokenizer tokenizer = ny DelimitedLineTokenizer (); String [] tokens = {"brugernavn", "bruger-id", "transaktionsdato", "beløb"}; tokenizer.setNames (tokens); reader.setResource (nyt ClassPathResource ("input /" + filnavn)); DefaultLineMapper lineMapper = ny DefaultLineMapper (); lineMapper.setLineTokenizer (tokenizer); lineMapper.setFieldSetMapper (ny RecordFieldSetMapper ()); reader.setLinesToSkip (1); reader.setLineMapper (lineMapper); returlæser; } 
@Bean @StepScope public ItemWriter itemWriter (Marshaller marshaller, @Value ("# {stepExecutionContext [opFileName]}") Strengfilnavn) kaster MalformedURLException {StaxEventItemWriter itemWriter = ny StaxEventItemWriter (); itemWriter.setMarshaller (marshaller); itemWriter.setRootTagName ("transactionRecord"); itemWriter.setResource (nyt ClassPathResource ("xml /" + filnavn)); returnereWriter; }

Mens vi nævner læseren og forfatteren i slave-trinnet, kan vi videregive argumenterne som nul, fordi disse filnavne ikke vil blive brugt, da de modtager filnavnene fra stepExecutionContext:

@Bean public Step slaveStep () kaster UnexpectedInputException, MalformedURLException, ParseException {return steps.get ("slaveStep"). Chunk (1) .reader (itemReader (null)) .writer (itemWriter (marshaller (), null)) .build (); }

5. Konklusion

I denne vejledning diskuterede vi, hvordan man implementerer et job med parallel behandling ved hjælp af Spring Batch.

Som altid er den komplette implementering af dette eksempel tilgængelig på GitHub.