Gayan Sanjeewa

RSocket File Upload Example

RSocket File Upload:

In an application, when one component, say A, is very slow in responding to requests, the sender might want to slow down the rate at which it sends them to avoid putting further stress on the component under load. Otherwise component A might crash, lose data, and so on. With reactive programming and back-pressure support, the sender is aware of the rate at which component A processes requests and sends a request only when component A is able to process it. This also saves the sender from doing too much work on its side.
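The idea can be sketched without any framework using the JDK's java.util.concurrent.Flow interfaces. This is only a minimal, single-threaded illustration and not how RSocket or Reactor implement it: the class name BackPressureSketch and the "sent" / "done" log entries are made up for this example, and the producer skips the validation a real Reactive Streams publisher needs (for instance rejecting non-positive requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackPressureSketch {

    /** Runs a slow consumer against a producer of `count` items and returns the event log. */
    static List<String> run(int count) {
        List<String> log = new ArrayList<>();

        // Producer: emits 0..count-1, but only while the consumer has outstanding demand.
        Flow.Publisher<Integer> producer = subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private long demand = 0;
            private boolean draining = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                demand += n;
                if (draining || done) {
                    return; // re-entrant call from onNext: the loop below picks up the new demand
                }
                draining = true;
                while (demand > 0 && next < count && !done) {
                    demand--;
                    log.add("sent " + next);
                    subscriber.onNext(next++);
                }
                draining = false;
                if (next == count && !done) {
                    done = true;
                    log.add("complete");
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });

        // Consumer: asks for one item at a time, and only after it has finished the previous one.
        producer.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // ask for the first item only
            }

            @Override
            public void onNext(Integer item) {
                log.add("done " + item);  // stands in for slow processing
                subscription.request(1);  // only now signal capacity for one more item
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                // nothing to do; the producer already logged completion
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

Because the consumer requests one item at a time, every "sent" entry in the log is followed by its "done" entry before the next item goes out; the producer never runs ahead of the consumer.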

For example, let's assume there is a client that tries to upload a 10 GB file. The client might have 100 GB of RAM and might even hold the entire file in memory while sending the upload request to the server. But the poor server with 1 GB of RAM might not be able to process the request, or it might take a significant amount of time to process it and tell the client whether the request completed or failed.

We also do not want to block the connection between the client and the server during this processing. A request might fail after 90% of the upload for some reason, in which case the client would have to send the same request to the server all over again. As you can see, there are many issues with this approach.

It is better if the server processes the request as a series of small file chunks and writes them as and when it can. That way the client knows the progress of the file upload. If a specific chunk is lost, the client can resend that chunk instead of sending the whole file. The client can also do other tasks if the server is very slow.

Let’s see how we can achieve this using RSocket + Spring Boot.

Sample Application:

Just create a simple Spring Boot application with the RSocket dependency.

We will develop a simple client and server application which will do the following.

  • The client will send a PDF to the server as a stream of byte arrays of 4096 bytes each.
  • The server will write each chunk and respond to the client with a status.
  • Once the client confirms that it has sent everything, the server also confirms that the file is completely written on the server side.
  • If something unexpected happens, the server will respond with a FAILED status.
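The status sequence described in the list above can be sketched with plain, blocking JDK streams before bringing in RSocket. This is only a stand-in for the reactive version: the class ChunkProtocolSketch and its upload method are hypothetical names for this illustration, and the Status enum simply mirrors the one defined for the sample application.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

final class ChunkProtocolSketch {

    enum Status { CHUNK_COMPLETED, COMPLETED, FAILED }

    /** Copies source to target chunk by chunk and records one status per step. */
    static List<Status> upload(InputStream source, OutputStream target, int chunkSize) {
        List<Status> statuses = new ArrayList<>();
        try {
            byte[] chunk;
            // readNBytes returns a short array only at end of stream, and an empty one once exhausted
            while ((chunk = source.readNBytes(chunkSize)).length > 0) {
                target.write(chunk);
                statuses.add(Status.CHUNK_COMPLETED);
            }
            statuses.add(Status.COMPLETED); // everything was read and written
        } catch (IOException e) {
            statuses.add(Status.FAILED);    // something unexpected happened
        }
        return statuses;
    }

    public static void main(String[] args) {
        byte[] fakeFile = new byte[10_000]; // placeholder content instead of a real PDF
        ByteArrayOutputStream written = new ByteArrayOutputStream();
        List<Status> statuses = upload(new ByteArrayInputStream(fakeFile), written, 4096);
        System.out.println(statuses);
        System.out.println(written.size() + " bytes written");
    }
}
```

A 10,000-byte input splits into chunks of 4096, 4096 and 1808 bytes, so the sketch reports three CHUNK_COMPLETED statuses followed by COMPLETED.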

Models:

  • Status: CHUNK_COMPLETED for individual chunks, COMPLETED for the final upload, FAILED if the upload failed for some reason
public enum Status {

    CHUNK_COMPLETED,
    COMPLETED,
    FAILED;

}
  • Mime types & other parameters
public class Constants {

    public static final String MIME_FILE_EXTENSION   = "message/x.upload.file.extension";
    public static final String MIME_FILE_NAME        = "message/x.upload.file.name";
    public static final String FILE_NAME = "file-name";
    public static final String FILE_EXTN = "file-extn";

}

RSocket File Upload — Server Side:

  • application.properties for the RSocket port
spring.rsocket.server.port=6565
  • Controller
@Controller
public class FileUploadController {

    @Autowired
    private FileUploadService service;

    @MessageMapping("file.upload")
    public Flux<Status> upload(@Headers Map<String, Object> metadata, @Payload Flux<DataBuffer> content) throws IOException {
        var fileName = metadata.get(Constants.FILE_NAME);
        var fileExtn = metadata.get(Constants.FILE_EXTN);
        var path = Paths.get(fileName + "." + fileExtn);
        return Flux.concat(service.uploadFile(path, content), Mono.just(Status.COMPLETED))
                    .onErrorReturn(Status.FAILED);

    }

}
  • Service:
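@Service
public class FileUploadService {

    // directory that receives the uploaded files; it must exist before the first upload
    @Value("${output.file.path:src/test/resources/output}")
    private Path outputPath;

    public Flux<Status> uploadFile(Path path, Flux<DataBuffer> bufferFlux) throws IOException {
        Path opPath = outputPath.resolve(path);
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(opPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        // DataBufferUtils.write neither releases the buffers nor closes the channel, so do both here
        return DataBufferUtils.write(bufferFlux, channel)
                            .doOnNext(DataBufferUtils::release)
                            .map(b -> Status.CHUNK_COMPLETED)
                            .doFinally(signal -> {
                                try {
                                    channel.close();
                                } catch (IOException ignored) {
                                    // nothing useful to do if closing fails
                                }
                            });
    }

}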
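
The service reads its target directory from the output.file.path property and falls back to src/test/resources/output when the property is not set. A hypothetical override in application.properties might look like this; the /tmp/uploads path is only an illustration, and the directory has to exist already because opening the channel with CREATE creates the file but not its parent directories.

```properties
# hypothetical target directory for uploaded files
output.file.path=/tmp/uploads
```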

Configuration:

@Configuration
public class RSocketConfig {

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .metadataExtractorRegistry(metadataExtractorRegistry -> {
                    metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_EXTENSION), String.class, Constants.FILE_EXTN);
                    metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_NAME), String.class, Constants.FILE_NAME);
                })
                .build();
    }

    @Bean
    public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder){
        return builder
                .rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .connect(TcpClientTransport.create(6565));
    }

}

RSocket File Upload — Client Side:

@SpringBootTest
class FileUploadApplicationTests {

    @Autowired
    private Mono<RSocketRequester> rSocketRequester;

    @Value("classpath:input/java_tutorial.pdf")
    private Resource resource;

    @Test
    public void uploadFile()  {

        // read the input file in 4096-byte chunks
        Flux<DataBuffer> readFlux = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 4096)
                .doOnNext(s -> System.out.println("Sent"));

        // rsocket request
        this.rSocketRequester
                .map(r -> r.route("file.upload")
                        .metadata(metadataSpec -> {
                            metadataSpec.metadata("pdf", MimeType.valueOf(Constants.MIME_FILE_EXTENSION));
                            metadataSpec.metadata("output", MimeType.valueOf(Constants.MIME_FILE_NAME));
                        })
                        .data(readFlux)
                )
                .flatMapMany(r -> r.retrieveFlux(Status.class))
                .doOnNext(s -> System.out.println("Upload Status : " + s))
                // block until the upload finishes; a bare subscribe() would let the test return early
                .blockLast();
        
    }

}

Output:

...
...
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Upload Status : COMPLETED

Let's simulate a slow server by slowing down the write on the server side. Let's assume each chunk takes 1 second; I added delayElements to simulate that.

public Flux<Status> uploadFile(Path path, Flux<DataBuffer> bufferFlux) throws IOException {
    Path opPath = outputPath.resolve(path);
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(opPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    // delayElements makes every chunk wait 1 second before it is written
    return DataBufferUtils.write(bufferFlux.delayElements(Duration.ofSeconds(1)), channel)
                        .doOnNext(DataBufferUtils::release)
                        .map(b -> Status.CHUNK_COMPLETED)
                        .doFinally(signal -> {
                            try { channel.close(); } catch (IOException ignored) { }
                        });
}

We will see a slightly different result this time. We will see 32 sent requests (chunks) first. (This is the default initial request size, which can be adjusted via a Reactor system property.) As soon as the client realizes that it has not received any responses, it stops sending more requests. Once it receives some responses, it sends a few more requests and waits for responses again. This continues until the file upload is 100% complete.

Sent
Sent
...
...
Sent
Sent
Upload Status : CHUNK_COMPLETED
...
...
Upload Status : CHUNK_COMPLETED
Sent
Sent
...
...
Sent
...
Upload Status : CHUNK_COMPLETED
...
...
Upload Status : CHUNK_COMPLETED
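
The send / acknowledge pattern in this output can be reproduced with a small credit-based model. This is a deliberately simplified sketch and not Reactor's actual prefetch and replenish logic: the class WindowedUploadSketch, the window parameter (32 in the run above) and the refillBatch parameter are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowedUploadSketch {

    /**
     * Models a sender that may have at most `window` unacknowledged chunks in flight
     * and a server that acknowledges chunks in batches of up to `refillBatch`.
     */
    static List<String> simulate(int totalChunks, int window, int refillBatch) {
        if (window <= 0 || refillBatch <= 0) {
            throw new IllegalArgumentException("window and refillBatch must be positive");
        }
        List<String> log = new ArrayList<>();
        int sent = 0;
        int acked = 0;
        int credit = window; // how many more chunks the sender may send right now

        while (acked < totalChunks) {
            // Sender: use up the available credit, then stop and wait.
            while (credit > 0 && sent < totalChunks) {
                log.add("Sent");
                sent++;
                credit--;
            }
            // Server: finish a batch of the chunks it has received so far.
            int batch = Math.min(refillBatch, sent - acked);
            for (int i = 0; i < batch; i++) {
                log.add("Upload Status : CHUNK_COMPLETED");
                acked++;
            }
            credit += batch; // each acknowledgement frees one slot for the sender
        }
        log.add("Upload Status : COMPLETED");
        return log;
    }

    public static void main(String[] args) {
        // 40 chunks, 32 allowed in flight, acknowledged 8 at a time (all illustrative numbers)
        simulate(40, 32, 8).forEach(System.out::println);
    }
}
```

The model starts with a burst of "Sent" lines up to the window size and then alternates between batches of acknowledgements and further sends, which is the same shape as the log above.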