Spring Integration中的REST端点使消息传递通道成为多线程
我有一个非常简单的Spring
Boot应用程序,该应用程序提供了几个宁静的终结点,并且应该可以驱动将sftp文件上传到sftp服务器。我的要求是,如果有多个文件,则文件应排队。我希望通过sftp
spring集成工作流的默认行为来实现这一目标,因为我读到DirectChannel自动将文件排队。要测试该行为,请执行以下操作:
- 发送一个大文件,通过调用端点来阻塞通道一段时间。
- 通过调用端点来发送较小的文件。
预期结果:较小的文件将排队到通道上,并在较大文件的上传完成后进行处理。实际结果:打开与sftp服务器的新连接,较小的文件上传到那里而无需排队,而较大的文件继续传输。
我的应用程序中有两个文件:
DemoApplication.java
@SpringBootApplication@IntegrationComponentScan
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class})
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(22);
factory.setUser("tester");
factory.setPassword("password");
factory.setAllowUnknownKeys(true);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toSftpChannel")
void sendToSftp(File file);
}
}
DemoController.java
@RestControllerpublic class DemoController {
@Autowired
MyGateway gateway;
@RequestMapping("/sendFile")
public void sendFile() {
File file = new File("C:/smallFile.txt");
gateway.sendToSftp(file);
}
@RequestMapping("/sendBigFile")
public void sendBigFile() {
File file = new File("D:/bigFile.zip");
gateway.sendToSftp(file);
}
}
我是一个完全的新手,我不能完全确定我的sftp通道是否在这里正确创建,我的猜测是每次我执行sendToSftp调用时都会创建一个新的sftp通道。在这种情况下,如何实现队列行为的任何帮助将不胜感激。
回答:
您在这里没有队列,因为每个HTTP请求都是在其自己的线程中执行的。正确,http线程池用尽时,您可能仍然在那儿排队,但是在您只有两个请求的简单用例中,这似乎并不存在。
无论如何,您都可以在那里实现队列行为,但是您应该将自己声明toSftpChannel
为QueueChannel
Bean。
这样,下游进程将始终在同一线程上执行,并且下一个消息恰好在第一个消息之后从队列中拉出。
有关更多信息,请参见参考手册。
由于您使用的FtpMessageHandler
是单向组件,但是您仍然需要对MVC控制器的方法进行一些答复,因此,唯一的方法就是拥有一个不返回的@Gateway
方法void
,当然,我们需要以某种方式发送答复。
为此,我建议使用PublishSubscribeChannel
:
@Bean@BridgeTo
public MessageChannel toSftpChannel() {
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(0)
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
return handler;
}
这样,我们就有两个订阅者toSftpChannel
。使用,@Order(0)
我们确保@ServiceActivator
是第一个订户,因为我们需要首先执行SFTP传输。用,@BridgeTo
我们BridgeHandler
在相同的位置增加一秒钟PublishSubscribeChannel
。它的目的只是获取replyChannel
标头,然后在其中发送请求消息。由于我们不使用任何线程,因此BridgeHandler
将在完成向SFTP的传输后立即执行。
当然,BridgeHandler
除了您之外,您还可以拥有其他任何东西,@ServiceActivator
或者@Transfromer
返回其他信息而不是请求File
。例如:
@ServiceActivator(inputChannel = "toSftpChannel")@Order(1)
public String transferComplete(File payload) {
return "The SFTP transfer complete for file: " + payload;
}
以上是 Spring Integration中的REST端点使消息传递通道成为多线程 的全部内容, 来源链接: utcz.com/qa/425028.html