RSocket File Upload Example
RSocket File Upload:
In an application, when an one component , say A, is very slow in responding to a request, the request sender might want to slow down the rate of sending the requests to avoid further stress on the component under load. Otherwise the component A might crash / lose data etc. With Reactive programming / back-pressure support, the sender is aware of the component A’s rate of processing the requests and sends the request only when the component A is capable of processing. This also helps the sender not to do too much work from its side.
For ex: Lets assume there is a client which tries to upload 10 GB file. The client might have 100 GB RAM and might even hold the entire file in the memory and try to send the upload request to the server. But the poor server with 1 GB RAM might not be able to process the request or It might take significant amount of time to process such request and respond to the client that the request is complete / failed.
We also do not want to block the connection between client and server during this processing. A request might fail after 90% of upload for some reason. In that case the client might want to send the same request again to the server. If you see, there are many issues with this approach.
It is good if the server processes the request as few file chunks and write as and when it can as shown here. So that client knows the progress of the file upload. If a specific chunk is lost, the client can resend that chunk instead of sending the whole file. The client can also do other tasks if the server is very slow.
Let’s see how we can achieve this using rsocket + Spring Boot.
Sample Application:
Just create a simple Spring Boot application with RSocket dependency.
We will develop a simple client and server application which will do the following.
- The client will send a PDF to the server as stream of byte array with size of 4096.
- The server will write each chunk and respond back to the client with status.
- Once the client confirms that it has sent everything, The server also confirms that file is completely written on the server side.
- If something unexpected happens, the server will respond with Failed status.
Models:
- Status
- chunk_completed for individual chunks
- completed is for final upload
- failed if upload failed for some reason
public enum Status {
CHUNK_COMPLETED,
COMPLETED,
FAILED;
}
- Mime types & other parameters
public class Constants {
public static final String MIME_FILE_EXTENSION = "message/x.upload.file.extension";
public static final String MIME_FILE_NAME = "message/x.upload.file.name";
public static final String FILE_NAME = "file-name";
public static final String FILE_EXTN = "file-extn";
}
RSocket File Upload — Server Side:
- application.properties for the rscoket port
spring.rsocket.server.port=6565
- Controller
@Controller
public class FileUploadController {
@Autowired
private FileUploadService service;
@MessageMapping("file.upload")
public Flux<Status> upload(@Headers Map<String, Object> metadata, @Payload Flux<DataBuffer> content) throws IOException {
var fileName = metadata.get(Constants.FILE_NAME);
var fileExtn = metadata.get(Constants.FILE_EXTN);
var path = Paths.get(fileName + "." + fileExtn);
return Flux.concat(service.uploadFile(path, content), Mono.just(Status.COMPLETED))
.onErrorReturn(Status.FAILED);
}
}
- Service:
@Service
public class FileUploadService {
@Value("${output.file.path:src/test/resources/output}")
private Path outputPath;
public Flux<Status> uploadFile(Path path, Flux<DataBuffer> bufferFlux) throws IOException {
Path opPath = outputPath.resolve(path);
AsynchronousFileChannel channel = AsynchronousFileChannel.open(opPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
return DataBufferUtils.write(bufferFlux, channel)
.map(b -> Status.CHUNK_COMPLETED);
}
}
Configuration:
@Configuration
public class RSocketConfig {
@Bean
public RSocketStrategies rSocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.metadataExtractorRegistry(metadataExtractorRegistry -> {
metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_EXTENSION), String.class, Constants.FILE_EXTN);
metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_NAME), String.class, Constants.FILE_NAME);
})
.build();
}
@Bean
public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder){
return builder
.rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
.connect(TcpClientTransport.create(6565));
}
}
RSocket File Upload — Client Side:
@SpringBootTest
class FileUploadApplicationTests {
@Autowired
private Mono<RSocketRequester> rSocketRequester;
@Value("classpath:input/java_tutorial.pdf")
private Resource resource;
@Test
public void uploadFile() {
// read input file as 4096 chunks
Flux<DataBuffer> readFlux = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 4096)
.doOnNext(s -> System.out.println("Sent"));
// rsocket request
this.rSocketRequester
.map(r -> r.route("file.upload")
.metadata(metadataSpec -> {
metadataSpec.metadata("pdf", MimeType.valueOf(Constants.MIME_FILE_EXTENSION));
metadataSpec.metadata("output", MimeType.valueOf(Constants.MIME_FILE_NAME));
})
.data(readFlux)
)
.flatMapMany(r -> r.retrieveFlux(Status.class))
.doOnNext(s -> System.out.println("Upload Status : " + s))
.subscribe();
}
}
output
...
...
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Upload Status : COMPLETED
Lets simulate the slow server by slowing down the write on the server side. Lets assume each chunk takes 1 second. I added the delay element to simulate that.
public Flux<Status> uploadFile(Path path, Flux<DataBuffer> bufferFlux) throws IOException {
Path opPath = outputPath.resolve(path);
AsynchronousFileChannel channel = AsynchronousFileChannel.open(opPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
return DataBufferUtils.write(bufferFlux.delayElements(Duration.ofSeconds(1)), channel)
.map(b -> Status.CHUNK_COMPLETED);
}
We will see a slightly different result this time. We will see 32 sent requests (chunks) first. (It is a default initial request which can be adjusted via reactor system property). As soon as client realizes that it has not received any response, it will not send any more request. Once it receives some responses, then it will send few more requests and wait for response. It will continue until the file upload process is 100% complete
Sent
Sent
...
...
Sent
Sent
Upload Status : CHUNK_COMPLETED
...
...
Upload Status : CHUNK_COMPLETED
Sent
Sent
...
...
Sent
...
Upload Status : CHUNK_COMPLETED
...
...
Upload Status : CHUNK_COMPLETED