SpringDataJPA多数据源及JPA+atomikos实现分布式事务
在之前的文章中我们已经介绍了SpringJDBC的多数据源实现(在一个项目中操作多个数据库),比较常见的多数据源的支持方式有两种:
把数据源作为参数传入到调用方法内部,需要我们手动传参。
不同的包下面的接口函数自动注入不同的数据源。
第一种方法比较麻烦,会增加额外的代码。所以我们使用第二种方式来实现JPA的多数据源支持。
首先来修改.yml文件,配置两个数据源:
server: port: 8888spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 datasource: Family: driver-class-name: com.mysql.cj.jdbc.Driver jdbc-url: mysql://localhost:3306/Family?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 123456 Family2: driver-class-name: com.mysql.cj.jdbc.Driver jdbc-url: mysql://localhost:3306/Family2?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 123456 jpa: # Hibernate 创建数据库表的时候,默认使用的数据库存储引擎是 MyISAM # database-platform在建表的时候将存储引擎切换为 InnoDB database-platform: org.hibernate.dialect.MySQL5InnoDBDialect hibernate: # 在Hibernate每次加载的时候,都会验证数据库中的表结构是否跟model类中字段的定义是一致的,如果不一致就抛出异常 ddl-auto: validate naming: physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl database: mysql # 在日志打印出执行的sql语句 show-sql: true1234567891011121314151617181920212223242526272829303132复制代码类型:[java]
在Dao文件下创建文件夹db和db2,将model中的Pets.java移动到db文件下并把Dao文件下原有的PetsRepository也移动到db文件夹下。在db2下新创建一个实体类Doctor和DoctorRepository接口:
package com.javafamily.familydemo.dao.db2;import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;import javax.persistence.*;@Data@Entity@Builder@Table(name = "doctor")public class Doctor { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private long id; @Column(nullable = false) private String name; @Column(nullable = false) private String title; }123456789101112131415161718192021222324复制代码类型:[java]
package com.javafamily.familydemo.dao.db2;import com.javafamily.familydemo.model.Pets;import org.springframework.data.jpa.repository.JpaRepository;public interface DoctorRepository extends JpaRepository<Doctor, Long> { }12345678复制代码类型:[java]
现在我们需要实现将Family数据源给PetsRepository使用,Family2数据源给db2使用。
接下来我们来实现配置:在config文件下创建JPAFamilyConfig.java和JPAFamily2Config.java
package com.javafamily.familydemo.config;import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.boot.jdbc.DataSourceBuilder;import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.data.jpa.repository.config.EnableJpaRepositories;import org.springframework.orm.jpa.JpaTransactionManager;import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;import org.springframework.transaction.PlatformTransactionManager;import org.springframework.transaction.annotation.EnableTransactionManagement;import javax.annotation.Resource;import javax.persistence.EntityManager;import javax.sql.DataSource;import java.util.Map;@Configuration@EnableTransactionManagement@EnableJpaRepositories( entityManagerFactoryRef = "entityManagerFactoryFamily", transactionManagerRef = "transactionManagerFamily", // 换成你自己的Repository所在位置 basePackages = {"com.javafamily.familydemo.dao.db"})public class JPAFamilyConfig { @Resource private JpaProperties jpaProperties; @Primary @Bean(name = "familyDataSource") @ConfigurationProperties(prefix = "spring.datasource.family") public DataSource familyDataSource() { return DataSourceBuilder.create().build(); } @Primary // 实体管理器 @Bean(name = "entityManagerFamily") public EntityManager entityManager(EntityManagerFactoryBuilder builder) { return entityManagerFactoryFamily(builder).getObject().createEntityManager(); } @Primary // 实体工厂 @Bean(name = "entityManagerFactoryFamily") public LocalContainerEntityManagerFactoryBean entityManagerFactoryFamily(EntityManagerFactoryBuilder builder) { return builder.dataSource(familyDataSource()) .properties(jpaProperties.getProperties()) // 换成你自己的实体类所在位置 .packages("com.javafamily.familydemo.dao.db") .persistenceUnit("familyPersistenceUnit") .build(); } @Primary // 事务管理器 @Bean(name = "transactionManagerFamily") public PlatformTransactionManager transactionManagerFamily(EntityManagerFactoryBuilder builder) { return new JpaTransactionManager(entityManagerFactoryFamily(builder).getObject()); } }1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768复制代码类型:[java]
package com.javafamily.familydemo.config;import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.boot.jdbc.DataSourceBuilder;import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.jpa.repository.config.EnableJpaRepositories;import org.springframework.orm.jpa.JpaTransactionManager;import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;import org.springframework.transaction.PlatformTransactionManager;import org.springframework.transaction.annotation.EnableTransactionManagement;import javax.annotation.Resource;import javax.persistence.EntityManager;import javax.sql.DataSource;@Configuration@EnableTransactionManagement@EnableJpaRepositories( entityManagerFactoryRef = "entityManagerFactoryFamily2", transactionManagerRef = "transactionManagerFamily2", // 换成你自己的Repository所在位置 basePackages = {"com.javafamily.familydemo.dao.db2"})public class JPAFamily2Config { @Resource private JpaProperties jpaProperties; @Bean(name = "family2DataSource") @ConfigurationProperties(prefix = "spring.datasource.family2") public DataSource family2DataSource() { return DataSourceBuilder.create().build(); } @Bean(name = "entityManagerFamily2") public EntityManager entityManager(EntityManagerFactoryBuilder builder) { return entityManagerFactoryFamily2(builder).getObject().createEntityManager(); } @Bean(name = "entityManagerFactoryFamily2") public LocalContainerEntityManagerFactoryBean entityManagerFactoryFamily2(EntityManagerFactoryBuilder builder) { return builder.dataSource(family2DataSource()) .properties(jpaProperties.getProperties()) // 换成你自己的实体类所在位置 .packages("com.javafamily.familydemo.dao.db2") .persistenceUnit("family2PersistenceUnit") .build(); } @Bean(name = "transactionManagerFamily2") public PlatformTransactionManager transactionManagerFamily2(EntityManagerFactoryBuilder builder) { return new JpaTransactionManager(entityManagerFactoryFamily2(builder).getObject()); } }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758复制代码类型:[java]
完成上述配置后,再编写测试类对多数据源进行测试。
package com.javafamily.familydemo;import com.javafamily.familydemo.dao.db.Pets;import com.javafamily.familydemo.dao.db.PetsRepository;import com.javafamily.familydemo.dao.db2.Doctor;import com.javafamily.familydemo.dao.db2.DoctorRepository;import org.junit.jupiter.api.Test;import org.junit.jupiter.api.extension.ExtendWith;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit.jupiter.SpringExtension;import javax.annotation.Resource;import java.util.Date;@ExtendWith(SpringExtension.class)@SpringBootTestpublic class KeyWordsTest { @Resource private PetsRepository petsRepository; @Resource private DoctorRepository doctorRepository; @Test public void Test() { Pets pets = petsRepository.findPetsByName("fish"); System.out.println(pets); } @Test public void jpaTest() { Pets pets = Pets.builder() .id(2L) .name("JavaFamily") .varieties("chai") .createTime(new Date()) .build(); Doctor doctor = Doctor.builder() .name("petsDoctor").title("director").build(); // 先构造一个Pets对象pets,这个操作针对db petsRepository.save(pets); //在构造一个Doctor对象doctor,这个操作针对db2 doctorRepository.save(doctor); } }1234567891011121314151617181920212223242526272829303132333435363738394041424344454647复制代码类型:[java]
执行代码,查看数据库会发现两张表的数据都已经插入成功。
当有异常发生时按照正常的逻辑,两条数据都应该插入失败。
我们先将数据库中之前的数据全部清除,之后改写PetsServiceImpl.java的save方法,并且添加分母为0的异常。
@Resource private PetsRepository petsRepository; @Resource private DoctorRepository doctorRepository; @Resource private Mapper dozerMapper; @Transactional public void savePets(PetsVO pets) { Pets petsPO = dozerMapper.map(pets, Pets.class); // 通过insert,保存一个对象 petsRepository.save(petsPO); doctorRepository.save(new Doctor(3,"Family2","doctor")); int num = 1/0; }1234567891011121314151617复制代码类型:[java]
执行代码,在postman中添加请求:
得到报错后,查看数据库。
第一个数据库没有数据传入。
第二个数据库有数据传入。
这是因为数据库事物不能跨链接,数据源更不能跨库。如果出现了上述操作那这个事务就变成了分布式事务,需要一个统一协调的管理器。下面让我们来实现JPA+atomikos实现分布式事务。
首先引入maven依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency>1234复制代码类型:[java]
之后改写.yml文件:
spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 datasource: family: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/Family?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 123456 family2: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/Family2?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 123456 # jta表示分布式事物 jta: atomikos: datasource: # 数据池最大连接数 max-pool-size: 30 # 超出设定时间抛出异常 borrow-connection-timeout: 100 connectionfactory: max-pool-size: 30 borrow-connection-timeout: 10012345678910111213141516171819202122232425262728复制代码类型:[java]
注:将jdbc-url还原成url
在config文件下创建AtomikosJtaPlatform.java,这部分的代码为固定代码。
package com.javafamily.familydemo.config;import org.hibernate.engine.transaction.jta.platform.internal.AbstractJtaPlatform;import javax.transaction.TransactionManager;import javax.transaction.UserTransaction;public class AtomikosJtaPlatform extends AbstractJtaPlatform { private static final long serialVersionUID = 1L; static TransactionManager transactionManager; static UserTransaction transaction; @Override protected TransactionManager locateTransactionManager() { return transactionManager; } @Override protected UserTransaction locateUserTransaction() { return transaction; } }12345678910111213141516171819202122232425复制代码类型:[java]
再来进行事物管理器配置。在config文件夹下创建JPAAtomikosTransactionConfig.java:
package com.javafamily.familydemo.config;import com.atomikos.icatch.jta.UserTransactionImp;import com.atomikos.icatch.jta.UserTransactionManager;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.ComponentScan;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;import org.springframework.orm.jpa.JpaVendorAdapter;import org.springframework.orm.jpa.vendor.Database;import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;import org.springframework.transaction.PlatformTransactionManager;import org.springframework.transaction.annotation.EnableTransactionManagement;import org.springframework.transaction.jta.JtaTransactionManager;import javax.transaction.TransactionManager;import javax.transaction.UserTransaction;@Configuration@ComponentScan@EnableTransactionManagementpublic class JPAAtomikosTransactionConfig { @Bean public PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() { return new PropertySourcesPlaceholderConfigurer(); } // JPA特性 @Bean public JpaVendorAdapter jpaVendorAdapter() { HibernateJpaVendorAdapter hibernateJpaVendorAdapter = new HibernateJpaVendorAdapter(); hibernateJpaVendorAdapter.setShowSql(true); hibernateJpaVendorAdapter.setGenerateDdl(true); hibernateJpaVendorAdapter.setDatabase(Database.MYSQL); return hibernateJpaVendorAdapter; } @Bean(name = "userTransaction") public UserTransaction userTransaction() throws Throwable { UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(5000); return userTransactionImp; } @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close") public TransactionManager atomikosTransactionManager() throws Throwable { UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); AtomikosJtaPlatform.transactionManager = userTransactionManager; return userTransactionManager; } @Bean(name = "transactionManager") @DependsOn({"userTransaction", "atomikosTransactionManager"}) public PlatformTransactionManager transactionManager() throws Throwable { UserTransaction userTransaction = userTransaction(); AtomikosJtaPlatform.transaction = userTransaction; TransactionManager atomikosTransactionManager = atomikosTransactionManager(); return new JtaTransactionManager(userTransaction, atomikosTransactionManager); } }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364复制代码类型:[java]
设置第一个数据库Family的数据源和实体扫描管理:
package com.javafamily.familydemo.config;import com.mysql.cj.jdbc.MysqlXADataSource;import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;import org.springframework.context.annotation.Primary;import org.springframework.data.jpa.repository.config.EnableJpaRepositories;import org.springframework.orm.jpa.JpaVendorAdapter;import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;import javax.annotation.Resource;import javax.sql.DataSource;import java.sql.SQLException;import java.util.HashMap;@Configuration@DependsOn("transactionManager")@EnableJpaRepositories(basePackages = "com.javafamily.familydemo.dao.db", entityManagerFactoryRef = "familyEntityManager", transactionManagerRef = "transactionManager")public class JPAFamilyConfig { @Resource private JpaVendorAdapter jpaVendorAdapter; @Primary @Bean @ConfigurationProperties(prefix = "spring.datasource.family") public DataSourceProperties familyDataSourceProperties() { return new DataSourceProperties(); } @Primary @Bean @ConfigurationProperties(prefix = "spring.datasource.family") public DataSource familyDataSource() throws SQLException { MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); mysqlXaDataSource.setUrl(familyDataSourceProperties().getUrl()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); mysqlXaDataSource.setPassword(familyDataSourceProperties().getPassword()); mysqlXaDataSource.setUser(familyDataSourceProperties().getUsername()); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName("family"); xaDataSource.setBorrowConnectionTimeout(60); xaDataSource.setMaxPoolSize(20); return xaDataSource; } @Primary @Bean @DependsOn("transactionManager") public LocalContainerEntityManagerFactoryBean familyEntityManager() throws Throwable { HashMap<String, Object> properties = new HashMap<String, Object>(); properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName()); properties.put("javax.persistence.transactionType", "JTA"); LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean(); entityManager.setJtaDataSource(familyDataSource()); entityManager.setJpaVendorAdapter(jpaVendorAdapter); //这里要修改成主数据源的扫描包 entityManager.setPackagesToScan("com.javafamily.familydemo.dao.db"); entityManager.setPersistenceUnitName("familyPersistenceUnit"); entityManager.setJpaPropertyMap(properties); return entityManager; } }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970复制代码类型:[java]
设置第二个数据库Family的数据源和实体扫描管理:
package com.javafamily.familydemo.config;import com.mysql.cj.jdbc.MysqlXADataSource;import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;import org.springframework.data.jpa.repository.config.EnableJpaRepositories;import org.springframework.orm.jpa.JpaVendorAdapter;import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;import javax.annotation.Resource;import javax.sql.DataSource;import java.sql.SQLException;import java.util.HashMap;@Configuration@DependsOn("transactionManager")@EnableJpaRepositories(basePackages = "com.javafamily.familydemo.dao.db2", entityManagerFactoryRef = "family2EntityManager", transactionManagerRef = "transactionManager")public class JPAFamily2Config { @Resource private JpaVendorAdapter jpaVendorAdapter; @Bean @ConfigurationProperties(prefix = "spring.datasource.family2") //注意这里 public DataSourceProperties family2DataSourceProperties() { return new DataSourceProperties(); } // @Bean(name = "family2DataSource", initMethod = "init", destroyMethod = "close") @Bean @ConfigurationProperties(prefix = "spring.datasource.family2") public DataSource family2DataSource() throws SQLException { MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); mysqlXaDataSource.setUrl(family2DataSourceProperties().getUrl()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); mysqlXaDataSource.setPassword(family2DataSourceProperties().getPassword()); mysqlXaDataSource.setUser(family2DataSourceProperties().getUsername()); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName("family2"); xaDataSource.setBorrowConnectionTimeout(60); xaDataSource.setMaxPoolSize(20); return xaDataSource; } @Bean @DependsOn("transactionManager") public LocalContainerEntityManagerFactoryBean family2EntityManager() throws Throwable { HashMap<String, Object> properties = new HashMap<String, Object>(); properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName()); properties.put("javax.persistence.transactionType", "JTA"); LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean(); entityManager.setJtaDataSource(family2DataSource()); entityManager.setJpaVendorAdapter(jpaVendorAdapter); //这里要修改成主数据源的扫描包 entityManager.setPackagesToScan("com.javafamily.familydemo.dao.db2"); entityManager.setPersistenceUnitName("family2PersistenceUnit"); entityManager.setJpaPropertyMap(properties); return entityManager; } }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869复制代码类型:[java]
通过以上代码我们会发现,除了事物管理器只有一个以外,其他都是两个。不同的数据源通过同一个事物管理器实现了分布式事务。
这时我们再次执行代码,会发现报错之后两个数据库都没有数据库插入。
当把错误代码去除后(intnum=1/0;)再次执行,两个数据库的数据均已插入。