I have a producer/consumer setup using a Wildfly AS for JMS. The producer uses a newFixedThreadPool(126); every minute, each thread pulls data down from a REST service and pushes it onto a HornetQ queue on the Wildfly AS.
On the consumer side I have a Consumer class that consumes the messages from the HornetQ queue, and a simple Parser class for DB insertion. I instantiate an object of type Parser in my onMessage() and pass the message to it. The message is JSON, and my Parser class loops through it, getting the values and inserting them into my DB.
Consumer:
public void Consume(Consumer asyncReceiver) throws Throwable {
    try {
        /** Get the initial context */
        final Properties props = new Properties();
        /** If debugging in IDE the properties are accessed this way */
        if (debug) {
            InputStream f = getClass().getClassLoader().getResourceAsStream("consumer.properties");
            props.load(f);
        }
        /** If running the .jar artifact the properties are accessed this way */
        else {
            File jarPath = new File(getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
            String propertiesPath = jarPath.getParentFile().getAbsolutePath();
            props.load(new FileInputStream(propertiesPath + File.separator + "consumer.properties"));
        }
        /** These few lines should be removed and set up in the properties file */
        props.put(Context.INITIAL_CONTEXT_FACTORY, props.getProperty("INITIAL_CONTEXT_FACTORY"));
        props.put(Context.PROVIDER_URL, props.getProperty("PROVIDER_URL"));
        props.put(Context.SECURITY_PRINCIPAL, props.getProperty("DEFAULT_USERNAME"));
        props.put(Context.SECURITY_CREDENTIALS, props.getProperty("DEFAULT_PASSWORD"));
        context = new InitialContext(props);
        /** Lookup the queue object */
        Queue queue = (Queue) context.lookup(props.getProperty("DEFAULT_DESTINATION"));
        /** Lookup the queue connection factory */
        ConnectionFactory connFactory = (ConnectionFactory) context.lookup(props.getProperty("DEFAULT_CONNECTION_FACTORY"));
        /** Create a queue connection */
        connection = connFactory.createConnection(props.getProperty("DEFAULT_USERNAME"), props.getProperty("DEFAULT_PASSWORD"));
        /** Create a queue session */
        queueSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        /** Create a queue consumer */
        msgConsumer = queueSession.createConsumer(queue);
        /** Set an asynchronous message listener */
        msgConsumer.setMessageListener(asyncReceiver);
        /** Set an asynchronous exception listener on the connection */
        connection.setExceptionListener(asyncReceiver);
        /** Start connection */
        connection.start();
        /** Wait for messages */
        System.out.println("waiting for messages");
        for (int i = 0; i < 47483647; i++) {
            Thread.sleep(1000);
            System.out.print(".");
        }
        System.out.println();
    } catch (Exception e) {
        log.severe(e.getMessage());
        throw e;
    } finally {
        if (context != null) {
            context.close();
        }
        if (queueSession != null) {
            queueSession.close();
        }
        if (msgConsumer != null) {
            msgConsumer.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

@Override
public void onMessage(Message message) {
    TextMessage msg = (TextMessage) message;
    try {
        Parser parser = new Parser();
        parser.parseApplication(msg.getText());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Parser:
public void parseApplication(String NRData) throws Exception {
    DBConnection db = DBConnection.createApplication();
    db.getConnection();
    JsonFactory factory = new JsonFactory();
    ObjectMapper mapper = new ObjectMapper(factory);
    JsonNode rootNode = mapper.readTree(NRData);
    Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
    while (fieldsIterator.hasNext()) {
        Map.Entry<String, JsonNode> field = fieldsIterator.next();
        String envName = field.getKey();
        JsonNode appValue = field.getValue();
        JSONArray jsonArray = new JSONArray(appValue.toString());
        String appName = jsonArray.getString(0);
        String appID = jsonArray.getString(1);
        JSONObject json = jsonArray.getJSONObject(2);
        JSONObject metricsData = json.getJSONObject("metric_data");
        JSONArray metrics = metricsData.getJSONArray("metrics");
        JSONObject array1 = metrics.getJSONObject(0);
        JSONArray timeslices = array1.getJSONArray("timeslices");
        for (int i = 0; i < timeslices.length(); i++) {
            JSONObject array2 = timeslices.getJSONObject(i);
            JSONObject values = array2.getJSONObject("values");
            // Instant from = array2.getString("from");
            Instant from = TimestampUtils.parseTimestamp(array2.get("from").toString(), null);
            Instant to = TimestampUtils.parseTimestamp(array2.get("to").toString(), null);
            Iterator<String> nameItr = values.keys();
            while (nameItr.hasNext()) {
                String name = nameItr.next();
                System.out.println(
                    "\nEnv name: " + envName +
                    "\nApp name: " + appName +
                    "\nApp ID: " + appID +
                    "\nRequest per minute: " + values.getDouble(name) +
                    "\nFrom: " + from + " To: " + to);
                ThroughputEntry TP = new ThroughputEntry();
                TP.setThroughput(values.getDouble(name));
                TP.setEnvironment(envName);
                TP.setName(appName);
                TP.setRetrieved(from);
                TP.setPeriodEnd(to);
                db.addHistory(TP);
            }
        }
    }
}
DBConnection:
public class DBConnection {
    private final String table;

    /**
     * Set the table name for applications
     */
    public static DBConnection createApplication() {
        return new DBConnection("APPLICATIONDATA");
    }

    public DBConnection(String table) {
        this.table = String.format("NRDATA.%s", table);
    }

    public Connection getConnection() throws IllegalAccessException,
            InstantiationException, ClassNotFoundException, SQLException {
        try {
            Class.forName("COM.ibm.db2os390.sqlj.jdbc.DB2SQLJDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        System.out.println("Connecting to database...");
        Connection connection = DriverManager.getConnection(DB2url, user, password);
        System.out.println("From DAO, connection obtained ");
        return connection;
    }

    public boolean addHistory(ThroughputEntry entry) throws Exception {
        try (Connection connection = getConnection()) {
            Statement statement = connection.createStatement();
            return 0 < statement
                    .executeUpdate(String
                            .format("INSERT INTO " + table
                                    + "(ID, RETRIEVED, PERIOD_END, ENVIRONMENT, APPNAME, THROUGHPUT)"
                                    + "VALUES ('%s', '%s', '%s', '%s', '%s', %s)",
                                    entry.getUUID(), entry.getRetrieved(),
                                    entry.getPeriodEnd(), entry.getEnvironment(),
                                    entry.getName(), entry.getThroughput()));
        }
    }
}
My problem is that I'm getting a memory leak on the Wildfly AS, and I think the problem could be in my consumer.
So, a few questions:
Should I be buffering the messages received in my onMessage() method on the consumer before inserting them into the DB?
If I'm getting too many messages, could this be causing the leak? Does the consumer send any sort of ack to the Wildfly AS?
I have the consumer running indefinitely with a loop. Maybe this is wrong; maybe it should sleep or wait.
I've spent two days trying to solve this now, so any help would be really appreciated.
You should close anything that needs to be closed. Looking at just the first few lines, I already see two streams that are never closed.
Review your code: anything that implements AutoCloseable needs to be properly closed. Use a try-with-resources statement to do that.
IDEs can already point you to possible resource leaks; in Eclipse, for example, tweak the warnings under Java Compiler > Errors/Warnings.
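As a minimal sketch of what that looks like for the properties loading in the question: the class name PropertiesLoader and its load method are hypothetical, made up for illustration, and the sketch assumes the file is read from a plain file system path. The stream is declared in the try header, so it is closed automatically whether props.load succeeds or throws.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical helper, not part of the original code. */
final class PropertiesLoader {

    /** Loads a properties file and always closes the underlying stream. */
    static Properties load(Path file) throws IOException {
        Properties props = new Properties();
        // The stream declared here is closed when the block exits,
        // on both the normal path and the exception path.
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        return props;
    }
}
```

The same shape applies to the getResourceAsStream branch: declare the InputStream in the try header instead of assigning it to a local variable that nobody closes.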
Edit: you edited your question, and now a clear leak is visible. In the parseApplication method you have the statement db.getConnection(). That call creates a connection that you never use and never close.
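Since addHistory already opens its own connection, the simplest fix is to delete that db.getConnection() line. Going one step further, here is a rough sketch of how the inserts for one message could share a single connection that is guaranteed to be closed. ThroughputWriter, ConnectionSource and insertAll are hypothetical names invented for this example, and the column list is shortened to three columns for brevity, so treat it as an outline under those assumptions rather than a drop-in replacement.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/** Hypothetical helper: one connection per message, closed when done. */
final class ThroughputWriter {

    /** Hypothetical factory, so the sketch is not tied to a driver or URL. */
    interface ConnectionSource {
        Connection open() throws SQLException;
    }

    // Shortened column list (assumption); the real table has more columns.
    private static final String INSERT_SQL =
            "INSERT INTO NRDATA.APPLICATIONDATA (ENVIRONMENT, APPNAME, THROUGHPUT) VALUES (?, ?, ?)";

    /** Inserts one row per throughput value and returns the number of rows inserted. */
    static int insertAll(ConnectionSource source, String environment, String appName,
                         List<Double> throughputs) throws SQLException {
        int inserted = 0;
        // Both resources are closed in reverse order when the block exits,
        // even if an insert throws.
        try (Connection connection = source.open();
             PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) {
            for (double throughput : throughputs) {
                // Bound parameters replace the String.format-built SQL.
                statement.setString(1, environment);
                statement.setString(2, appName);
                statement.setDouble(3, throughput);
                inserted += statement.executeUpdate();
            }
        }
        return inserted;
    }
}
```

With the real driver, the source could be something like () -> DriverManager.getConnection(DB2url, user, password), reusing the values from your DBConnection class.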