网站团队组成,西安网络公司大全,2021年最新的网站,虚拟网站php专业型1.概述 众所周知#xff0c; Spring Integration具有用于与外部系统交互的大量连接器。 Twitter也不例外#xff0c;而且很长一段时间以来#xff0c;因为Spring Social一直是一个开箱即用的解决方案#xff0c;Spring Integration利用该解决方案来连接到社交网络。 1.1Sp… 1.概述 众所周知 Spring Integration具有用于与外部系统交互的大量连接器。 Twitter也不例外而且很长一段时间以来因为Spring Social一直是一个开箱即用的解决方案Spring Integration利用该解决方案来连接到社交网络。 1.1Spring社交EOL 不幸的是 Spring Social已经到了使用寿命 该项目现在处于维护模式。 Spring团队决定不进一步开发Spring Social的原因是使API绑定与社交网络的API保持同步变得很繁琐。 除此之外Spring Framework 5发布后开发人员希望利用其响应式编程模型这将要求团队在现有的响应式社交绑定旁边重新实现一个响应式Spring Social绑定。 现在建议开发人员实现自己的绑定或使用专用库之一连接社交网络。 1.2 Spring Integration的Twitter模块已移至扩展 Spring Social现在处于维护模式这迫使Spring Integration团队将Twitter支持模块从主项目移至扩展。 由于Spring Social不会接收更新因此它将基于早期的Spring Framework版本构建。 这将导致类路径冲突也将阻碍Spring Integration的开发。 因此 从Spring Integration 5.1开始Twitter模块可作为扩展使用 。 1.3有哪些替代方案 Twitter4J是Yamas Yusuke开发和维护的Twitter API的非官方Java库。 官方的HBC库由Twitter构建是一个Java HTTP Client用于使用Twitter的Streaming API。 自2016年以来后者从未见过重大更新而Twitter4J正在定期更新。 也可以选择实现自己的API绑定。 在使用RestTemplate的基于Spring的项目中绝对是一个选择并且这是进行REST调用的简便方法。 本指南以流模式使用Twitter4J可以将其集成到Spring Integration消息流中。 1.4 Twitter流如何工作 简而言之 您的应用打开了一个与Twitter API的单一连接只要发生新匹配就会通过该连接发送新结果 。 相反另一种方法是通过向REST API重复发送请求来批量传送数据。 流提供了一种低延迟的传递机制 该机制可以支持非常高的吞吐量而不必处理速率限制。 2.示例项目 该示例项目展示了Twitter的Streaming API到Spring Integration消息流的集成可在GitHub上找到 https : //github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming 。 Maven依赖 由于Spring Social现在是EOL因此我们不会在此基础上继续发展。 我们引入的只是spring-integration-core和twitter4j-stream 。 dependenciesdependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-core/artifactId/dependencydependencygroupIdorg.twitter4j/groupIdartifactIdtwitter4j-stream/artifactIdversion4.0.1/version/dependency/dependencies 该项目还使用了Lombok和Spring Boot测试支持但是这些是可选的。 Spring Integration的可听消息源 Spring Integration提供了对实现入站消息组件的支持。 它们分为轮询和监听行为 。 最初依赖于Inbound Twitter Channel Adapter建立在Spring Social之上现在已移至扩展 它是轮询用户 。 也就是说您必须提供一个轮询器配置才能使用它。 另一方面Twitter实施速率限制以管理应用程序获取更新的频率。 使用旧的Twitter Channel适配器时您应该考虑速率限制以便您配置的轮询间隔符合Twitter策略。 另一方面 侦听入站组件更简单通常只需要实现MessageProducerSupport 。 这样的侦听组件看起来像这样。 public class MyMessageProducer extends MessageProducerSupport {public MyMessageProducer(MessageChannel outputChannel) {// Defining an output channel is requiredsetOutputChannel(outputChannel);}Overrideprotected void onInit() {super.onInit();// Custom initialization - if applicable - comes here}Overridepublic void doStart() {// Lifecycle method for starting receiving messages}Overridepublic void doStop() {// Lifecycle method for stopping receiving messages}private void receiveMessage() {// Receive data from upstream serviceSomeData data ...;// Convert it to a message as appropriate and send it outthis.sendMessage(MessageBuilder.withPayload(data).build());}} 只有两个必需的元素 必须定义输出消息通道 每当组件收到消息时都必须调用sendMessage 可选您可能希望控制组件的初始化并管理其生命周期。 由于Twitter的Streaming API本质上是消息驱动的因此监听行为自然很合适。 让我们看看如何在这样的上下文中合并Twitter4J。 使用Twitter4J连接到Twitter Streaming API Twitter4J管理连接处理的细微差别并从Twitter的Streaming API接收更新。 我们需要做的就是获取一个TwitterStream实例附加一个侦听器并定义过滤。 实例化 Twitter4J网站上的流示例表明应通过TwitterStreamFactory创建一个TwitterStream实例。 这完全有道理但是在Spring应用程序上下文中我们希望它成为托管bean。 Spring的FactoryBean工具是包含创建单例TwitterStream实例的详细信息的简单FactoryBean方法。 public class TwitterStreamFactory extends AbstractFactoryBeanTwitterStream {Overridepublic Class? getObjectType() {return TwitterStream.class;}Overrideprotected TwitterStream createInstance() {return new twitter4j.TwitterStreamFactory().getInstance();}Overrideprotected void destroyInstance(TwitterStream twitterStream) {twitterStream.shutdown();}} 尽管我们也可以将其公开为普通的bean而不用由FactoryBean创建FactoryBean 但这不会适当地将其关闭。 附加侦听器并定义过滤 这将是我们自定义MessageProducer实现的责任。 Slf4j
public class TwitterMessageProducer extends MessageProducerSupport {private final TwitterStream twitterStream;private ListLong follows;private ListString terms;private StatusListener statusListener;private FilterQuery filterQuery;public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {this.twitterStream twitterStream;setOutputChannel(outputChannel);}Overrideprotected void onInit() {super.onInit();statusListener new StatusListener();long[] followsArray null;if (!CollectionUtils.isEmpty(follows)) {followsArray new long[follows.size()];for (int i 0; i follows.size(); i) {followsArray[i] follows.get(i);}}String[] termsArray null;if (!CollectionUtils.isEmpty(terms)) {termsArray terms.toArray(new String[0]);}filterQuery new FilterQuery(0, followsArray, termsArray);}Overridepublic void doStart() {twitterStream.addListener(statusListener);twitterStream.filter(filterQuery);}Overridepublic void doStop() {twitterStream.cleanUp();twitterStream.clearListeners();}public void setFollows(ListLong follows) {this.follows follows;}public void setTerms(ListString terms) {this.terms terms;}StatusListener getStatusListener() {return statusListener;}FilterQuery getFilterQuery() {return filterQuery;}class StatusListener extends StatusAdapter {Overridepublic void onStatus(Status status) {sendMessage(MessageBuilder.withPayload(status).build());}Overridepublic void onException(Exception ex) {log.error(ex.getMessage(), ex);}Overridepublic void onStallWarning(StallWarning warning) {log.warn(warning.toString());}}
} MessageProducerSupport和TwitterStream的管理界面提供的生命周期方法可以很好地配合使用。 这也将使我们能够在需要时在运行时停止和启动组件。 Java配置 尽管Spring可以自动装配组件但我还是更喜欢通过手动配置来控制依赖关系。 Slf4j
Configuration
public class TwitterConfig {BeanTwitterStreamFactory twitterStreamFactory() {return new TwitterStreamFactory();}BeanTwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) {return twitterStreamFactory.getInstance();}BeanMessageChannel outputChannel() {return MessageChannels.direct().get();}BeanTwitterMessageProducer twitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {TwitterMessageProducer twitterMessageProducer new TwitterMessageProducer(twitterStream, outputChannel);twitterMessageProducer.setTerms(Arrays.asList(java, microservices, spring));return twitterMessageProducer;}BeanIntegrationFlow twitterFlow(MessageChannel outputChannel) {return IntegrationFlows.from(outputChannel).transform(Status::getText).handle(m - log.info(m.getPayload().toString())).get();}} 这里的重要部分是我们的自定义消息生成器如何与消息流集成。 基本上除了在生产者的输出通道中列出消息之外我们不需要执行任何其他操作。 测试中 只有Chuck Norris在生产中测试代码。 但是像您和我这样的普通凡人我们确实会编写测试用例。 RunWith(SpringRunner.class)
ContextConfiguration(classes TestConfig.class)
public class TwitterMessageProducerTest {MockBeanprivate TwitterStream twitterStream;Autowiredprivate PollableChannel outputChannel;Autowiredprivate TwitterMessageProducer twitterMessageProducer;Testpublic void shouldBeInitialized() {StatusListener statusListener twitterMessageProducer.getStatusListener();verify(twitterStream).addListener(statusListener);FilterQuery filterQuery twitterMessageProducer.getFilterQuery();verify(twitterStream).filter(filterQuery);}Testpublic void shouldReceiveStatus() {StatusListener statusListener twitterMessageProducer.getStatusListener();Status status mock(Status.class);statusListener.onStatus(status);Message? statusMessage outputChannel.receive();assertSame(status, statusMessage.getPayload());}Import(TwitterConfig.class)static class TestConfig {BeanMessageChannel outputChannel() {return MessageChannels.queue(1).get();}}} 我喜欢Twitter4J的设计因为它利用了界面。 该库的大多数重要部分都作为普通接口公开。 TwitterStream也不例外。 也就是说在测试用例中可以轻松地将其嘲笑。 六结论 Spring Social现在已经停产了 -它不会收到新功能 Spring Integration的Twitter模块可作为扩展使用 -已从主项目中移出。 Twitter入站通道适配器是一个轮询用户 –选择轮询间隔时必须处理速率限制 Twitter的Streaming API符合入站通道适配器的监听行为 翻译自: https://www.javacodegeeks.com/2018/12/streaming-api-spring-integration.html