Ruchira Madhushan Rajapaksha

How can we stream data to Amazon S3 asynchronously?

Asynchronous Real-Time Streaming of Data to Amazon S3 Using Reactive Streams and Amazon S3 Transfer Manager

Asynchronous Streaming of Data to Amazon S3

Do you need to stream data of unknown size to Amazon S3, and do you find it challenging?

This article explains how to stream data of unknown size to an Amazon S3 bucket using reactive streams. We will use Amazon S3 Transfer Manager to implement this reactive approach.

What is streaming data?

Streaming data is data that is emitted continuously and incrementally, often at high volume, with the goal of low-latency processing.

Some examples of streaming data are as follows:

  • A log generator.
  • Customer interaction data from a web application or a mobile application.
  • Stock market data.
  • Data from scientific sensors (IoT device data).

Streaming Data With Amazon S3 Transfer Manager

Amazon S3 Transfer Manager supports uploading an asynchronous request body to an object in S3. The AsyncRequestBody interface supports non-blocking streaming of data and can be created from multiple sources.

This approach follows the pattern of the reactive stream, where this interface is the Publisher of data (specifically ByteBuffer chunks) and the HTTP client is the Subscriber of the data (i.e. to write that data on the wire).

subscribe(Subscriber) should be implemented to tie this publisher to a subscriber. Ideally, each call to subscribe should reproduce the content.

If the body is backed by a ByteBuffer publisher, the data is delivered as the publisher emits it. Reproducing the content on every call to subscribe is what allows the SDK to perform automatic retries. If the content is not reproducible, an exception may be thrown for any subsequent subscribe(Subscriber) calls.
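The publisher and subscriber contract described above can be sketched with the JDK alone. The example below is only an illustration under stated assumptions: it uses java.util.concurrent.Flow as a stand-in for the reactive streams interfaces the SDK uses, and the names ReplayablePublisherSketch, ReplayableChunkPublisher, of and drain are hypothetical and not part of the AWS SDK. It shows a publisher whose content is reproduced on every subscribe call, which is the property that makes a retry possible. The sketch is single-threaded and is not meant for production use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class ReplayablePublisherSketch {

    /** Hypothetical publisher that hands the same chunks to every new subscriber. */
    static final class ReplayableChunkPublisher implements Flow.Publisher<ByteBuffer> {
        private final List<byte[]> chunks;

        ReplayableChunkPublisher(List<byte[]> chunks) {
            this.chunks = List.copyOf(chunks);
        }

        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            // Each subscription has its own cursor, so a second subscribe starts at chunk 0 again.
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                    if (emitting) {
                        return; // re-entrant request made from inside onNext
                    }
                    emitting = true;
                    while (!done && demand > 0 && next < chunks.size()) {
                        demand--;
                        subscriber.onNext(ByteBuffer.wrap(chunks.get(next++)).asReadOnlyBuffer());
                    }
                    emitting = false;
                    if (!done && next == chunks.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Convenience factory for the sketch: one chunk per string. */
    static ReplayableChunkPublisher of(String... chunks) {
        List<byte[]> bytes = new ArrayList<>();
        for (String chunk : chunks) {
            bytes.add(chunk.getBytes(StandardCharsets.UTF_8));
        }
        return new ReplayableChunkPublisher(bytes);
    }

    /** Subscribes, pulls one chunk at a time, and returns the concatenated content. */
    static String drain(Flow.Publisher<ByteBuffer> publisher) {
        StringBuilder out = new StringBuilder();
        publisher.subscribe(new Flow.Subscriber<ByteBuffer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(ByteBuffer chunk) {
                out.append(StandardCharsets.UTF_8.decode(chunk));
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                throw new IllegalStateException(t);
            }

            @Override
            public void onComplete() {
            }
        });
        return out.toString();
    }

    public static void main(String[] args) {
        ReplayableChunkPublisher publisher = of("hello ", "world");
        System.out.println(drain(publisher)); // first attempt
        System.out.println(drain(publisher)); // simulated retry reads the same content
    }
}
```

A publisher fed by a live source (such as the Flux used later in this article) usually cannot replay its content this way, which is why a second subscribe may fail for it.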

public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {

    static AsyncRequestBody fromPublisher(Publisher<ByteBuffer> publisher) {
        return new AsyncRequestBody() {

            @Override
            public Optional<Long> contentLength() {
                return Optional.empty();
            }

            @Override
            public void subscribe(Subscriber<? super ByteBuffer> s) {
                publisher.subscribe(s);
            }
        };
    }

    static AsyncRequestBody fromFile(Path path) {
        return FileAsyncRequestBody.builder().path(path).build();
    }

    static AsyncRequestBody fromString(String string, Charset cs) {
        return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
                                                string.getBytes(cs));
    }

    static AsyncRequestBody fromBytes(byte[] bytes) {
        byte[] clonedBytes = bytes.clone();
        return ByteBuffersAsyncRequestBody.from(clonedBytes);
    }

    static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
        ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
                                           .map(BinaryUtils::immutableCopyOf)
                                           .peek(ByteBuffer::rewind)
                                           .toArray(ByteBuffer[]::new);
        return ByteBuffersAsyncRequestBody.of(immutableCopy);
    }

    static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor) {
        return fromInputStream(b -> b.inputStream(inputStream).contentLength(contentLength).executor(executor));
    }

    static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLength) {
        return new BlockingInputStreamAsyncRequestBody(contentLength);
    }

    static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long contentLength) {
        return new BlockingOutputStreamAsyncRequestBody(contentLength);
    }

    // ... remaining members omitted
}

To understand how this works, we should explore how Amazon S3 Transfer Manager works and which features it provides for reactive streaming. For an overview, see the linked article on Amazon S3 Transfer Manager.

Project Technical Setup

  1. Spring Boot 3.2.2
  2. Java 21

Our project will include the following dependencies in the pom.xml file:

<properties>
  <java.version>21</java.version>
  <amazon.awssdk.version>2.23.14</amazon.awssdk.version>
  <amazon.awssdk.crt.version>0.29.9</amazon.awssdk.crt.version>
</properties>

<dependencies>
   <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>s3</artifactId>
      <version>${amazon.awssdk.version}</version>
   </dependency>

   <dependency>
      <groupId>software.amazon.awssdk.crt</groupId>
      <artifactId>aws-crt</artifactId>
      <version>${amazon.awssdk.crt.version}</version>
   </dependency>

   <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>s3-transfer-manager</artifactId>
      <version>${amazon.awssdk.version}</version>
   </dependency>

   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
   </dependency>
</dependencies>

Streaming Data as a Flux of ByteBuffers

We will be utilizing the fromPublisher method of the AsyncRequestBody interface for this implementation.

public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {

    static AsyncRequestBody fromPublisher(Publisher<ByteBuffer> publisher) {
        return new AsyncRequestBody() {
            @Override
            public Optional<Long> contentLength() {
                return Optional.empty();
            }
            @Override
            public void subscribe(Subscriber<? super ByteBuffer> s) {
                publisher.subscribe(s);
            }
        };
    }
}

Consumer Object of FluxSink<T> used for Publishing

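import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import lombok.Getter;
import reactor.core.publisher.FluxSink;

// Bridges imperative code to a Flux: Flux.create(...) passes the sink to accept(),
// after which publishEvent() pushes items into the stream.
public class FluxSinkImplementation<T> implements Consumer<FluxSink<T>> {
    private FluxSink<T> fluxSink;
    // Released once a subscriber is attached and the sink is available.
    @Getter
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    @Override
    public void accept(FluxSink<T> sink) {
        this.fluxSink = sink;
        countDownLatch.countDown();
    }

    // Call only after getCountDownLatch().await() has returned; before that the sink is null.
    public void publishEvent(T event) {
        this.fluxSink.next(event);
    }

    public void complete() {
        fluxSink.complete();
    }
}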

Upload-Stream Service

The service layer triggers the streaming of the Flux<ByteBuffer> to the S3 bucket.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;

@RequiredArgsConstructor
@Slf4j
@Service
public class UploadStreamService {

    private static final String S3_KEY = "stream-directory/Flux.txt";
    private static final String INPUT_STRING = "This is Test Data that is Published to Amazon S3 in a loop over again and again.";

    private final AwsProperties s3ConfigProperties;
    private final S3TransferManager transferManager;

    public void uploadStream() throws InterruptedException {

        final FluxSinkImplementation<byte[]> fluxSinkImplementation = new FluxSinkImplementation<>();

        final Flux<ByteBuffer> streamFlux = Flux
                .create(fluxSinkImplementation)
                .map(ByteBuffer::wrap)
                .doOnComplete(() -> log.info("Completed"));

        final PutObjectRequest putObjectRequest = PutObjectRequest.builder()
                .bucket(s3ConfigProperties.getS3BucketName())
                .key(S3_KEY)
                .build();

        final UploadRequest uploadRequest = UploadRequest.builder()
                .putObjectRequest(putObjectRequest)
                .requestBody(AsyncRequestBody.fromPublisher(streamFlux))
                .addTransferListener(new LoggingTransferListener(314_572_800L)) // 300 MiB
                .build();

        final Upload upload = transferManager.upload(uploadRequest);

        // Wait until the SDK has subscribed to the Flux and the sink is available.
        fluxSinkImplementation.getCountDownLatch().await();

        final byte[] payload = INPUT_STRING.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < 1_000_000; i++) {
            fluxSinkImplementation.publishEvent(payload);
        }
        fluxSinkImplementation.complete();

        Mono.fromFuture(upload.completionFuture()).subscribe(completedUpload -> {
            log.info("Data Streamed Successfully");
            log.info("Entity Tag of S3 Object Streamed: {}", completedUpload.response().eTag());
        }, error -> log.error("Streaming to S3 failed", error));
    }
}
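The flow in the service above (make sure a subscriber is attached, push chunks, then signal completion) can be tried without Reactor or the AWS SDK. The following is a minimal sketch under stated assumptions: java.util.concurrent.SubmissionPublisher stands in for the Flux created from the sink, an anonymous subscriber stands in for the SDK's HTTP client, and the names PushPublisherSketch and publishAndCount are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PushPublisherSketch {

    /** Pushes `repeats` copies of `line` to a subscriber and returns the number of bytes it received. */
    static long publishAndCount(String line, int repeats) {
        AtomicLong received = new AtomicLong();
        CountDownLatch finished = new CountDownLatch(1);
        byte[] payload = line.getBytes(StandardCharsets.UTF_8);

        // Leaving the try block calls close(), which sends onComplete (compare fluxSink.complete()).
        try (SubmissionPublisher<ByteBuffer> publisher = new SubmissionPublisher<>()) {
            // Attach the subscriber first: items submitted while nobody is subscribed are dropped.
            publisher.subscribe(new Flow.Subscriber<ByteBuffer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);
                }

                @Override
                public void onNext(ByteBuffer chunk) {
                    received.addAndGet(chunk.remaining());
                    subscription.request(1);
                }

                @Override
                public void onError(Throwable t) {
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    finished.countDown();
                }
            });

            for (int i = 0; i < repeats; i++) {
                // submit() blocks while the subscriber's buffer is full, so a slow consumer slows the producer.
                publisher.submit(ByteBuffer.wrap(payload));
            }
        }

        try {
            if (!finished.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(publishAndCount("This is test data.", 1_000));
    }
}
```

In the article's service, the CountDownLatch plays the role that the early subscribe call plays here: it keeps the producer from pushing data before the consumer is connected.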

Logging Transfer Listener

This listener is responsible for logging event-driven updates on the progress of the transfer initiated by the S3 Transfer Manager. Throughout the lifecycle of the request, S3TransferManager will invoke the provided TransferListeners when important events occur, like additional bytes being transferred, allowing you to monitor the ongoing progress of the transfer.

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.transfer.s3.progress.TransferListener;

@Slf4j
@RequiredArgsConstructor
public class LoggingTransferListener implements TransferListener {

    // A progress line is logged roughly once per this many transferred bytes.
    private final long byteSize;
    private long startTime;
    private int iteration = 0;

    @Override
    public void transferInitiated(Context.TransferInitiated context) {
        log.info("Transfer initiated: {}", context.progressSnapshot().ratioTransferred());
        startTime = System.currentTimeMillis();
        logTransferStatus(context.progressSnapshot().transferredBytes());
    }

    // Logs only when the running total has passed the next multiple of byteSize.
    private void logTransferStatus(long transferredBytes) {
        if (transferredBytes > iteration * byteSize) {
            log.info("Bytes transferred: {}", transferredBytes);
            iteration++;
        }
    }

    @Override
    public void bytesTransferred(Context.BytesTransferred context) {
        logTransferStatus(context.progressSnapshot().transferredBytes());
    }

    @Override
    public void transferComplete(Context.TransferComplete context) {
        // Keep fractional seconds so that a transfer shorter than one second does not divide by zero.
        double seconds = Math.max(System.currentTimeMillis() - startTime, 1) / 1000.0;
        long bytes = context.progressSnapshot().transferredBytes();
        double megabytes = bytes / 1_048_576.0;
        double throughput = megabytes / seconds;
        log.info("Transfer complete: \n\t Bytes Transferred: {}\n\t MBs: {}\n\t Total Time Taken: {} Seconds\n\t Throughput: {} MB/s",
                bytes, String.format("%.3f", megabytes),
                String.format("%.1f", seconds),
                String.format("%.2f", throughput));
    }

    @Override
    public void transferFailed(Context.TransferFailed context) {
        log.error("Transfer failed", context.exception());
    }
}
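The threshold check in logTransferStatus and the throughput arithmetic in transferComplete are easy to get wrong, so it can help to look at them in isolation. The sketch below reproduces only that arithmetic with plain JDK code. The class and method names (ProgressMathSketch, shouldLog, countLogLines, throughputMiBPerSecond) are hypothetical, and the sketch keeps fractional seconds, with a one-millisecond floor, as one possible way to avoid a zero divisor for very short transfers.

```java
class ProgressMathSketch {

    private final long byteSize;
    private int iteration = 0;

    ProgressMathSketch(long byteSize) {
        this.byteSize = byteSize;
    }

    /** Returns true when the running total has passed the next threshold (same rule as logTransferStatus). */
    boolean shouldLog(long transferredBytes) {
        if (transferredBytes > iteration * byteSize) {
            iteration++;
            return true;
        }
        return false;
    }

    /** Counts how many progress lines a sequence of running totals would produce. */
    static int countLogLines(long byteSize, long... runningTotals) {
        ProgressMathSketch tracker = new ProgressMathSketch(byteSize);
        int lines = 0;
        for (long total : runningTotals) {
            if (tracker.shouldLog(total)) {
                lines++;
            }
        }
        return lines;
    }

    /** Throughput in MiB per second, using at least one millisecond of elapsed time. */
    static double throughputMiBPerSecond(long bytes, long elapsedMillis) {
        double seconds = Math.max(elapsedMillis, 1) / 1000.0;
        return (bytes / 1_048_576.0) / seconds;
    }

    public static void main(String[] args) {
        // With a 100-byte step, the totals 50, 120 and 250 each cross a new threshold.
        System.out.println(countLogLines(100, 0, 50, 120, 130, 250, 260)); // prints 3
        // 10 MiB in half a second.
        System.out.println(throughputMiBPerSecond(10L * 1_048_576, 500)); // prints 20.0
    }
}
```

Note that the first progress line appears as soon as any byte has been transferred, because the initial threshold is zero.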

Streaming Data as an OutputStream

We will utilize the forBlockingOutputStream method of the AsyncRequestBody interface for performing blocking writes to the downstream service as if it’s an output stream. Retries are not supported for this request body.

The caller is responsible for calling OutputStream.close() on the BlockingOutputStreamAsyncRequestBody.outputStream() when writing is complete.

public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {

    static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long contentLength) {
        return new BlockingOutputStreamAsyncRequestBody(contentLength);
    }
}
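The blocking hand-off described above, and the reason the caller must close the stream, can be illustrated with JDK classes only. This is an analogy, not the SDK's implementation: a PipedOutputStream stands in for BlockingOutputStreamAsyncRequestBody.outputStream(), a background thread stands in for the SDK consuming the body, and the names BlockingStreamHandoffSketch and writeAndConsume are hypothetical.

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class BlockingStreamHandoffSketch {

    /** Writes gzip data on the calling thread and returns the uncompressed byte count the consumer saw. */
    static long writeAndConsume(String line, int repeats) {
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        try {
            // A small pipe buffer: once it is full, write() blocks until the consumer catches up.
            PipedInputStream consumerSide = new PipedInputStream(8 * 1024);
            PipedOutputStream producerSide = new PipedOutputStream(consumerSide);

            Future<Long> bytesSeen = consumer.submit(() -> {
                long total = 0;
                byte[] buffer = new byte[4096];
                try (InputStream in = new GZIPInputStream(consumerSide)) {
                    int n;
                    // read() returns -1 only after the producer has closed its end.
                    while ((n = in.read(buffer)) != -1) {
                        total += n;
                    }
                }
                return total;
            });

            byte[] payload = line.getBytes(StandardCharsets.UTF_8);
            // try-with-resources closes the stream: this writes the gzip trailer and signals end of data.
            try (OutputStream out = new GZIPOutputStream(producerSide)) {
                for (int i = 0; i < repeats; i++) {
                    out.write(payload);
                }
            }
            return bytesSeen.get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("hand-off failed", e);
        } finally {
            consumer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAndConsume("This is test data.", 10_000));
    }
}
```

If the producer never closed the stream, the consumer in this sketch would keep waiting for more data. The same reasoning applies to an upload that is fed through an output stream: it cannot complete until the stream is closed.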

Upload-Stream Service

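import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.BlockingOutputStreamAsyncRequestBody;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;

@RequiredArgsConstructor
@Slf4j
@Service
public class UploadStreamService {

    // Note: the payload written below is a plain gzip stream of text, not a tar archive.
    private static final String S3_KEY = "stream-directory/Flux.tar.gz";
    private static final String INPUT_STRING = "This is Test Data that is Published to Amazon S3 in a loop over again and again.";

    private final AwsProperties s3ConfigProperties;
    private final S3TransferManager transferManager;

    public void uploadStream() {

        final PutObjectRequest putObjectRequest = PutObjectRequest.builder()
                .bucket(s3ConfigProperties.getS3BucketName())
                .key(S3_KEY)
                .build();

        // The size is unknown up front, so no content length is supplied.
        final BlockingOutputStreamAsyncRequestBody requestBody = AsyncRequestBody.forBlockingOutputStream(null);

        final UploadRequest uploadRequest = UploadRequest.builder()
                .putObjectRequest(putObjectRequest)
                .requestBody(requestBody)
                .addTransferListener(new LoggingTransferListener(200L))
                .build();

        // Start the upload before writing, so that the SDK subscribes to the request body.
        final Upload upload = transferManager.upload(uploadRequest);
        final byte[] payload = INPUT_STRING.getBytes(StandardCharsets.UTF_8);

        Mono.fromRunnable(() -> {
            // try-with-resources closes the stream, which flushes the gzip trailer and signals the end of the data.
            try (GZIPOutputStream outputStream = new GZIPOutputStream(requestBody.outputStream())) {
                for (int i = 0; i < 10_000_000; i++) {
                    outputStream.write(payload);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }

            Mono.fromFuture(upload.completionFuture()).subscribe(completedUpload -> {
                log.info("Data Streamed Successfully");
                log.info("Entity Tag of S3 Object Streamed: {}", completedUpload.response().eTag());
            }, error -> log.error("Streaming to S3 failed", error));
        }).subscribeOn(Schedulers.boundedElastic()).subscribe();
    }
}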

Visualizing Streaming Data using Transfer Manager

Visualizing Streaming Data as a Flux of ByteBuffers

We will stream as follows:

S3-Directory-Path and Key : stream-directory/Flux.txt

Console Logs about Streaming Data as Flux<ByteBuffer>
AWS S3 Bucket After Streaming Data as Flux<ByteBuffer>

Visualizing Streaming Data as OutputStream

We will stream as follows:

S3-Directory-Path and Key : stream-directory/Flux.tar.gz

Console Logs about Streaming Data as OutputStream
AWS S3 Bucket After Streaming Data as OutputStream

Thank You for Reading

  • Please feel free to share your feedback.
  • Stay connected for more insightful content.