5 Commits 26610cb6fe ... 0f7edd3a26

Autor SHA1 Mensagem Data
  Frédéric Praca 0f7edd3a26 Fix problem with unclosed stream há 10 meses atrás
  Frédéric Praca 7c52f4b69d Fix style há 10 meses atrás
  Frédéric Praca f20b1597ed Add publishing processor há 10 meses atrás
  Frédéric Praca c0df606687 Add Lombok and Spotless + upgrade to Nifi v2.0 há 10 meses atrás
  Frédéric Praca 0864352718 Add almost functional version for publishing há 10 meses atrás

+ 2 - 2
nifi-xmpp-nar/pom.xml

@@ -19,7 +19,7 @@
     <parent>
         <groupId>fr.chickenkiller.nifi</groupId>
         <artifactId>nifi-xmpp-processor</artifactId>
-        <version>1.0-SNAPSHOT</version>
+        <version>1.0</version>
     </parent>
 
     <artifactId>nifi-xmpp-nar</artifactId>
@@ -29,7 +29,7 @@
         <dependency>
             <groupId>fr.chickenkiller.nifi</groupId>
             <artifactId>nifi-xmpp-processors</artifactId>
-            <version>1.0-SNAPSHOT</version>
+            <version>1.0</version>
         </dependency>
     </dependencies>
 

+ 60 - 32
nifi-xmpp-processors/pom.xml

@@ -21,7 +21,7 @@
 	<parent>
 		<groupId>fr.chickenkiller.nifi</groupId>
 		<artifactId>nifi-xmpp-processor</artifactId>
-		<version>1.0-SNAPSHOT</version>
+		<version>1.0</version>
 	</parent>
 
 	<artifactId>nifi-xmpp-processors</artifactId>
@@ -29,38 +29,11 @@
 
 	<properties>
 		<smack.version>4.4.8</smack.version>
+		<spotless.version>2.43.0</spotless.version>
+		<lombok.version>1.18.32</lombok.version>
 	</properties>
 
 	<dependencies>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>nifi-api</artifactId>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>nifi-utils</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>nifi-mock</artifactId>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-simple</artifactId>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.junit.jupiter</groupId>
-			<artifactId>junit-jupiter-api</artifactId>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.junit.jupiter</groupId>
-			<artifactId>junit-jupiter-engine</artifactId>
-			<scope>test</scope>
-		</dependency>
 		<dependency>
 			<groupId>org.igniterealtime.smack</groupId>
 			<artifactId>smack-tcp</artifactId>
@@ -68,13 +41,68 @@
 		</dependency>
 		<dependency>
 			<groupId>org.igniterealtime.smack</groupId>
-			<artifactId>smack-tcp</artifactId>
+			<artifactId>smack-java8</artifactId>
 			<version>${smack.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>org.igniterealtime.smack</groupId>
-			<artifactId>smack-java8</artifactId>
+			<artifactId>smack-extensions</artifactId>
 			<version>${smack.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>com.diffplug.spotless</groupId>
+			<artifactId>spotless-maven-plugin</artifactId>
+			<version>${spotless.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.projectlombok</groupId>
+			<artifactId>lombok</artifactId>
+			<version>${lombok.version}</version>
+			<scope>provided</scope>
+		</dependency>
 	</dependencies>
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<annotationProcessorPaths>
+						<path>
+							<groupId>org.projectlombok</groupId>
+							<artifactId>lombok</artifactId>
+							<version>${lombok.version}</version>
+						</path>
+					</annotationProcessorPaths>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>com.diffplug.spotless</groupId>
+				<artifactId>spotless-maven-plugin</artifactId>
+				<version>${spotless.version}</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<!-- define a language-specific format -->
+					<java>
+						<importOrder />
+						<removeUnusedImports />
+
+						<palantirJavaFormat>
+							<version>2.39.0</version>
+							<style>PALANTIR</style>
+							<formatJavadoc>true</formatJavadoc>
+						</palantirJavaFormat>
+
+						<formatAnnotations />
+					</java>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
 </project>

+ 108 - 0
nifi-xmpp-processors/src/main/java/fr/chickenkiller/nifi/processors/xmpp/AbstractXMPPProcessor.java

@@ -0,0 +1,108 @@
+package fr.chickenkiller.nifi.processors.xmpp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.jivesoftware.smack.AbstractXMPPConnection;
+import org.jivesoftware.smack.SmackException;
+import org.jivesoftware.smack.XMPPException;
+import org.jivesoftware.smack.chat2.ChatManager;
+import org.jivesoftware.smack.tcp.XMPPTCPConnection;
+import org.jivesoftware.smack.tcp.XMPPTCPConnectionConfiguration;
+import org.jxmpp.stringprep.XmppStringprepException;
+
+public abstract class AbstractXMPPProcessor extends AbstractProcessor {
+
+	public static final PropertyDescriptor SERVER = new PropertyDescriptor.Builder().name("XMPP Server")
+			.description("Server hostname or IP to connect to").required(true)
+			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+	public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("PORT")
+			.displayName("XMPP Server port").description("Port used by the XMPP server").defaultValue("5222")
+			.addValidator(StandardValidators.PORT_VALIDATOR).build();
+
+	public static final PropertyDescriptor JID = new PropertyDescriptor.Builder().name("JID")
+			.description("User JID used by this processor to receive/send messages").required(true)
+			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+	public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("JID Password")
+			.description("JID password for the user JID set").required(true)
+			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
+
+	protected List<PropertyDescriptor> descriptors;
+
+	protected Set<Relationship> relationships;
+
+	protected AbstractXMPPConnection connection = null;
+	protected ChatManager chatMgr = null;
+
+	protected void init(final ProcessorInitializationContext context) {
+		descriptors = new ArrayList<>();
+		descriptors.add(SERVER);
+		descriptors.add(PORT);
+		descriptors.add(JID);
+		descriptors.add(PASSWORD);
+
+		relationships = new HashSet<>();
+	}
+
+	@Override
+	public Set<Relationship> getRelationships() {
+		return this.relationships;
+	}
+
+	@Override
+	public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+		return descriptors;
+	}
+
+	@OnStopped
+	public void onStopped() {
+		if (connection != null) {
+			connection.disconnect();
+		}
+	}
+
+	protected void prepareCommunication(final ProcessContext context) {
+		getLogger().debug("Preapring connection for {} on server {}", context.getProperty(JID).getValue(),
+				context.getProperty(SERVER).getValue());
+		XMPPTCPConnectionConfiguration config;
+		try {
+			config = XMPPTCPConnectionConfiguration.builder()
+					.setXmppAddressAndPassword(context.getProperty(JID).getValue(),
+							context.getProperty(PASSWORD).getValue())
+					.setHost(context.getProperty(SERVER).getValue()).build();
+
+			connection = new XMPPTCPConnection(config);
+			connection.connect();
+			connection.login(); // Logs in
+		} catch (XmppStringprepException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (SmackException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (XMPPException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} // Establishes a connection to the server
+
+		chatMgr = ChatManager.getInstanceFor(connection);
+	}
+}

+ 78 - 49
nifi-xmpp-processors/src/main/java/fr/chickenkiller/nifi/processors/xmpp/ConsumeXMPP.java

@@ -16,49 +16,48 @@
  */
 package fr.chickenkiller.nifi.processors.xmpp;
 
-import java.util.ArrayList;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.StandardValidators;
+import org.jivesoftware.smack.chat2.Chat;
+import org.jivesoftware.smack.chat2.IncomingChatMessageListener;
+import org.jivesoftware.smack.packet.Message;
+import org.jivesoftware.smack.packet.Message.Body;
+import org.jxmpp.jid.EntityBareJid;
 
-@Tags({ "xmpp", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "read" })
+import fr.chickenkiller.nifi.processors.xmpp.domain.IncomingMessage;
+
+@Tags({ "xmpp", "conversation", "chat.getMessage", "social media", "team", "text", "unstructured", "read" })
 @CapabilityDescription("Consumes messages from a XMPP server")
-@SeeAlso({})
+@TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @WritesAttributes({ @WritesAttribute(attribute = "xmpp.sender.jid", description = "Message Sender JID"),
-		@WritesAttribute(attribute = "xmpp.sending.date", description = "Sending date of the message") })
-public class ConsumeXMPP extends AbstractProcessor {
-
-	public static final PropertyDescriptor SERVER = new PropertyDescriptor.Builder().name("XMPP Server")
-			.description("jabber.org").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-	public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("PORT")
-			.displayName("XMPP Server port").description("5222").defaultValue("5222")
-			.addValidator(StandardValidators.PORT_VALIDATOR).build();
-
-	public static final PropertyDescriptor JID = new PropertyDescriptor.Builder().name("Receiving JID").required(true)
-			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-	public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("JID Password")
-			.required(true).sensitive(true).build();
+		@WritesAttribute(attribute = "xmpp.received.date", description = "Date when message was received") })
+@SeeAlso(PublishXMPP.class)
+public class ConsumeXMPP extends AbstractXMPPProcessor {
 
 	public static final Relationship SUCCESS = new Relationship.Builder().name("success")
 			.description("Message received and successfully parsed").build();
@@ -66,46 +65,76 @@ public class ConsumeXMPP extends AbstractProcessor {
 	public static final Relationship FAILURE = new Relationship.Builder().name("failure")
 			.description("Problem when receiving message").build();
 
-	private List<PropertyDescriptor> descriptors;
+	private Queue<IncomingMessage> messages = new ConcurrentLinkedDeque<>();
 
-	private Set<Relationship> relationships;
+	public ConsumeXMPP() {
+		super();
+	}
 
 	@Override
 	protected void init(final ProcessorInitializationContext context) {
-		descriptors = new ArrayList<>();
-		descriptors.add(SERVER);
-		descriptors.add(PORT);
-		descriptors.add(JID);
-		descriptors.add(PASSWORD);
-		descriptors = Collections.unmodifiableList(descriptors);
-
-		relationships = new HashSet<>();
+		super.init(context);
 		relationships.add(SUCCESS);
 		relationships.add(FAILURE);
 		relationships = Collections.unmodifiableSet(relationships);
 	}
 
-	@Override
-	public Set<Relationship> getRelationships() {
-		return this.relationships;
-	}
-
-	@Override
-	public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-		return descriptors;
-	}
-
 	@OnScheduled
 	public void onScheduled(final ProcessContext context) {
-
+		prepareCommunication(context);
+		chatMgr.addIncomingListener(new IncomingChatMessageListener() {
+			@Override
+			public void newIncomingMessage(EntityBareJid from, Message message, Chat chat) {
+				messages.add(new IncomingMessage(from, message));
+			}
+		});
 	}
 
 	@Override
 	public void onTrigger(final ProcessContext context, final ProcessSession session) {
-		FlowFile flowFile = session.get();
-		if (flowFile == null) {
-			return;
+
+		final long start = System.nanoTime();
+
+		if (messages.isEmpty()) {
+			context.yield();
+		} else {
+			while (!messages.isEmpty()) {
+				IncomingMessage m = messages.poll();
+
+				getLogger().info("Received {} messages from {}",
+						new Object[] { m.getMessage().getBodies().size(), m.getFrom().asEntityBareJidString() });
+
+				for (Body body : m.getMessage().getBodies()) {
+					FlowFile flowFile = session.create();
+
+					try (final OutputStream out = session.write(flowFile)) {
+
+						out.write(body.getMessage().getBytes());
+						out.close();
+						
+						final Map<String, String> attributes = new HashMap<>();
+						attributes.put("xmpp.sender.jid", m.getFrom().asEntityBareJidString());
+						attributes.put("xmpp.sending.date",
+								DateTimeFormatter.ISO_DATE_TIME.format(ZonedDateTime.now()));
+						attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+						
+						flowFile = session.putAllAttributes(flowFile, attributes);
+
+						final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+						session.getProvenanceReporter().receive(flowFile, "xmpp://" + context.getProperty(SERVER),
+								"Received XMPP message from " + m.getFrom().asEntityBareJidString(), millis);
+						getLogger().info("Successfully received {} from {}",
+								new Object[] { flowFile, m.getFrom().asEntityBareJidString() });
+						session.transfer(flowFile, SUCCESS);
+
+						session.commit();
+					} catch (final IOException e) {
+						getLogger().error("Failed to write out the message received", e);
+						session.remove(flowFile);
+						session.rollback();
+					}
+				}
+			}
 		}
-		session.transfer(flowFile, SUCCESS);
 	}
 }

+ 181 - 0
nifi-xmpp-processors/src/main/java/fr/chickenkiller/nifi/processors/xmpp/PublishXMPP.java

@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package fr.chickenkiller.nifi.processors.xmpp;
+
+import java.io.InputStream;
+import java.util.Collections;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.jivesoftware.smack.SmackException.NotConnectedException;
+import org.jivesoftware.smack.chat2.Chat;
+import org.jivesoftware.smack.packet.MessageBuilder;
+import org.jivesoftware.smack.packet.StanzaBuilder;
+import org.jxmpp.jid.EntityBareJid;
+import org.jxmpp.jid.impl.JidCreate;
+import org.jxmpp.stringprep.XmppStringprepException;
+
+@Tags({ "xmpp", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "read" })
+@CapabilityDescription("Send messages to a XMPP server")
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@ReadsAttributes({ @ReadsAttribute(attribute = "xmpp.recipient.jid", description = "Message recipient JID"), })
+public class PublishXMPP extends AbstractXMPPProcessor {
+
+	/** Default subject value */
+	public static String DEFAULT_SUBJECT = "Nifi message";
+
+	/** Default language */
+	public static String DEFAULT_LANGUAGE = "en";
+
+	public static final PropertyDescriptor RECIPIENT = new PropertyDescriptor.Builder().name("Recipient JID")
+			.description("User JID used by this processor to send messages to").required(true)
+			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+	public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder().name("Message subject")
+			.description("Message subject that will be shown").required(false).defaultValue(PublishXMPP.DEFAULT_SUBJECT)
+			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+	public static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder().name("Language used for message")
+			.description("Language used in the message").required(true).defaultValue(PublishXMPP.DEFAULT_LANGUAGE)
+			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+	public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("Message sent")
+			.build();
+
+	public static final Relationship FAILURE = new Relationship.Builder().name("failure")
+			.description("Problem when sending message").build();
+
+	/** Recipient JID */
+	private EntityBareJid recipientJid;
+
+	public PublishXMPP() {
+		super();
+	}
+
+	@Override
+	protected void init(final ProcessorInitializationContext context) {
+		super.init(context);
+		descriptors.add(RECIPIENT);
+		descriptors = Collections.unmodifiableList(descriptors);
+
+		relationships.add(SUCCESS);
+		relationships.add(FAILURE);
+		relationships = Collections.unmodifiableSet(relationships);
+	}
+
+	@OnScheduled
+	public void onScheduled(final ProcessContext context) {
+		prepareCommunication(context);
+		// Build the recipient ID
+		try {
+			recipientJid = JidCreate.entityBareFrom(context.getProperty(RECIPIENT).toString());
+		} catch (XmppStringprepException e) {
+			getLogger().error("Impossible to build recepient Jid from {} property with value {}",
+					new Object[] { RECIPIENT.getName(), context.getProperty(RECIPIENT).toString() });
+		}
+	}
+
+	@Override
+	public void onTrigger(final ProcessContext context, final ProcessSession session) {
+
+		FlowFile flowFile = session.get();
+
+		if (flowFile == null) {
+			context.yield();
+			return;
+		}
+
+		Chat chat = chatMgr.chatWith(recipientJid);
+		getLogger().info("Ready to send to " + recipientJid.asEntityBareJidString());
+
+		MessageBuilder builder = StanzaBuilder.buildMessage();
+		getLogger().debug("MessageBuilder created");
+
+		getLogger().debug("Adding Subject");
+		String subject = context.getProperty(SUBJECT).getValue() != null ? context.getProperty(SUBJECT).getValue()
+				: DEFAULT_SUBJECT;
+		String language = context.getProperty(LANGUAGE).getValue() != null ? context.getProperty(LANGUAGE).getValue()
+				: DEFAULT_LANGUAGE;
+		builder.addSubject(language, subject);
+		builder.setSubject(subject);
+
+		// session.read(flowfile, new InputStreamCallback() {
+		//
+		// @Override
+		// public void process(InputStream in) throws IOException {
+		// ByteBuffer byteBufer = ByteBuffer.wrap(in.readAllBytes());
+		// CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBufer);
+		// builder.addBody("", charBuffer.toString());
+		// }
+		// });
+
+		getLogger().debug("Setting language");
+		builder.setLanguage(language);
+
+		getLogger().debug("Reading flowfile");
+
+		final String messageText;
+		final byte[] buffer = new byte[(int) flowFile.getSize()];
+		try (final InputStream in = session.read(flowFile)) {
+			// Reading the content
+			in.read(buffer);
+			in.close();
+
+			messageText = new String(buffer);
+
+			builder.addBody(language, messageText);
+
+			getLogger().debug("Subject set to \"{}\" with language {}, ready to send", builder.getSubject(),
+					builder.getLanguage());
+			chat.send(builder.build());
+			final long sent = System.nanoTime();
+			session.getProvenanceReporter().send(flowFile, "xmpp://" + context.getProperty(SERVER),
+					"Sent XMPP message to " + recipientJid.asEntityBareJidString(), sent);
+			getLogger().info("Successfully send {} to {}",
+					new Object[] { flowFile, recipientJid.asEntityBareJidString() });
+			session.transfer(flowFile, SUCCESS);
+			getLogger().debug("Flowfile {} sent to success", flowFile);
+		} catch (NotConnectedException e) {
+			getLogger().error("Message to {} impossible to send, not connected: {}",
+					recipientJid.asEntityBareJidString(), e.getLocalizedMessage());
+			session.penalize(flowFile);
+			session.transfer(flowFile, FAILURE);
+			session.rollback();
+		} catch (InterruptedException e) {
+			getLogger().error("Message to {} impossible to send, interrupted: {}", recipientJid.asEntityBareJidString(),
+					e.getLocalizedMessage());
+			session.penalize(flowFile);
+			session.transfer(flowFile, FAILURE);
+			session.rollback();
+		} catch (Exception e) {
+			getLogger().error("Got an exception", e);
+			session.rollback();
+		}
+	}
+}

+ 29 - 0
nifi-xmpp-processors/src/main/java/fr/chickenkiller/nifi/processors/xmpp/domain/IncomingMessage.java

@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package fr.chickenkiller.nifi.processors.xmpp.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.jivesoftware.smack.packet.Message;
+import org.jxmpp.jid.EntityBareJid;
+
+@Data
+@AllArgsConstructor
+public class IncomingMessage {
+    private EntityBareJid from;
+    private Message message;
+}

+ 2 - 1
nifi-xmpp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor

@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-fr.chickenkiller.nifi.processors.xmpp.ConsumeXMPP
+fr.chickenkiller.nifi.processors.xmpp.ConsumeXMPP
+fr.chickenkiller.nifi.processors.xmpp.PublishXMPP

+ 1 - 4
nifi-xmpp-processors/src/test/java/fr/chickenkiller/nifi/processors/xmpp/MyProcessorTest.java

@@ -31,8 +31,5 @@ public class MyProcessorTest {
     }
 
     @Test
-    public void testProcessor() {
-
-    }
-
+    public void testProcessor() {}
 }

+ 3 - 3
pom.xml

@@ -18,13 +18,13 @@
 
     <parent>
         <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-nar-bundles</artifactId>
-        <version>1.25.0</version>
+	<artifactId>nifi-standard-shared-bom</artifactId>
+	<version>2.0.0</version>
     </parent>
 
     <groupId>fr.chickenkiller.nifi</groupId>
     <artifactId>nifi-xmpp-processor</artifactId>
-    <version>1.0-SNAPSHOT</version>
+    <version>1.0</version>
     <packaging>pom</packaging>
 
     <modules>