In the previous part we created a job consisting of several steps (Step). Each step was assigned a single tasklet that implemented all of its functionality. Quite often, however, a step in a job will read data, process it and finally write it out. That is what a chunk is for: within a chunk you define a reader, a processor and a writer.
<chunk reader="myReader" writer="myWriter" processor="myProcessor"/>
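To make the chunk cycle more concrete, here is a simplified plain-Java model of what a chunk-oriented step does. It is only a sketch under stated assumptions, not Spring Batch's implementation: the class ChunkLoopSketch and the method runChunks are hypothetical, and the java.util.function types Supplier, Function and Consumer stand in for ItemReader, ItemProcessor and ItemWriter. The loop reads items until commitInterval items are collected or the reader returns null, processes them, and then hands the whole list to the writer in a single call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, simplified model of a chunk-oriented step (not Spring Batch code).
public class ChunkLoopSketch {

    // Runs read -> process -> write in chunks of at most commitInterval items.
    // Returns the number of chunks handed to the writer.
    static <I, O> int runChunks(Supplier<I> reader,
                                Function<I, O> processor,
                                Consumer<List<O>> writer,
                                int commitInterval) {
        int chunks = 0;
        while (true) {
            // 1) read up to commitInterval items; null means "no more input"
            List<I> inputs = new ArrayList<>();
            I item;
            while (inputs.size() < commitInterval && (item = reader.get()) != null) {
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                return chunks;
            }
            // 2) process every item of this chunk
            //    (the real step also drops items for which the processor returns null;
            //    that is omitted here)
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                outputs.add(processor.apply(input));
            }
            // 3) pass the whole chunk to the writer in one call;
            //    in Spring Batch the chunk's transaction is committed after the write
            writer.accept(outputs);
            chunks++;
        }
    }

    public static void main(String[] args) {
        int[] counter = {0};
        // a reader that returns 10 items and then null, like MyReader in the article
        Supplier<String> reader = () -> counter[0] < 10 ? "item-" + (++counter[0]) : null;
        List<Integer> chunkSizes = new ArrayList<>();
        int chunks = runChunks(reader, String::length, out -> chunkSizes.add(out.size()), 4);
        System.out.println(chunks + " chunks, sizes " + chunkSizes);
    }
}
```

With ten items and a commit interval of 4, main prints `3 chunks, sizes [4, 4, 2]`. The real step adds transactions, restart state, skips and retries on top of this basic loop.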
In the following example we build a job that consists of a single step: it calls our reader, passes the data returned by the reader to our own processor, and finally writes the results with a writer.
context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">
    <beans:bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <beans:property name="transactionManager" ref="transactionManager" />
    </beans:bean>
    <beans:bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <beans:property name="jobRepository" ref="jobRepository" />
    </beans:bean>
    <beans:bean id="transactionManager"
        class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
</beans:beans>
job.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">
    <beans:import resource="context.xml" />
    <beans:bean id="myReader" class="cz.vitfo.MyReader"/>
    <beans:bean id="myProcessor" class="cz.vitfo.MyProcessor"/>
    <beans:bean id="myWriter" class="cz.vitfo.MyWriter"/>
    <job id="myJob">
        <step id="myStep">
            <tasklet>
                <!-- read, process and write items in chunks of 4 -->
                <chunk reader="myReader" writer="myWriter" processor="myProcessor" commit-interval="4"/>
            </tasklet>
        </step>
    </job>
</beans:beans>
The MyInput class represents the objects that the reader loads (returns).
package cz.vitfo.inputoutput;

public class MyInput {

    private String identificator;

    public MyInput(String identificator) {
        this.identificator = identificator;
    }

    public String getIdentificator() {
        return identificator;
    }

    public void setIdentificator(String identificator) {
        this.identificator = identificator;
    }
}
The MyOutput class represents the objects returned by the processor (the result of processing a MyInput).
package cz.vitfo.inputoutput;

public class MyOutput {

    private String id;
    private int hash;

    public MyOutput() {}

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getHash() {
        return hash;
    }

    public void setHash(int hash) {
        this.hash = hash;
    }
}
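MyReader implements the ItemReader interface; its job is to supply the input. In this case it returns (reads) ten objects of type MyInput, each with a random UUID as its identifier, and then returns null, which tells Spring Batch that the input is exhausted.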
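The reader contract is simple: each call to read() returns the next item, and null means "end of input". The following plain-Java sketch shows that contract with per-instance state instead of a static counter. The class IteratorReaderSketch is hypothetical; in a real job it would implement org.springframework.batch.item.ItemReader<T>, and Spring Batch already ships a similar ready-made reader, org.springframework.batch.item.support.ListItemReader.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the ItemReader contract (plain Java, no Spring dependency).
public class IteratorReaderSketch<T> {

    private final Iterator<T> iterator;

    public IteratorReaderSketch(List<T> items) {
        this.iterator = items.iterator();
    }

    // Returns the next item, or null once all items have been read.
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    public static void main(String[] args) {
        IteratorReaderSketch<String> reader =
                new IteratorReaderSketch<>(Arrays.asList("a", "b", "c"));
        String item;
        // the step keeps calling read() until it gets null
        while ((item = reader.read()) != null) {
            System.out.println("Reading input: \t" + item);
        }
    }
}
```

A design note: because MyReader keeps its counter in a static field, the counter stays above 10 after the first run, so launching the job a second time in the same JVM would read nothing. Keeping the state in the reader instance avoids that whenever a fresh reader instance is created.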
package cz.vitfo;

import java.util.UUID;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import cz.vitfo.inputoutput.MyInput;

public class MyReader implements ItemReader<MyInput> {

    // shared counter: the reader produces ten items in total
    private static int counter = 1;

    public MyInput read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (counter <= 10) {
            counter++;
            MyInput myInput = new MyInput(UUID.randomUUID().toString());
            System.out.println("Reading input: \t" + myInput.getIdentificator());
            return myInput;
        }
        // returning null tells Spring Batch there is no more input
        return null;
    }
}
MyProcessor implements the ItemProcessor interface: it receives a MyInput and turns it into a MyOutput. Here the identifier from MyInput becomes the id, and the hash code of that identifier becomes the hash.
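Besides transforming items, a processor can also filter them: in Spring Batch, if ItemProcessor.process() returns null, the item is filtered out and never reaches the writer. The sketch below models that rule for one chunk in plain Java. The class ProcessorFilterSketch and the method processChunk are hypothetical, and the example processor (skip empty identifiers, otherwise return the identifier's hash code, loosely mirroring MyProcessor) is an illustrative assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of how the items of one chunk pass through a processor.
public class ProcessorFilterSketch {

    static <I, O> List<O> processChunk(List<I> inputs, Function<I, O> processor) {
        List<O> outputs = new ArrayList<>();
        for (I input : inputs) {
            O output = processor.apply(input);
            // a null result means "filter this item out": it is not passed to the writer
            if (output != null) {
                outputs.add(output);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // skip empty identifiers, otherwise keep the identifier's hash code
        Function<String, Integer> processor = id -> id.isEmpty() ? null : id.hashCode();
        System.out.println(processChunk(Arrays.asList("a", "", "b"), processor)); // prints [97, 98]
    }
}
```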
package cz.vitfo;

import org.springframework.batch.item.ItemProcessor;

import cz.vitfo.inputoutput.MyInput;
import cz.vitfo.inputoutput.MyOutput;

public class MyProcessor implements ItemProcessor<MyInput, MyOutput> {

    // converts one MyInput into one MyOutput
    public MyOutput process(MyInput myInput) throws Exception {
        System.out.println("Processing.");
        MyOutput myOutput = new MyOutput();
        myOutput.setId(myInput.getIdentificator());
        myOutput.setHash(myInput.getIdentificator().hashCode());
        return myOutput;
    }
}
MyWriter implements the ItemWriter interface and prints the results to System.out. Note that write() receives a whole list of items (one chunk) at a time, not a single item.
package cz.vitfo;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import cz.vitfo.inputoutput.MyOutput;

public class MyWriter implements ItemWriter<MyOutput> {

    // called once per chunk with all items processed in that chunk
    public void write(List<? extends MyOutput> myOutputs) throws Exception {
        for (MyOutput myOutput : myOutputs) {
            System.out.println("Writing output: \t" + myOutput.getId() + "\t" + myOutput.getHash());
        }
    }
}
ExecuteJob
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ExecuteJob {

    public static void main(String[] args) {
        String[] springConfig = {"job.xml"};
        AbstractApplicationContext context = new ClassPathXmlApplicationContext(springConfig);

        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("myJob");

        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Exit Status: " + execution.getStatus());
            System.out.println("Start: " + execution.getStartTime() + "\tEnd: " + execution.getEndTime());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            context.close();
        }
        System.out.println("Done");
    }
}
Output
Reading input: 6ba2f1a2-04e2-434d-b590-b2391de48bd8
Reading input: 19825a0d-443b-43bb-8060-c7f940f66196
Reading input: 2f240416-bab3-4b54-afc6-68de1e3af3fa
Reading input: 31f2c4b5-0e28-4e2b-9f0b-b4416330fc5c
Processing.
Processing.
Processing.
Processing.
Writing output: 6ba2f1a2-04e2-434d-b590-b2391de48bd8 -1446935741
Writing output: 19825a0d-443b-43bb-8060-c7f940f66196 -1463897045
Writing output: 2f240416-bab3-4b54-afc6-68de1e3af3fa 216531505
Writing output: 31f2c4b5-0e28-4e2b-9f0b-b4416330fc5c 777390857
Reading input: 4d9fb312-52ee-4788-a1df-e3b19e9fbeae
Reading input: d629b074-e8e7-4fff-92d4-024734cbd9fe
Reading input: fe49d7dc-6271-4b24-af68-409c740f4dc9
Reading input: 44fbc3bd-f508-4f25-881e-8d8db973f2a9
Processing.
Processing.
Processing.
Processing.
Writing output: 4d9fb312-52ee-4788-a1df-e3b19e9fbeae 906437152
Writing output: d629b074-e8e7-4fff-92d4-024734cbd9fe -2089939263
Writing output: fe49d7dc-6271-4b24-af68-409c740f4dc9 -947607384
Writing output: 44fbc3bd-f508-4f25-881e-8d8db973f2a9 -735875801
Reading input: 8f8f7d74-50ce-4d37-8919-6b97b7704771
Reading input: b1eb4508-e118-40a3-b61f-e46182f4ecf6
Processing.
Processing.
Writing output: 8f8f7d74-50ce-4d37-8919-6b97b7704771 -842433527
Writing output: b1eb4508-e118-40a3-b61f-e46182f4ecf6 -458259897
Pro 13, 2016 3:31:02 ODP. org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=myJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
Exit Status: COMPLETED
Start: Tue Dec 13 15:31:02 CET 2016 End: Tue Dec 13 15:31:02 CET 2016
Done
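The chunk has commit-interval set to 4. As the output shows, the data are handled four items at a time: the step reads four items, processes those four, and passes them to the writer in a single write() call. The ten items therefore form three chunks of 4, 4 and 2 items; the last chunk is smaller because the reader returned null after the tenth item.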