V minulém díle jsme si vytvořili job, který obsahoval několik kroků (Step). Každému kroku byl přiřazen jeden tasklet, který implementoval celou funkcionalitu. Často však krok jobu data načítá, zpracovává a nakonec zapisuje. Pro tento případ slouží chunk (chunk-oriented processing). V rámci chunku je možné definovat reader, processor a writer.
<!-- commit-interval (or a completion policy) is required: it sets how many items form one chunk -->
<chunk reader="myReader" processor="myProcessor" writer="myWriter" commit-interval="4"/>
V následujícím příkladu si ukážeme job, který se bude skládat z jediného kroku, během kterého zavoláme náš reader, poté data vrácená z readeru zpracujeme ve vlastním processoru a nakonec data zapíšeme writerem.
context.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Shared batch infrastructure imported by job.xml: job repository, job launcher and transaction manager. -->
<beans:beans xmlns="http://www.springframework.org/schema/batch"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                                 http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">

    <!-- Map-based job repository (see MapJobRepositoryFactoryBean docs); it needs the transaction manager below. -->
    <beans:bean id="jobRepository"
                class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <beans:property name="transactionManager" ref="transactionManager" />
    </beans:bean>

    <!-- Launcher used by ExecuteJob to start jobs against the repository above. -->
    <beans:bean id="jobLauncher"
                class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <beans:property name="jobRepository" ref="jobRepository" />
    </beans:bean>

    <!-- Transaction manager with no real transactional resource behind it. -->
    <beans:bean id="transactionManager"
                class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

</beans:beans>
job.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Job definition: one chunk-oriented step wiring reader, processor and writer. -->
<beans:beans xmlns="http://www.springframework.org/schema/batch"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                                 http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">

    <!-- Provides jobRepository, jobLauncher and transactionManager. -->
    <beans:import resource="context.xml" />

    <!-- Item components used by the chunk below. -->
    <beans:bean id="myReader" class="cz.vitfo.MyReader" />
    <beans:bean id="myProcessor" class="cz.vitfo.MyProcessor" />
    <beans:bean id="myWriter" class="cz.vitfo.MyWriter" />

    <job id="myJob">
        <step id="myStep">
            <tasklet>
                <!-- commit-interval="4": items are read/processed and then written in groups of four. -->
                <chunk reader="myReader" processor="myProcessor" writer="myWriter" commit-interval="4" />
            </tasklet>
        </step>
    </job>

</beans:beans>
Třída MyInput představuje objekty, které budou načítány (vraceny) readerem.
/**
 * Input item handed out by the reader. It carries a single string identifier.
 */
public class MyInput {

    /** Identifier of this item. */
    private String identificator;

    /**
     * Creates an item with the given identifier.
     *
     * @param value identifier to store (kept as-is, may be null)
     */
    public MyInput(String value) {
        identificator = value;
    }

    /**
     * @return the identifier currently stored in this item
     */
    public String getIdentificator() {
        return identificator;
    }

    /**
     * Replaces the stored identifier.
     *
     * @param value new identifier
     */
    public void setIdentificator(String value) {
        identificator = value;
    }
}
Třída MyOutput představuje objekty, které bude vracet processor (výsledek zpracování MyInput).
/**
 * Result item produced by the processor: an id plus an integer hash, both set via setters.
 */
public class MyOutput {

    /** Identifier of the processed item; null until set. */
    private String id;

    /** Hash value associated with the item; 0 until set. */
    private int hash;

    /** Creates an empty result; populate it with the setters. */
    public MyOutput() {
    }

    /**
     * @return the stored id (null if never set)
     */
    public String getId() {
        return id;
    }

    /**
     * @param value id to store
     */
    public void setId(String value) {
        id = value;
    }

    /**
     * @return the stored hash (0 if never set)
     */
    public int getHash() {
        return hash;
    }

    /**
     * @param value hash to store
     */
    public void setHash(int value) {
        hash = value;
    }
}
MyReader implementuje rozhraní ItemReader a jeho úkolem je získat vstup. V tomto případě vrátí (načte) deset objektů typu MyInput a poté vrátí null, čímž signalizuje konec vstupu.
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import cz.vitfo.inputoutput.MyInput;
/**
 * ItemReader that generates a fixed number of {@link MyInput} items. Each item gets a random
 * UUID string as its identifier, and each generated identifier is printed to standard output.
 * <p>
 * Once the configured number of items has been returned, {@link #read()} returns {@code null},
 * which signals the end of input to Spring Batch. Progress is tracked per instance, so separate
 * reader instances do not share (and exhaust) a common counter. The counter is not synchronized.
 */
public class MyReader implements ItemReader<MyInput> {

    /** Number of items produced when the no-arg constructor is used (Spring bean default). */
    private static final int DEFAULT_ITEM_COUNT = 10;

    /** Total number of items this reader returns before signalling end of input. */
    private final int itemCount;

    /** Number of items already returned by this instance. */
    private int itemsRead = 0;

    /** Creates a reader that produces {@value #DEFAULT_ITEM_COUNT} items. */
    public MyReader() {
        this(DEFAULT_ITEM_COUNT);
    }

    /**
     * Creates a reader that produces the given number of items.
     *
     * @param itemCount number of items to return before {@code null}; must not be negative
     * @throws IllegalArgumentException if {@code itemCount} is negative
     */
    public MyReader(int itemCount) {
        if (itemCount < 0) {
            throw new IllegalArgumentException("itemCount must not be negative: " + itemCount);
        }
        this.itemCount = itemCount;
    }

    /**
     * Returns the next generated item, or {@code null} when all items have been returned.
     *
     * @return a new {@link MyInput} with a random UUID identifier, or {@code null} at end of input
     */
    public MyInput read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (itemsRead >= itemCount) {
            // null is the ItemReader way of saying "no more input"
            return null;
        }
        itemsRead++;
        MyInput myInput = new MyInput(java.util.UUID.randomUUID().toString());
        System.out.println("Reading input: \t" + myInput.getIdentificator());
        return myInput;
    }
}
MyProcessor implementuje rozhraní ItemProcessor, na vstupu dostane MyInput a přetvoří jej na MyOutput. V tomto případě použijeme identifikátor z MyInput jako id a jako hash použijeme hashCode identifikátoru.
import org.springframework.batch.item.ItemProcessor;
import cz.vitfo.inputoutput.MyInput;
import cz.vitfo.inputoutput.MyOutput;
/**
 * ItemProcessor that turns a {@link MyInput} into a {@link MyOutput}.
 * The input's identifier becomes the output id, and its {@code hashCode()} becomes the output hash.
 */
public class MyProcessor implements ItemProcessor<MyInput, MyOutput> {

    /**
     * Converts one input item; prints "Processing." for every call.
     *
     * @param myInput item returned by the reader
     * @return a new MyOutput holding the identifier and its hash code
     */
    public MyOutput process(MyInput myInput) throws Exception {
        System.out.println("Processing.");
        String identificator = myInput.getIdentificator();
        MyOutput result = new MyOutput();
        result.setId(identificator);
        result.setHash(identificator.hashCode());
        return result;
    }
}
MyWriter implementuje rozhraní ItemWriter a vypíše výsledky do System.out.
import java.util.List;
import org.springframework.batch.item.ItemWriter;
import cz.vitfo.inputoutput.MyOutput;
/**
 * ItemWriter that prints every item of a chunk to standard output,
 * one line per item: the "Writing output:" label, the id and the hash, separated by tabs.
 */
public class MyWriter implements ItemWriter<MyOutput> {

    /**
     * Prints the given chunk in list order.
     *
     * @param myOutputs items of one chunk
     */
    public void write(List<? extends MyOutput> myOutputs) throws Exception {
        for (int i = 0; i < myOutputs.size(); i++) {
            MyOutput item = myOutputs.get(i);
            String line = "Writing output: \t" + item.getId() + "\t" + item.getHash();
            System.out.println(line);
        }
    }
}
ExecuteJob
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Command-line launcher for the example. It loads job.xml, runs "myJob" once with empty
 * JobParameters, prints the execution status and times, and finally closes the context.
 */
public class ExecuteJob {

    /**
     * Entry point.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        // job.xml imports context.xml, so the launcher bean is available from the same context
        String[] configLocations = new String[] {"job.xml"};
        AbstractApplicationContext appContext = new ClassPathXmlApplicationContext(configLocations);
        JobLauncher launcher = (JobLauncher) appContext.getBean("jobLauncher");
        Job myJob = (Job) appContext.getBean("myJob");
        try {
            printSummary(launcher.run(myJob, new JobParameters()));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // always release the context, whether the launch succeeded or not
            appContext.close();
        }
        System.out.println("Done");
    }

    /** Prints the status and the start/end times of a finished job execution. */
    private static void printSummary(JobExecution run) {
        System.out.println("Exit Status: " + run.getStatus());
        System.out.println("Start: " + run.getStartTime() + "\tEnd: " + run.getEndTime());
    }
}
Výsledek
Reading input: 6ba2f1a2-04e2-434d-b590-b2391de48bd8
Reading input: 19825a0d-443b-43bb-8060-c7f940f66196
Reading input: 2f240416-bab3-4b54-afc6-68de1e3af3fa
Reading input: 31f2c4b5-0e28-4e2b-9f0b-b4416330fc5c
Processing.
Processing.
Processing.
Processing.
Writing output: 6ba2f1a2-04e2-434d-b590-b2391de48bd8 -1446935741
Writing output: 19825a0d-443b-43bb-8060-c7f940f66196 -1463897045
Writing output: 2f240416-bab3-4b54-afc6-68de1e3af3fa 216531505
Writing output: 31f2c4b5-0e28-4e2b-9f0b-b4416330fc5c 777390857
Reading input: 4d9fb312-52ee-4788-a1df-e3b19e9fbeae
Reading input: d629b074-e8e7-4fff-92d4-024734cbd9fe
Reading input: fe49d7dc-6271-4b24-af68-409c740f4dc9
Reading input: 44fbc3bd-f508-4f25-881e-8d8db973f2a9
Processing.
Processing.
Processing.
Processing.
Writing output: 4d9fb312-52ee-4788-a1df-e3b19e9fbeae 906437152
Writing output: d629b074-e8e7-4fff-92d4-024734cbd9fe -2089939263
Writing output: fe49d7dc-6271-4b24-af68-409c740f4dc9 -947607384
Writing output: 44fbc3bd-f508-4f25-881e-8d8db973f2a9 -735875801
Reading input: 8f8f7d74-50ce-4d37-8919-6b97b7704771
Reading input: b1eb4508-e118-40a3-b61f-e46182f4ecf6
Processing.
Processing.
Writing output: 8f8f7d74-50ce-4d37-8919-6b97b7704771 -842433527
Writing output: b1eb4508-e118-40a3-b61f-e46182f4ecf6 -458259897
Pro 13, 2016 3:31:02 ODP. org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=myJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
Exit Status: COMPLETED
Start: Tue Dec 13 15:31:02 CET 2016 End: Tue Dec 13 15:31:02 CET 2016
Done
Chunk má commit-interval nastavený na 4. Jak je vidět z výpisu, data se čtou, zpracovávají a zapisují po čtyřech; poslední chunk obsahuje zbývající dvě položky.