I have a Java application that connects to Oracle Advanced Queue (AQ) and listens for various event types. The Java server and the Oracle server are on different machines but within the same data center network. Despite a very low event generation rate, we frequently observe delays of around 10 seconds between enqueueing an event and receiving it in Java.
To measure this, we:
- Insert a log entry with a timestamp into a table just before pushing the event onto the AQ queue.
- Log another timestamp in our Java MessageListener as soon as the message arrives.
The difference between these two timestamps is about 10 seconds.
- Is it possible to enable loging in a drivers that we use in Java service? (see used dependencies below)
- Do we need to adjust our connection to Oracle AQ? (see connection implementation below)
- Is it possible that Oracle AQ holds events from time to time?
Dependencies
<dependencies>
<dependency>
<groupId>com.oracle.database.messaging</groupId>
<artifactId>aqapi-jakarta</artifactId>
<version>23.3.1.0</version>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
Connection Factory
@Component
public class QueueConn {
private static final Logger LOGGER = LoggerFactory.getLogger(QueueConn.class);
@Override
public QueueConnectionWrapper createQueueConnectionWrapper()
throws JMSException, SQLException {
Connection dbConn = getConnection();
try {
QueueConnection aqConn =
AQjmsQueueConnectionFactory.createQueueConnection(dbConn);
return new QueueConnectionWrapper(dbConn, aqConn);
} catch (JMSException | RuntimeException e) {
close(dbConn);
if (e instanceof JMSException) throw (JMSException) e;
throw new SQLException("Failed to create AQ queue connection", e);
}
}
@Override
public void close(Connection c) {
if (c != null) {
try {
c.close();
} catch (SQLException e) {
LOGGER.error("Failed to close DB connection", e);
}
}
}
@Override
public QueueSession createQueueSession(QueueConnection queueConnection)
throws JMSException {
return queueConnection.createQueueSession(
false, Session.CLIENT_ACKNOWLEDGE);
}
@Override
public MessageConsumer createMessageConsumer(
String queueName,
QueueSession queueSession,
String schemaName
) throws JMSException, SQLException {
Queue queue = ((AQjmsSession) queueSession)
.getQueue(schemaName, queueName);
return queueSession.createConsumer(queue);
}
private Connection getConnection() throws SQLException {
DatabaseConfig cfg = /* load config */;
PoolDataSource pds = PoolDataSourceFactory.getPoolDataSource();
pds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
pds.setURL(cfg.connectionString());
pds.setUser(cfg.username());
pds.setPassword(cfg.password());
pds.setValidateConnectionOnBorrow(cfg.validateConnectionOnBorrow());
pds.setInitialPoolSize(cfg.initialPoolSize());
pds.setMaxPoolSize(cfg.maxPoolSize());
pds.setMaxIdleTime(cfg.maxIdleTime());
pds.setSecondsToTrustIdleConnection(cfg.secondsToTrustIdleConnection());
return pds.getConnection();
}
}
Listener Implementation
public class Listener implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);
@Override
public void onMessage(Message message) {
// log timestamp and process message
}
}
The queue in oracle was created using
begin
dbms_aqadm.create_queue_table (
Queue_table => 'my_event_jms_map_table',
Queue_payload_type => 'SYS.AQ$_JMS_MAP_MESSAGE');
end;
/
begin
dbms_aqadm.create_queue(
Queue_name => 'my_event_jms_map_queue',
Queue_table => 'my_event_jms_map_table');
end;
We have no other special configuration around the queue. If there are specific parameters we should check please let us know as that could be a good clue.
While we do generally use a connection pool for the connection from Java to the DB, we take a connection from the pool at start-up and use that for the queue. We are not looping or polling, we are using the Oracle Java library they provide for connecting to advanced queues and reacting to the onMessage handler as shown above.
aqapi-jakarta, decopiled (using jdgui) it and started searching for things that would make sense for intervals, time, connection. In the AQjmsUcp.class, methodgetPoolthere is an entry:int waitTime = Integer.parseInt(System.getProperty("oracle.jakarta.jms.ucp.clean.wait", "10"));and looked for other entries with "10" and voila it seems it is a behaviour of the driver itself to prevent constant pooling to DB.... in AQjmsConstants.class there is:static final String DEFAULT_UCP_CLN_WAIT = "10";which seems client wait? So I would recommend as a test running your application with-Doracle.jakarta.jms.ucp.clean.wait=1and see what happens.static final String NETWORK_TIMEOUT_FACTOR_SYS_PROP = AQjmsUtil.getSystemProperty("oracle.jakarta.jms.networkTimeOutFactor");which down in the code it is also default to 10 so if the first one doesn't work try this one... Other than that I'm out of options :D