The Reactive Streams Ingestion (RSI) Library: DataLoad Mode

Juarez Junior
Published in Oracle Developers
4 min read, Jan 19, 2024

High-performance data access with Java

Introduction

Part 1 of this series introduced the Java library for Reactive Streams Ingestion (RSI), its API, and Oracle Database Free as the target database.

Then, in Part 2 of this series, we explored another streaming scenario with RSI and the Oracle Autonomous Transaction Processing (ATP) Database, a fully automated database service optimized to run concurrent transactional workloads.

This blog post covers the DataLoad mode of the Reactive Streams Ingestion (RSI) library, which gives you another option for how data ingestion occurs. Streaming mode remains the default ingestion mode in RSI, and DataLoad mode is a newer alternative that you enable explicitly.

A code sample is included if you want to test it locally. Let's look at the details and see it in action!

Reactive Streams Ingestion (RSI) Library: DataLoad Mode

Starting with Oracle Database 23c, you can use the Java library for Reactive Streams Ingestion (RSI) in one of the following modes, depending on your business use case:

  • Streaming mode: Use this default mode when RSI runs in a server where the number of rows to be inserted is not finite, but you do not need to insert a large number of rows in one go.
  • DataLoad mode: Use this mode when there is a known, large list of records to be inserted into the database in one go.

In a nutshell, the key differences between the DataLoad mode and the Streaming mode are:

  • Commits: In the DataLoad mode, the changes are not committed until the RSI instance is closed. In the default Streaming mode, the changes are committed regularly, and those regular commits can reduce throughput when you insert a large batch of rows into the database.
  • Connections: In the DataLoad mode, each worker thread has its own JDBC connection, so there is no effort to reduce the number of JDBC connections needed to execute the insertion task. In the default Streaming mode, the worker threads share a pool of JDBC connections.
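To make the difference in commit timing concrete, here is a minimal toy model in plain Java. It is not RSI code: the ToyIngestor class, the Mode enum, and the batch size are all hypothetical names invented for this illustration, and the model only tracks when rows become committed, not how they travel to the database.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the two commit policies. Hypothetical code, not part of RSI. */
public class CommitPolicyDemo {

    enum Mode { STREAMING, DATA_LOAD }

    /** Hypothetical stand-in that tracks pending and committed rows. */
    static final class ToyIngestor implements AutoCloseable {
        private final Mode mode;
        private final int batchSize;
        private final List<String> pending = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();
        private int commitCount = 0;

        ToyIngestor(Mode mode, int batchSize) {
            this.mode = mode;
            this.batchSize = batchSize;
        }

        void accept(String row) {
            pending.add(row);
            // Streaming: commit whenever a batch fills up.
            // DataLoad: keep everything pending until close().
            if (mode == Mode.STREAMING && pending.size() >= batchSize) {
                commit();
            }
        }

        private void commit() {
            if (pending.isEmpty()) {
                return;
            }
            committed.addAll(pending);
            pending.clear();
            commitCount++;
        }

        int committedRows() { return committed.size(); }

        int commitCount() { return commitCount; }

        /** Closing commits whatever is still pending, in both modes. */
        @Override
        public void close() { commit(); }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            ToyIngestor ingestor = new ToyIngestor(mode, 2);
            for (int i = 1; i <= 5; i++) {
                ingestor.accept("row-" + i);
            }
            System.out.println(mode + " before close: " + ingestor.committedRows() + " rows committed");
            ingestor.close();
            System.out.println(mode + " after close: " + ingestor.committedRows()
                    + " rows committed in " + ingestor.commitCount() + " commit(s)");
        }
    }
}
```

With five rows and a batch size of two, the streaming variant commits three times in total, while the DataLoad variant commits once, at close. Fewer commits is the intuition behind the throughput benefit for large loads, although the real library's internals will differ from this sketch.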

Enabling the RSI DataLoad Mode

The Streaming mode is enabled by default with the Java library for Reactive Streams Ingestion. To enable the DataLoad mode, you must call the useDataLoadMode() method as shown below.

ReactiveStreamsIngestion.Builder rsiBuilder = ReactiveStreamsIngestion.builder()
    .useDataLoadMode()
    .username("<user_name>")
    .password("<password>")
    // The connect descriptor is concatenated because a Java string literal cannot span lines.
    .url("jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)"
        + "(HOST=myhost.com)(PORT=5521))(CONNECT_DATA=(SERVICE_NAME=myservice.com)))")
    .table("customers")
    .columns(new String[] { "id", "name", "region" });

// Use a try-with-resources statement to ensure that the RSI instance is closed
// at the end of the statement.
try (ReactiveStreamsIngestion rsi = rsiBuilder.build()) {
    // Publish records.
}
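The "publish records" step is where rows are handed over through a publisher and subscriber pair. As a rough, library-free sketch of that hand-off, the example below uses only the JDK's java.util.concurrent.Flow API and SubmissionPublisher. The CollectingSubscriber class is a hypothetical stand-in written for this illustration; in a real application the subscriber would come from the RSI instance (the RSI documentation describes rsi.subscriber(), which you should verify against your library version), and nothing here touches a database.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** JDK-only sketch of publishing rows to a Flow subscriber. Not RSI code. */
public class FlowPublishDemo {

    /** Hypothetical stand-in for the subscriber an ingestion library would provide. */
    static final class CollectingSubscriber implements Flow.Subscriber<Object[]> {
        final List<Object[]> rows = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first row
        }

        @Override
        public void onNext(Object[] row) {
            rows.add(row);
            subscription.request(1); // ask for the next row
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes every input row and returns what the subscriber received. */
    static List<Object[]> publish(List<Object[]> input) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<Object[]> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (Object[] row : input) {
                publisher.submit(row);
            }
        } // closing the publisher signals onComplete to the subscriber

        try {
            // Delivery is asynchronous, so wait until the subscriber has seen onComplete.
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for the subscriber");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the subscriber", e);
        }
        return subscriber.rows;
    }

    public static void main(String[] args) {
        List<Object[]> rows = publish(List.of(
                new Object[] { 1, "Alice", "EMEA" },
                new Object[] { 2, "Bob", "APAC" },
                new Object[] { 3, "Carol", "LATAM" }));
        System.out.println("Subscriber received " + rows.size() + " rows");
    }
}
```

The row shape (id, name, region) mirrors the columns configured on the builder above; the sample values are made up.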

To use it, you need versions of Oracle's JDBC, RSI, and UCP libraries that support the DataLoad mode, as shown below.

<!-- Oracle JDBC / RSI / UCP JARs -->
<dependency>
    <groupId>com.oracle.database.jdbc</groupId>
    <artifactId>rsi</artifactId>
    <version>23.3.0.23.09</version>
</dependency>

<dependency>
    <groupId>com.oracle.database.jdbc</groupId>
    <artifactId>ojdbc11</artifactId>
    <version>23.3.0.23.09</version>
</dependency>

<dependency>
    <groupId>com.oracle.database.jdbc</groupId>
    <artifactId>ucp11</artifactId>
    <version>23.3.0.23.09</version>
</dependency>

Again, a complete code sample is provided for your reference, so you can give it a try if you want!

Run the application

Now we can run the application. Recall that in the DataLoad mode, the changes are not committed until the RSI instance is closed.

To observe this behaviour, comment out the call to the rsi.close() method as shown below, and run the application without it.

try {
    firstPublisher.close();
    secondPublisher.close();
    //rsi.close();
} catch (Exception e) {
    e.printStackTrace();
} finally {
    workerThreadPool.shutdown();
}

With that call commented out, the execution never completes, and if you query the database you will see that no records have been inserted at all.

RSI DataLoad mode — No explicit call to rsi.close()

So an explicit call to rsi.close() is required to commit the changes, as shown below. Until the RSI instance is closed, a query against the table returns none of the new records; they are inserted only once the instance has been closed.

try {
    firstPublisher.close();
    secondPublisher.close();
    rsi.close();
} catch (Exception e) {
    e.printStackTrace();
} finally {
    workerThreadPool.shutdown();
}

Now, we can query the database again and see that the records got inserted as expected.

RSI DataLoad mode — after an explicit call to rsi.close()
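If you would rather not rely on remembering the explicit close calls, a try-with-resources statement can enforce the same order, because Java closes resources in the reverse order of their declaration. The sketch below demonstrates only that language rule with a hypothetical Named resource class; it does not use RSI, and it assumes your publisher and RSI types are AutoCloseable, as the earlier try-with-resources snippet suggests for the RSI instance.

```java
import java.util.ArrayList;
import java.util.List;

/** Demonstrates try-with-resources close order. Hypothetical stand-ins, not RSI code. */
public class CloseOrderDemo {

    /** Hypothetical resource that records when it is closed. */
    static final class Named implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Named(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Declare the ingestion instance first so that it is closed last,
        // after both publishers have been closed.
        try (Named rsi = new Named("rsi", log);
             Named firstPublisher = new Named("firstPublisher", log);
             Named secondPublisher = new Named("secondPublisher", log)) {
            log.add("publish records");
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints: [publish records, close secondPublisher, close firstPublisher, close rsi]
        System.out.println(run());
    }
}
```

Here the stand-in for the RSI instance is closed last, which in the DataLoad mode is the moment the changes would be committed. Shutting down the worker thread pool would still need its own finally block, as in the snippets above.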

Wrapping it up

This blog post explored the Java library for Reactive Streams Ingestion (RSI) in a scenario where the RSI DataLoad mode is enabled.

To recap, in the DataLoad mode the changes are committed only when the RSI instance is closed, and each worker thread has its own JDBC connection. In the default Streaming mode, by contrast, the changes are committed regularly and the worker threads share a pool of JDBC connections.

I hope you liked this blog post. Stay tuned!

