Spring Batch异常处理
This commit is contained in:
parent
643b7d8fd6
commit
b7aa99b906
|
|
@ -0,0 +1,46 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>2.2.5.RELEASE</version>
|
||||
<relativePath/> <!-- lookup parent from repository -->
|
||||
</parent>
|
||||
<groupId>cc.mrbird</groupId>
|
||||
<artifactId>spring-batch-exception</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>spring-batch-exception</name>
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-batch</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-jdbc</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
package cc.mrbird.batch;
|
||||
|
||||
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableBatchProcessing
|
||||
public class SpringBatchExceptionApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(SpringBatchExceptionApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
package cc.mrbird.batch.exception;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
public class MyJobExecutionException extends Exception{
|
||||
|
||||
private static final long serialVersionUID = 7168487913507656106L;
|
||||
|
||||
public MyJobExecutionException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
package cc.mrbird.batch.job;
|
||||
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||
import org.springframework.batch.item.ExecutionContext;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@Component
|
||||
public class DefaultExceptionJobDemo {
|
||||
|
||||
@Autowired
|
||||
private JobBuilderFactory jobBuilderFactory;
|
||||
@Autowired
|
||||
private StepBuilderFactory stepBuilderFactory;
|
||||
|
||||
@Bean
|
||||
public Job defaultExceptionJob() {
|
||||
return jobBuilderFactory.get("defaultExceptionJob")
|
||||
.start(
|
||||
stepBuilderFactory.get("step")
|
||||
.tasklet((stepContribution, chunkContext) -> {
|
||||
// 获取执行上下文
|
||||
ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
|
||||
if (executionContext.containsKey("success")) {
|
||||
System.out.println("任务执行成功");
|
||||
return RepeatStatus.FINISHED;
|
||||
} else {
|
||||
String errorMessage = "处理任务过程发生异常";
|
||||
System.out.println(errorMessage);
|
||||
executionContext.put("success", true);
|
||||
throw new RuntimeException(errorMessage);
|
||||
}
|
||||
|
||||
}).build()
|
||||
).build();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
package cc.mrbird.batch.job;
|
||||
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||
import org.springframework.batch.item.support.ListItemReader;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Isolation;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@Component
|
||||
public class RestartJobDemo {
|
||||
|
||||
@Autowired
|
||||
private JobBuilderFactory jobBuilderFactory;
|
||||
@Autowired
|
||||
private StepBuilderFactory stepBuilderFactory;
|
||||
|
||||
@Bean
|
||||
public Job restartJob() {
|
||||
return jobBuilderFactory.get("restartJob")
|
||||
.start(step())
|
||||
.build();
|
||||
}
|
||||
|
||||
private Step step() {
|
||||
return stepBuilderFactory.get("step")
|
||||
.<String, String>chunk(2)
|
||||
.reader(listItemReader())
|
||||
.writer(list -> list.forEach(System.out::println))
|
||||
// .allowStartIfComplete(true)
|
||||
.startLimit(1)
|
||||
.build();
|
||||
}
|
||||
|
||||
private ListItemReader<String> listItemReader() {
|
||||
ArrayList<String> datas = new ArrayList<>();
|
||||
IntStream.range(0, 5).forEach(i -> datas.add(String.valueOf(i)));
|
||||
return new ListItemReader<>(datas);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
package cc.mrbird.batch.job;
|
||||
|
||||
import cc.mrbird.batch.exception.MyJobExecutionException;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.batch.item.support.ListItemReader;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@Component
|
||||
public class RetryExceptionJobDemo {
|
||||
|
||||
@Autowired
|
||||
private JobBuilderFactory jobBuilderFactory;
|
||||
@Autowired
|
||||
private StepBuilderFactory stepBuilderFactory;
|
||||
|
||||
@Bean
|
||||
public Job retryExceptionJob() {
|
||||
return jobBuilderFactory.get("retryExceptionJob")
|
||||
.start(step())
|
||||
.build();
|
||||
}
|
||||
|
||||
private Step step() {
|
||||
return stepBuilderFactory.get("step")
|
||||
.<String, String>chunk(2)
|
||||
.reader(listItemReader())
|
||||
.processor(myProcessor())
|
||||
.writer(list -> list.forEach(System.out::println))
|
||||
.faultTolerant() // 配置错误容忍
|
||||
.retry(MyJobExecutionException.class) // 配置重试的异常类型
|
||||
.retryLimit(3) // 重试3次,三次过后还是异常的话,则任务会结束,
|
||||
// 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常重试
|
||||
.build();
|
||||
}
|
||||
|
||||
private ListItemReader<String> listItemReader() {
|
||||
ArrayList<String> datas = new ArrayList<>();
|
||||
IntStream.range(0, 5).forEach(i -> datas.add(String.valueOf(i)));
|
||||
return new ListItemReader<>(datas);
|
||||
}
|
||||
|
||||
private ItemProcessor<String, String> myProcessor() {
|
||||
return new ItemProcessor<String, String>() {
|
||||
private int count;
|
||||
@Override
|
||||
public String process(String item) throws Exception {
|
||||
System.out.println("当前处理的数据:" + item);
|
||||
if (count >= 2) {
|
||||
return item;
|
||||
} else {
|
||||
count++;
|
||||
throw new MyJobExecutionException("任务处理出错");
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
package cc.mrbird.batch.job;
|
||||
|
||||
import cc.mrbird.batch.exception.MyJobExecutionException;
|
||||
import cc.mrbird.batch.listener.MySkipListener;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.batch.item.support.ListItemReader;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@Component
|
||||
public class SkipExceptionJobDemo {
|
||||
|
||||
@Autowired
|
||||
private JobBuilderFactory jobBuilderFactory;
|
||||
@Autowired
|
||||
private StepBuilderFactory stepBuilderFactory;
|
||||
@Autowired
|
||||
private MySkipListener mySkipListener;
|
||||
|
||||
@Bean
|
||||
public Job skipExceptionJob() {
|
||||
return jobBuilderFactory.get("skipExceptionJob")
|
||||
.start(step())
|
||||
.build();
|
||||
}
|
||||
|
||||
private Step step() {
|
||||
return stepBuilderFactory.get("step")
|
||||
.<String, String>chunk(2)
|
||||
.reader(listItemReader())
|
||||
.processor(myProcessor())
|
||||
.writer(list -> list.forEach(System.out::println))
|
||||
.faultTolerant() // 配置错误容忍
|
||||
.skip(MyJobExecutionException.class) // 配置跳过的异常类型
|
||||
.skipLimit(1) // 最多跳过1次,1次过后还是异常的话,则任务会结束,
|
||||
// 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常跳过
|
||||
.listener(mySkipListener)
|
||||
.build();
|
||||
}
|
||||
|
||||
private ListItemReader<String> listItemReader() {
|
||||
ArrayList<String> datas = new ArrayList<>();
|
||||
IntStream.range(0, 5).forEach(i -> datas.add(String.valueOf(i)));
|
||||
return new ListItemReader<>(datas);
|
||||
}
|
||||
|
||||
private ItemProcessor<String, String> myProcessor() {
|
||||
return item -> {
|
||||
System.out.println("当前处理的数据:" + item);
|
||||
if ("2".equals(item)) {
|
||||
throw new MyJobExecutionException("任务处理出错");
|
||||
} else {
|
||||
return item;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
package cc.mrbird.batch.job;
|
||||
|
||||
import cc.mrbird.batch.exception.MyJobExecutionException;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.batch.item.support.ListItemReader;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Isolation;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@Component
|
||||
public class TransactionJobDemo {
|
||||
|
||||
@Autowired
|
||||
private JobBuilderFactory jobBuilderFactory;
|
||||
@Autowired
|
||||
private StepBuilderFactory stepBuilderFactory;
|
||||
|
||||
@Bean
|
||||
public Job transactionJob() {
|
||||
return jobBuilderFactory.get("transactionJob")
|
||||
.start(step())
|
||||
.build();
|
||||
}
|
||||
|
||||
private Step step() {
|
||||
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
|
||||
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
|
||||
attribute.setIsolationLevel(Isolation.DEFAULT.value());
|
||||
attribute.setTimeout(30);
|
||||
|
||||
return stepBuilderFactory.get("step")
|
||||
.<String, String>chunk(2)
|
||||
.reader(listItemReader())
|
||||
.writer(list -> list.forEach(System.out::println))
|
||||
.readerIsTransactionalQueue()
|
||||
.transactionAttribute(attribute)
|
||||
.build();
|
||||
}
|
||||
|
||||
private ListItemReader<String> listItemReader() {
|
||||
ArrayList<String> datas = new ArrayList<>();
|
||||
IntStream.range(0, 5).forEach(i -> datas.add(String.valueOf(i)));
|
||||
return new ListItemReader<>(datas);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
package cc.mrbird.batch.listener;
|
||||
|
||||
import org.springframework.batch.core.SkipListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author MrBird
|
||||
*/
|
||||
@Component
|
||||
public class MySkipListener implements SkipListener<String, String> {
|
||||
@Override
|
||||
public void onSkipInRead(Throwable t) {
|
||||
System.out.println("在读取数据的时候遇到异常并跳过,异常:" + t.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSkipInWrite(String item, Throwable t) {
|
||||
System.out.println("在输出数据的时候遇到异常并跳过,待输出数据:" + item + ",异常:" + t.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSkipInProcess(String item, Throwable t) {
|
||||
System.out.println("在处理数据的时候遇到异常并跳过,待输出数据:" + item + ",异常:" + t.getMessage());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
spring:
|
||||
datasource:
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
url: jdbc:mysql://127.0.0.1:3306/springbatch
|
||||
username: root
|
||||
password: 123456
|
||||
Loading…
Reference in New Issue