Spring Batch 3. díl

V minulém díle jsme si vytvořili job, který obsahoval několik kroků (Step). Každému kroku byl přiřazen jeden tasklet, který implementoval celou funkcionalitu. Dost často ale krok v jobu bude načítat data, zpracovávat je a nakonec zapisovat. Pro tento případ je tady chunk. V rámci chunku je možné definovat reader, processor a writer.

<chunk reader="myReader" writer="myWriter" processor="myProcessor">

V následujícím příkladu si ukážeme job, který se bude skládat z jediného kroku, během kterého zavoláme náš reader, poté data vrácená z readeru zpracujeme ve vlastním processoru a nakonec data zapíšeme writerem.

context.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
	Shared Spring Batch infrastructure: job repository, job launcher and transaction manager.
	The default XML namespace is the Spring Batch schema, so plain Spring elements carry the "beans:" prefix.
-->
<beans:beans xmlns="http://www.springframework.org/schema/batch"
	xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">
	
	<!-- Job repository created by MapJobRepositoryFactoryBean (presumably map-backed and in-memory; confirm against the Spring Batch docs). Uses the transactionManager bean declared below. -->
	<beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<beans:property name="transactionManager" ref="transactionManager" />
	</beans:bean>
	
	<!-- Launcher looked up by id "jobLauncher" from ExecuteJob; it records executions in the jobRepository above. -->
	<beans:bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<beans:property name="jobRepository" ref="jobRepository" />
	</beans:bean>
	
	<!-- Transaction manager referenced by the job repository (judging by its name it manages no real resource; confirm). -->
	<beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
</beans:beans>

job.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
	Job definition: one job ("myJob") with a single chunk-oriented step ("myStep").
	The default XML namespace is the Spring Batch schema, so plain Spring elements carry the "beans:" prefix.
-->
<beans:beans xmlns="http://www.springframework.org/schema/batch"
	xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
						http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
						http://www.springframework.org/schema/batch
						http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">
	<!-- Pulls in the jobRepository, jobLauncher and transactionManager beans. -->
	<beans:import resource="context.xml" />
	
	<!-- The three collaborators of the chunk: reader, processor and writer. -->
	<beans:bean id="myReader" class="cz.vitfo.MyReader"/>
	<beans:bean id="myProcessor" class="cz.vitfo.MyProcessor"/>
	<beans:bean id="myWriter" class="cz.vitfo.MyWriter"/>
	
	<job id="myJob">
		<step id="myStep">
			<tasklet>
				<!-- commit-interval="4": items are read and processed in groups of four, and each group is handed to the writer in one call. -->
				<chunk reader="myReader" writer="myWriter" processor="myProcessor" commit-interval="4"/>
			</tasklet>
		</step>
	</job>
</beans:beans>

Třída MyInput představuje objekty, které budou načítány (vraceny) readerem.

/**
 * Input item of the batch step: a simple holder for one string identifier.
 */
public class MyInput {

	/** Identifier carried by this input item; may be replaced through the setter. */
	private String identificator;

	/**
	 * Creates an input item with the given identifier.
	 *
	 * @param identificator identifier to store; stored as given, without validation
	 */
	public MyInput(String identificator) {
		setIdentificator(identificator);
	}

	/**
	 * Replaces the stored identifier.
	 *
	 * @param identificator new identifier; stored as given, without validation
	 */
	public final void setIdentificator(String identificator) {
		this.identificator = identificator;
	}

	/**
	 * Returns the stored identifier.
	 *
	 * @return the identifier last passed to the constructor or the setter
	 */
	public final String getIdentificator() {
		return this.identificator;
	}
}

Třída MyOutput představuje objekty, které bude vracet processor (výsledek zpracování MyInput).

/**
 * Output item of the batch step: an id together with an integer hash value.
 * Both properties start at their Java defaults ({@code null} and {@code 0})
 * and are filled in through the setters.
 */
public class MyOutput {

	/** Identifier of the item. */
	private String id;

	/** Hash value associated with the item. */
	private int hash;

	/** Creates an empty output item. */
	public MyOutput() {
		super();
	}

	/**
	 * Stores the identifier.
	 *
	 * @param id identifier to store
	 */
	public void setId(String id) {
		this.id = id;
	}

	/**
	 * Returns the identifier.
	 *
	 * @return the stored identifier, or {@code null} if none was set
	 */
	public String getId() {
		return this.id;
	}

	/**
	 * Stores the hash value.
	 *
	 * @param hash hash value to store
	 */
	public void setHash(int hash) {
		this.hash = hash;
	}

	/**
	 * Returns the hash value.
	 *
	 * @return the stored hash value, or {@code 0} if none was set
	 */
	public int getHash() {
		return this.hash;
	}
}

MyReader implementuje rozhraní ItemReader a jeho funkcí je získat vstup. V tomto případě vrátí (načte) deset objektů typu MyInput.

import java.util.UUID;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import cz.vitfo.inputoutput.MyInput;

/**
 * Demo {@link ItemReader} that hands out a fixed number of {@link MyInput} items,
 * each carrying a freshly generated random UUID, and then reports end of input.
 */
public class MyReader implements ItemReader<MyInput> {

	/** Number of items a reader instance produces before it reports end of input. */
	private static final int ITEM_COUNT = 10;

	/**
	 * Number of items this instance has returned so far. Kept per instance (not static)
	 * so that every reader instance starts from zero.
	 */
	private int itemsRead = 0;

	/**
	 * Produces the next input item and logs its identifier to standard output.
	 *
	 * @return a new {@link MyInput} with a random UUID identifier, or {@code null}
	 *         once {@code ITEM_COUNT} items have been returned (end of input)
	 * @throws Exception declared to match the original signature; not thrown by this implementation
	 */
	public MyInput read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		// A null return value is how the reader tells the step that the input is exhausted.
		if (itemsRead >= ITEM_COUNT) {
			return null;
		}
		itemsRead++;

		MyInput myInput = new MyInput(UUID.randomUUID().toString());
		System.out.println("Reading input: \t" + myInput.getIdentificator());
		return myInput;
	}
}

MyProcessor implementuje rozhraní ItemProcessor a dostane na vstupu MyInput a přetvoří jej v MyOutput. V tomto případě použijeme identifikátor z MyInput jako id a jako hash dáme hash identifikátoru.

import cz.vitfo.inputoutput.MyInput;
import cz.vitfo.inputoutput.MyOutput;

/**
 * Processor that turns a {@link MyInput} into a {@link MyOutput}: the input's
 * identifier becomes the output id and the identifier's {@code hashCode()} becomes the output hash.
 */
public class MyProcessor implements ItemProcessor<MyInput, MyOutput>{

	/**
	 * Converts one input item and logs a progress line to standard output.
	 *
	 * @param myInput item to convert
	 * @return a new {@link MyOutput} populated from the input's identifier
	 * @throws Exception declared by the method signature; a {@code null} input or
	 *         identifier results in a {@link NullPointerException}
	 */
	public MyOutput process(MyInput myInput) throws Exception {
		System.out.println("Processing.");

		final String identifier = myInput.getIdentificator();
		final MyOutput result = new MyOutput();
		result.setId(identifier);
		result.setHash(identifier.hashCode());
		return result;
	}
}

MyWriter implementuje rozhraní ItemWriter a vypíše výsledky do System.out.

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import cz.vitfo.inputoutput.MyOutput;

/**
 * Writer that prints every {@link MyOutput} of a chunk to standard output,
 * one line per item with the id and the hash separated by tabs.
 */
public class MyWriter implements ItemWriter<MyOutput> {

	/**
	 * Prints all items of the given chunk in list order.
	 *
	 * @param myOutputs items to print
	 * @throws Exception declared by the method signature
	 */
	public void write(List<? extends MyOutput> myOutputs) throws Exception {
		final int total = myOutputs.size();
		for (int index = 0; index < total; index++) {
			final MyOutput item = myOutputs.get(index);
			final String line = "Writing output: \t" + item.getId() + "\t" + item.getHash();
			System.out.println(line);
		}
	}
}

ExecuteJob

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point: loads the Spring context from {@code job.xml},
 * runs the {@code myJob} job once and prints its status and timing.
 */
public class ExecuteJob {
	/**
	 * Starts the application context, launches the job with empty parameters and
	 * always closes the context once the launch attempt has finished.
	 *
	 * @param args command-line arguments; not used
	 */
	public static void main(String[] args) {
		AbstractApplicationContext appContext = new ClassPathXmlApplicationContext(new String[] {"job.xml"});

		JobLauncher launcher = (JobLauncher) appContext.getBean("jobLauncher");
		Job jobToRun = (Job) appContext.getBean("myJob");

		try {
			JobExecution result = launcher.run(jobToRun, new JobParameters());

			String statusLine = "Exit Status: " + result.getStatus();
			System.out.println(statusLine);

			String timingLine = "Start: " + result.getStartTime() + "\tEnd: " + result.getEndTime();
			System.out.println(timingLine);
		} catch (Exception failure) {
			// Launch failures are only reported; the program still continues to the final message.
			failure.printStackTrace();
		} finally {
			appContext.close();
		}
		System.out.println("Done");
	}
}

Výsledek

Reading input: 	6ba2f1a2-04e2-434d-b590-b2391de48bd8
Reading input: 	19825a0d-443b-43bb-8060-c7f940f66196
Reading input: 	2f240416-bab3-4b54-afc6-68de1e3af3fa
Reading input: 	31f2c4b5-0e28-4e2b-9f0b-b4416330fc5c
Processing.
Processing.
Processing.
Processing.
Writing output: 	6ba2f1a2-04e2-434d-b590-b2391de48bd8	-1446935741
Writing output: 	19825a0d-443b-43bb-8060-c7f940f66196	-1463897045
Writing output: 	2f240416-bab3-4b54-afc6-68de1e3af3fa	216531505
Writing output: 	31f2c4b5-0e28-4e2b-9f0b-b4416330fc5c	777390857
Reading input: 	4d9fb312-52ee-4788-a1df-e3b19e9fbeae
Reading input: 	d629b074-e8e7-4fff-92d4-024734cbd9fe
Reading input: 	fe49d7dc-6271-4b24-af68-409c740f4dc9
Reading input: 	44fbc3bd-f508-4f25-881e-8d8db973f2a9
Processing.
Processing.
Processing.
Processing.
Writing output: 	4d9fb312-52ee-4788-a1df-e3b19e9fbeae	906437152
Writing output: 	d629b074-e8e7-4fff-92d4-024734cbd9fe	-2089939263
Writing output: 	fe49d7dc-6271-4b24-af68-409c740f4dc9	-947607384
Writing output: 	44fbc3bd-f508-4f25-881e-8d8db973f2a9	-735875801
Reading input: 	8f8f7d74-50ce-4d37-8919-6b97b7704771
Reading input: 	b1eb4508-e118-40a3-b61f-e46182f4ecf6
Processing.
Processing.
Writing output: 	8f8f7d74-50ce-4d37-8919-6b97b7704771	-842433527
Writing output: 	b1eb4508-e118-40a3-b61f-e46182f4ecf6	-458259897
Pro 13, 2016 3:31:02 ODP. org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=myJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
Exit Status: COMPLETED
Start: Tue Dec 13 15:31:02 CET 2016	End: Tue Dec 13 15:31:02 CET 2016
Done

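Chunk má nastaven commit-interval na 4. Jak je vidět z výpisu, data se zpracovávají po čtyřech: nejprve se načtou čtyři položky, poté se zpracují a nakonec se najednou zapíšou. Poslední chunk obsahuje jen zbývající dvě položky, protože reader vrátí celkem deset objektů.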

Napsat komentář