My Apache Beam application sends messages to a Solace queue. To send the message, I use the Apache Qpid JMS library and Java Messaging Service (JMS). However, when I launch my application, more than 2000 connections are created for two nodes. I have more than 2000 logs messages like the following when I start my Beam application:
Connection ID:0030887-f9c8-429f-b5d1-eda15330e262:1 connected to server: amqps://my.queue.address:54321
It bothers me as my number of connection to Solace is limited (400 connections)
Is there a way to reduce the number of JMS connections created in Apache Beam applications?
My code
I call the the writer in the following method:
@NotNull
@Override
public POutput expand(@NotNull PCollection<String> input) {
input.apply(
"WriteMessageToMyTopic",
JmsIOWriter.writeMessage());
return PDone.in(input.getPipeline());
}
JMS Writer
Here is the code of the writer
package com.example;
import com.example.StaticConnectionFactory;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.RetryConfiguration;
final class JmsIOWriter implements Serializable {
public static JmsIO.Write<String> writeMessage() {
var connectionFactory = StaticConnectionFactory.getFactory();
return JmsIO.<String>write()
.withRetryConfiguration(RetryConfiguration.create(5))
.withConnectionFactory(connectionFactory)
.withTopicNameMapper(m -> topicMapper(m, "my/topic/"))
.withValueMapper(JmsIOWriter::valueMapper);
}
private static Message valueMapper(String element, Session session) {
try {
return session.createTextMessage(element);
} catch (JMSException ex) {
throw new IllegalStateException(
String.format("Unable to send message in queue %s", element), ex);
}
}
private static String topicMapper(String message) {
return "my/topic/" + message.length;
}
}
Connection Factory
I use a static connection factory that I inject in the writer
package com.example;
import org.apache.qpid.jms.JmsConnectionFactory;
public final class StaticConnectionFactory {
private static JmsConnectionFactory factory;
public static synchronized JmsConnectionFactory getFactory() {
if (factory == null) {
factory = new SslJmsConnectionFactory();
factory.setUsername("myUsername");
factory.setPassword("myPassword");
factory.setRemoteURI("amqps://my.queue.address:54321");
factory.setForceAsyncAcks(true);
factory.setReceiveLocalOnly(true);
factory.setReceiveNoWaitLocalOnly(true);
}
return factory;
}
}
SSL connection factory
This static connection factory returns an SSL connection factory
package com.example;
import static com.example.SslContextFactory.createSslContext;
import java.util.Base64;
import javax.jms.Connection;
import javax.jms.JMSException;
public class SslJmsConnectionFactory extends org.apache.qpid.jms.JmsConnectionFactory {
@Override
public Connection createConnection(String username, String password) throws JMSException {
initSslContext();
return super.createConnection(username, password);
}
private void initSslContext() {
setSslContext(
createSslContext(
Base64.getDecoder().decode(this.getUsername()), this.getPassword().toCharArray()));
}
}
SSL context factory
This SSL connection factory use SSL context factory to initialize SSL context
package com.example;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
public final class SslContextFactory {
public static SSLContext createSslContext(byte[] credentials, char[] password) {
try (var inputStream = new ByteArrayInputStream(credentials)) {
var keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(inputStream, password);
return initSslContext(keyStore, password);
} catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
throw new IllegalStateException("Cannot create SSL context", e);
}
}
private static SSLContext initSslContext(KeyStore keyStore, char[] password) {
try {
var sslContext = SSLContext.getInstance("TLSv1.2");
var keyManagerFactory = createKeyManagerFactory(keyStore, password);
sslContext.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom());
return sslContext;
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new IllegalStateException("Unable to initialize SSL context", e);
}
}
private static KeyManagerFactory createKeyManagerFactory(KeyStore keyStore, char[] password) {
try {
var keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
keyManagerFactory.init(keyStore, password);
return keyManagerFactory;
} catch (NoSuchAlgorithmException | KeyStoreException | UnrecoverableKeyException e) {
throw new IllegalStateException("Unable to initialize a key manager factory", e);
}
}
}
What I tried so far
- Use static JMS connection factory to avoid recreating connections, no effect
- Setup SSL context when initializing JMS connection factory instead of initializing it when I create a connection. I was unable to connect to the queue due to
org.apache.qpid.jms.provider.exceptions.ProviderConnectionSecuritySaslException: Client failed to authenticate using SASL: PLAIN - I currently cannot use new Solace Writer available since Apache Beam 2.61.0 as it is labeled as experimental
JmsIOWriter.writeMessage()it recreates a new connection. You don't seem to be using Spring but I'll link this since it seems related.