销售机械设备做网站,服装定制网站的设计与实现,完美代码网站,网络营销师报名官网spring jpa 流式这篇文章详细介绍了从数据库到对该数据感兴趣的任何其他组件进行流更新的幼稚实现。 更准确地说#xff0c;如何更改Spring Data R2DBC存储库以向相关订阅者发出事件。 对R2DBC和Spring的一点背景知识将对这篇文章有所帮助。 我以前的著作《 使用 Microsoft S… spring jpa 流式 这篇文章详细介绍了从数据库到对该数据感兴趣的任何其他组件进行流更新的幼稚实现。 更准确地说如何更改Spring Data R2DBC存储库以向相关订阅者发出事件。 对R2DBC和Spring的一点背景知识将对这篇文章有所帮助。 我以前的著作《 使用 Microsoft SQL Server的 Spring Data R2DBC和Spring Data R2DBC进行 异步RDBMS访问》在这方面应该有所帮助。 如前所述这将是一个幼稚的实现。 因此代码将不会花哨。 为此我劫持了SimpleR2dbcRepository以创建一个存储库实现该存储库实现在每次保存新记录时都会发出事件。 新事件将添加到DirectProcessor 并发送到订阅它的任何Publisher 。 看起来像 class PersonRepository(entity: RelationalEntityInformationPerson, Int,databaseClient: DatabaseClient,converter: R2dbcConverter,accessStrategy: ReactiveDataAccessStrategy
) : SimpleR2dbcRepositoryPerson, Int(entity, databaseClient, converter, accessStrategy) {private val source: DirectProcessorPerson DirectProcessor.createPerson()val events: FluxPerson sourceoverride fun S : Person save(objectToSave: S): MonoS {return super.save(objectToSave).doOnNext(source::onNext)}
} 来自SimpleR2dbcRepository唯一需要重写的函数是save saveAll委托来save 。 doOnNext添加到原始保存调用中该调用通过调用onNext将新事件推送到source DirectorProcessor 。 source被强制转换为Flux以防止来自存储库外部的类添加新事件。 从技术上讲他们仍然可以添加事件但是他们需要自己进行转换。 您可能已经注意到存储库正在加载参数并将其传递到SimpleR2dbcRepository 。 存储库的一个实例需要手动创建因为它的某些依赖项无法自动注入 Configuration
class RepositoryConfiguration {Beanfun personRepository(databaseClient: DatabaseClient,dataAccessStrategy: ReactiveDataAccessStrategy): PersonRepository {val entity: RelationalPersistentEntityPerson dataAccessStrategy.converter.mappingContext.getRequiredPersistentEntity(Person::class.java) as RelationalPersistentEntityPersonval relationEntityInformation: MappingRelationalEntityInformationPerson, Int MappingRelationalEntityInformation(entity, Int::class.java)return PersonRepository(relationEntityInformation,databaseClient,dataAccessStrategy.converter,dataAccessStrategy)}
} 至此所有内容都已设置好并可以使用。 以下是其工作的示例 personRepository.events.doOnComplete { log.info(Events flux has closed) }.subscribe { log.info(From events stream - $it) }
// insert people records over time
MARVEL_CHARACTERS.toFlux().delayElements(Duration.of(1, SECONDS)).concatMap { personRepository.save(it) }.subscribe() 哪个输出 29-08-2019 09:08:27.674 [reactor-tcp-nio-1] From events stream - Person(id481, nameSpiderman, age18)
29-08-2019 09:08:28.550 [reactor-tcp-nio-2] From events stream - Person(id482, nameIronman, age48)
29-08-2019 09:08:29.555 [reactor-tcp-nio-3] From events stream - Person(id483, nameThor, age1000)
29-08-2019 09:08:30.561 [reactor-tcp-nio-4] From events stream - Person(id484, nameHulk, age49)
29-08-2019 09:08:31.568 [reactor-tcp-nio-5] From events stream - Person(id485, nameAntman, age49)
29-08-2019 09:08:32.571 [reactor-tcp-nio-6] From events stream - Person(id486, nameBlackwidow, age34)
29-08-2019 09:08:33.576 [reactor-tcp-nio-7] From events stream - Person(id487, nameStarlord, age38)
29-08-2019 09:08:34.581 [reactor-tcp-nio-8] From events stream - Person(id488, nameCaptain America, age100)
29-08-2019 09:08:35.585 [reactor-tcp-nio-9] From events stream - Person(id489, nameWarmachine, age50)
29-08-2019 09:08:36.589 [reactor-tcp-nio-10] From events stream - Person(id490, nameWasp, age26)
29-08-2019 09:08:37.596 [reactor-tcp-nio-11] From events stream - Person(id491, nameWinter Soldier, age101)
29-08-2019 09:08:38.597 [reactor-tcp-nio-12] From events stream - Person(id492, nameBlack Panther, age42)
29-08-2019 09:08:39.604 [reactor-tcp-nio-1] From events stream - Person(id493, nameDoctor Strange, age42)
29-08-2019 09:08:40.609 [reactor-tcp-nio-2] From events stream - Person(id494, nameGamora, age29)
29-08-2019 09:08:41.611 [reactor-tcp-nio-3] From events stream - Person(id495, nameGroot, age4)
29-08-2019 09:08:42.618 [reactor-tcp-nio-4] From events stream - Person(id496, nameHawkeye, age47)
29-08-2019 09:08:43.620 [reactor-tcp-nio-5] From events stream - Person(id497, namePepper Potts, age44)
29-08-2019 09:08:44.627 [reactor-tcp-nio-6] From events stream - Person(id498, nameCaptain Marvel, age59)
29-08-2019 09:08:45.631 [reactor-tcp-nio-7] From events stream - Person(id499, nameRocket Raccoon, age30)
29-08-2019 09:08:46.637 [reactor-tcp-nio-8] From events stream - Person(id500, nameDrax, age49)
29-08-2019 09:08:47.639 [reactor-tcp-nio-9] From events stream - Person(id501, nameNebula, age30) 每秒保存一条记录该记录与从存储库发出的事件相匹配。 请注意 doOnComplete事件永远不会触发。 源永远不会关闭因此永远不会向其任何订户发出完成事件。 至少在此基本实现中这就是全部。 我敢肯定还有很多事情可以做但是我首先需要弄清楚该怎么做……总而言之加上一些补充您可以将插入数据库的数据流式传输到对记录感兴趣的组件被添加。 翻译自: https://www.javacodegeeks.com/2019/09/streaming-live-updates-reactive-spring-data-repository.htmlspring jpa 流式