0

My Apache Beam application sends messages to a Solace queue. To send the message, I use the Apache Qpid JMS library and Java Messaging Service (JMS). However, when I launch my application, more than 2000 connections are created for two nodes. I have more than 2000 logs messages like the following when I start my Beam application:

Connection ID:0030887-f9c8-429f-b5d1-eda15330e262:1 connected to server: amqps://my.queue.address:54321

It bothers me as my number of connection to Solace is limited (400 connections)

Is there a way to reduce the number of JMS connections created in Apache Beam applications?

My code

I call the the writer in the following method:

@NotNull
@Override
public POutput expand(@NotNull PCollection<String> input) {
  input.apply(
      "WriteMessageToMyTopic",
      JmsIOWriter.writeMessage());

  return PDone.in(input.getPipeline());
}

JMS Writer

Here is the code of the writer

package com.example;

import com.example.StaticConnectionFactory;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.RetryConfiguration;

final class JmsIOWriter implements Serializable {

  public static JmsIO.Write<String> writeMessage() {
    var connectionFactory = StaticConnectionFactory.getFactory();

    return JmsIO.<String>write()
        .withRetryConfiguration(RetryConfiguration.create(5))
        .withConnectionFactory(connectionFactory)
        .withTopicNameMapper(m -> topicMapper(m, "my/topic/"))
        .withValueMapper(JmsIOWriter::valueMapper);
  }

  private static Message valueMapper(String element, Session session) {
    try {
      return session.createTextMessage(element);
    } catch (JMSException ex) {
      throw new IllegalStateException(
          String.format("Unable to send message in queue %s", element), ex);
    }
  }

  private static String topicMapper(String message) {
    return "my/topic/" + message.length;
  }
}

Connection Factory

I use a static connection factory that I inject in the writer

package com.example;

import org.apache.qpid.jms.JmsConnectionFactory;

public final class StaticConnectionFactory {

  private static JmsConnectionFactory factory;

  public static synchronized JmsConnectionFactory getFactory() {
    if (factory == null) {
      factory = new SslJmsConnectionFactory();
      factory.setUsername("myUsername");
      factory.setPassword("myPassword");
      factory.setRemoteURI("amqps://my.queue.address:54321");
      factory.setForceAsyncAcks(true);
      factory.setReceiveLocalOnly(true);
      factory.setReceiveNoWaitLocalOnly(true);
    }
    return factory;
  }
}

SSL connection factory

This static connection factory returns an SSL connection factory

package com.example;

import static com.example.SslContextFactory.createSslContext;

import java.util.Base64;
import javax.jms.Connection;
import javax.jms.JMSException;

public class SslJmsConnectionFactory extends org.apache.qpid.jms.JmsConnectionFactory {

  @Override
  public Connection createConnection(String username, String password) throws JMSException {
    initSslContext();
    return super.createConnection(username, password);
  }

  private void initSslContext() {
    setSslContext(
        createSslContext(
            Base64.getDecoder().decode(this.getUsername()), this.getPassword().toCharArray()));
  }
}

SSL context factory

This SSL connection factory use SSL context factory to initialize SSL context

package com.example;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

public final class SslContextFactory {

  public static SSLContext createSslContext(byte[] credentials, char[] password) {

    try (var inputStream = new ByteArrayInputStream(credentials)) {
      var keyStore = KeyStore.getInstance("PKCS12");
      keyStore.load(inputStream, password);
      return initSslContext(keyStore, password);
    } catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
      throw new IllegalStateException("Cannot create SSL context", e);
    }
  }

  private static SSLContext initSslContext(KeyStore keyStore, char[] password) {
    try {
      var sslContext = SSLContext.getInstance("TLSv1.2");
      var keyManagerFactory = createKeyManagerFactory(keyStore, password);
      sslContext.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom());
      return sslContext;
    } catch (NoSuchAlgorithmException | KeyManagementException e) {
      throw new IllegalStateException("Unable to initialize SSL context", e);
    }
  }

  private static KeyManagerFactory createKeyManagerFactory(KeyStore keyStore, char[] password) {
    try {
      var keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
      keyManagerFactory.init(keyStore, password);
      return keyManagerFactory;
    } catch (NoSuchAlgorithmException | KeyStoreException | UnrecoverableKeyException e) {
      throw new IllegalStateException("Unable to initialize a key manager factory", e);
    }
  }
}

What I tried so far

  • Use static JMS connection factory to avoid recreating connections, no effect
  • Setup SSL context when initializing JMS connection factory instead of initializing it when I create a connection. I was unable to connect to the queue due to org.apache.qpid.jms.provider.exceptions.ProviderConnectionSecuritySaslException: Client failed to authenticate using SASL: PLAIN
  • I currently cannot use new Solace Writer available since Apache Beam 2.61.0 as it is labeled as experimental
4
  • 2
    Have you considered using a connection pool? Commented Dec 9, 2024 at 0:41
  • it certainly looks as if every time you try to JmsIOWriter.writeMessage() it recreates a new connection. You don't seem to be using Spring but I'll link this since it seems related. Commented Dec 9, 2024 at 2:23
  • 1
    qpid.apache.org/releases/qpid-jms-amqp-0-x-6.4.0/… This might be useful Commented Dec 9, 2024 at 6:55
  • That qpid link is from the old 0.9.x client that is EOL. You would want to use the pooled-jms library that Justin linked to Commented May 22 at 14:08

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.