Browse Source

Fix problem with unclosed stream

Frédéric Praca 10 months ago
parent
commit
0f7edd3a26

+ 38 - 39
nifi-xmpp-processors/src/main/java/fr/chickenkiller/nifi/processors/xmpp/PublishXMPP.java

@@ -16,7 +16,6 @@
  */
  */
 package fr.chickenkiller.nifi.processors.xmpp;
 package fr.chickenkiller.nifi.processors.xmpp;
 
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
 import java.util.Collections;
 import java.util.Collections;
 
 
@@ -33,7 +32,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.jivesoftware.smack.SmackException.NotConnectedException;
 import org.jivesoftware.smack.SmackException.NotConnectedException;
 import org.jivesoftware.smack.chat2.Chat;
 import org.jivesoftware.smack.chat2.Chat;
@@ -141,42 +139,43 @@ public class PublishXMPP extends AbstractXMPPProcessor {
 		builder.setLanguage(language);
 		builder.setLanguage(language);
 
 
 		getLogger().debug("Reading flowfile");
 		getLogger().debug("Reading flowfile");
-		session.read(flowFile, new InputStreamCallback() {
-
-			@Override
-			public void process(InputStream in) throws IOException {
-				try {
-					builder.addBody(language, new String(in.readAllBytes()));
-					getLogger().debug("Subject set to \"{}\" with language {}, ready to send", builder.getSubject(),
-							builder.getLanguage());
-					chat.send(builder.build());
-					final long sent = System.nanoTime();
-					session.getProvenanceReporter().send(flowFile, "xmpp://" + context.getProperty(SERVER),
-							"Sent XMPP message to " + recipientJid.asEntityBareJidString(), sent);
-					getLogger().info("Successfully send {} to {}",
-							new Object[] { flowFile, recipientJid.asEntityBareJidString() });
-					session.transfer(flowFile, SUCCESS);
-					getLogger().debug("Flowfile {} sent to success", flowFile);
-				} catch (NotConnectedException e) {
-					getLogger().error("Message to {} impossible to send, not connected: {}",
-							recipientJid.asEntityBareJidString(), e.getLocalizedMessage());
-					session.penalize(flowFile);
-					session.transfer(flowFile, FAILURE);
-					session.rollback();
-				} catch (InterruptedException e) {
-					getLogger().error("Message to {} impossible to send, interrupted: {}",
-							recipientJid.asEntityBareJidString(), e.getLocalizedMessage());
-					session.penalize(flowFile);
-					session.transfer(flowFile, FAILURE);
-					session.rollback();
-				} catch (Exception e) {
-					getLogger().error("Got an exception", e);
-					session.rollback();
-				} finally {
-					getLogger().debug("Closing the input stream");
-					in.close();
-				}
-			}
-		});
+
+		final String messageText;
+		final byte[] buffer = new byte[(int) flowFile.getSize()];
+		try (final InputStream in = session.read(flowFile)) {
+			// Reading the content
+			in.read(buffer);
+			in.close();
+
+			messageText = new String(buffer);
+
+			builder.addBody(language, messageText);
+
+			getLogger().debug("Subject set to \"{}\" with language {}, ready to send", builder.getSubject(),
+					builder.getLanguage());
+			chat.send(builder.build());
+			final long sent = System.nanoTime();
+			session.getProvenanceReporter().send(flowFile, "xmpp://" + context.getProperty(SERVER),
+					"Sent XMPP message to " + recipientJid.asEntityBareJidString(), sent);
+			getLogger().info("Successfully send {} to {}",
+					new Object[] { flowFile, recipientJid.asEntityBareJidString() });
+			session.transfer(flowFile, SUCCESS);
+			getLogger().debug("Flowfile {} sent to success", flowFile);
+		} catch (NotConnectedException e) {
+			getLogger().error("Message to {} impossible to send, not connected: {}",
+					recipientJid.asEntityBareJidString(), e.getLocalizedMessage());
+			session.penalize(flowFile);
+			session.transfer(flowFile, FAILURE);
+			session.rollback();
+		} catch (InterruptedException e) {
+			getLogger().error("Message to {} impossible to send, interrupted: {}", recipientJid.asEntityBareJidString(),
+					e.getLocalizedMessage());
+			session.penalize(flowFile);
+			session.transfer(flowFile, FAILURE);
+			session.rollback();
+		} catch (Exception e) {
+			getLogger().error("Got an exception", e);
+			session.rollback();
+		}
 	}
 	}
 }
 }