3 Revize f748b67613 ... a21538dfed

Autor SHA1 Zpráva Datum
  Frédéric Praca a21538dfed feat(style): apply Spotless před 1 měsícem
  Frédéric Praca 927e850536 feat(style): add Spotless for pom and apply před 1 měsícem
  Frédéric Praca 55e9c249a3 feat(deps): upgrade dependencies před 1 měsícem

+ 91 - 86
nifi-xmpp-processors/pom.xml

@@ -13,96 +13,101 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
 
-	<parent>
-		<groupId>fr.chickenkiller.nifi</groupId>
-		<artifactId>nifi-xmpp-processor</artifactId>
-		<version>1.0</version>
-	</parent>
+  <parent>
+    <groupId>fr.chickenkiller.nifi</groupId>
+    <artifactId>nifi-xmpp-processor</artifactId>
+    <version>1.0</version>
+  </parent>
 
-	<artifactId>nifi-xmpp-processors</artifactId>
-	<packaging>jar</packaging>
+  <artifactId>nifi-xmpp-processors</artifactId>
+  <packaging>jar</packaging>
 
-	<properties>
-		<smack.version>4.4.8</smack.version>
-		<spotless.version>2.43.0</spotless.version>
-		<lombok.version>1.18.32</lombok.version>
-	</properties>
+  <properties>
+    <smack.version>4.4.8</smack.version>
+    <spotless.version>2.45.0</spotless.version>
+    <lombok.version>1.18.38</lombok.version>
+  </properties>
 
-	<dependencies>
-		<dependency>
-			<groupId>org.igniterealtime.smack</groupId>
-			<artifactId>smack-tcp</artifactId>
-			<version>${smack.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.igniterealtime.smack</groupId>
-			<artifactId>smack-java8</artifactId>
-			<version>${smack.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.igniterealtime.smack</groupId>
-			<artifactId>smack-extensions</artifactId>
-			<version>${smack.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>com.diffplug.spotless</groupId>
-			<artifactId>spotless-maven-plugin</artifactId>
-			<version>${spotless.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.projectlombok</groupId>
-			<artifactId>lombok</artifactId>
-			<version>${lombok.version}</version>
-			<scope>provided</scope>
-		</dependency>
-	</dependencies>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<annotationProcessorPaths>
-						<path>
-							<groupId>org.projectlombok</groupId>
-							<artifactId>lombok</artifactId>
-							<version>${lombok.version}</version>
-						</path>
-					</annotationProcessorPaths>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>com.diffplug.spotless</groupId>
-				<artifactId>spotless-maven-plugin</artifactId>
-				<version>${spotless.version}</version>
-				<executions>
-					<execution>
-						<goals>
-							<goal>check</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<!-- define a language-specific format -->
-					<java>
-						<importOrder />
-						<removeUnusedImports />
+  <dependencies>
+    <dependency>
+      <groupId>org.igniterealtime.smack</groupId>
+      <artifactId>smack-tcp</artifactId>
+      <version>${smack.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.igniterealtime.smack</groupId>
+      <artifactId>smack-java8</artifactId>
+      <version>${smack.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.igniterealtime.smack</groupId>
+      <artifactId>smack-extensions</artifactId>
+      <version>${smack.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.diffplug.spotless</groupId>
+      <artifactId>spotless-maven-plugin</artifactId>
+      <version>${spotless.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>${lombok.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <annotationProcessorPaths>
+            <path>
+              <groupId>org.projectlombok</groupId>
+              <artifactId>lombok</artifactId>
+              <version>${lombok.version}</version>
+            </path>
+          </annotationProcessorPaths>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>com.diffplug.spotless</groupId>
+        <artifactId>spotless-maven-plugin</artifactId>
+        <version>${spotless.version}</version>
+        <configuration>
+          <!-- define a language-specific format -->
+          <java>
+            <importOrder/>
+            <removeUnusedImports/>
 
-						<palantirJavaFormat>
-							<version>2.39.0</version>
-							<style>PALANTIR</style>
-							<formatJavadoc>true</formatJavadoc>
-						</palantirJavaFormat>
+            <palantirJavaFormat>
+              <version>2.39.0</version>
+              <style>PALANTIR</style>
+              <formatJavadoc>true</formatJavadoc>
+            </palantirJavaFormat>
 
-						<formatAnnotations />
-					</java>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
+            <formatAnnotations/>
+          </java>
+          <pom>
+            <!-- These are the defaults, you can override if you want -->
+            <includes>
+              <include>pom.xml</include>
+            </includes>
+            <sortPom/>
+          </pom>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>

+ 117 - 83
nifi-xmpp-processors/src/main/java/fr/chickenkiller/nifi/processors/xmpp/AbstractXMPPProcessor.java

@@ -5,7 +5,6 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -19,90 +18,125 @@ import org.jivesoftware.smack.XMPPException;
 import org.jivesoftware.smack.chat2.ChatManager;
 import org.jivesoftware.smack.tcp.XMPPTCPConnection;
 import org.jivesoftware.smack.tcp.XMPPTCPConnectionConfiguration;
+import org.jivesoftware.smackx.muc.AutoJoinSuccessCallback;
+import org.jivesoftware.smackx.muc.MultiUserChat;
+import org.jivesoftware.smackx.muc.MultiUserChatManager;
+import org.jxmpp.jid.parts.Resourcepart;
 import org.jxmpp.stringprep.XmppStringprepException;
 
 public abstract class AbstractXMPPProcessor extends AbstractProcessor {
 
-	public static final PropertyDescriptor SERVER = new PropertyDescriptor.Builder().name("XMPP Server")
-			.description("Server hostname or IP to connect to").required(true)
-			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-	public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("PORT")
-			.displayName("XMPP Server port").description("Port used by the XMPP server").defaultValue("5222")
-			.addValidator(StandardValidators.PORT_VALIDATOR).build();
-
-	public static final PropertyDescriptor JID = new PropertyDescriptor.Builder().name("JID")
-			.description("User JID used by this processor to receive/send messages").required(true)
-			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-	public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("JID Password")
-			.description("JID password for the user JID set").required(true)
-			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
-
-	protected List<PropertyDescriptor> descriptors;
-
-	protected Set<Relationship> relationships;
-
-	protected AbstractXMPPConnection connection = null;
-	protected ChatManager chatMgr = null;
-
-	protected void init(final ProcessorInitializationContext context) {
-		descriptors = new ArrayList<>();
-		descriptors.add(SERVER);
-		descriptors.add(PORT);
-		descriptors.add(JID);
-		descriptors.add(PASSWORD);
-
-		relationships = new HashSet<>();
-	}
-
-	@Override
-	public Set<Relationship> getRelationships() {
-		return this.relationships;
-	}
-
-	@Override
-	public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-		return descriptors;
-	}
-
-	@OnStopped
-	public void onStopped() {
-		if (connection != null) {
-			connection.disconnect();
-		}
-	}
-
-	protected void prepareCommunication(final ProcessContext context) {
-		getLogger().debug("Preapring connection for {} on server {}", context.getProperty(JID).getValue(),
-				context.getProperty(SERVER).getValue());
-		XMPPTCPConnectionConfiguration config;
-		try {
-			config = XMPPTCPConnectionConfiguration.builder()
-					.setXmppAddressAndPassword(context.getProperty(JID).getValue(),
-							context.getProperty(PASSWORD).getValue())
-					.setHost(context.getProperty(SERVER).getValue()).build();
-
-			connection = new XMPPTCPConnection(config);
-			connection.connect();
-			connection.login(); // Logs in
-		} catch (XmppStringprepException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		} catch (SmackException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		} catch (IOException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		} catch (XMPPException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		} catch (InterruptedException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		} // Establishes a connection to the server
-
-		chatMgr = ChatManager.getInstanceFor(connection);
-	}
+    public static final PropertyDescriptor SERVER = new PropertyDescriptor.Builder()
+            .name("XMPP Server")
+            .description("Server hostname or IP to connect to")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("PORT")
+            .displayName("XMPP Server port")
+            .description("Port used by the XMPP server")
+            .defaultValue("5222")
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor JID = new PropertyDescriptor.Builder()
+            .name("JID")
+            .description("User JID used by this processor to receive/send messages")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("JID Password")
+            .description("JID password for the user JID set")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    protected List<PropertyDescriptor> descriptors;
+
+    protected Set<Relationship> relationships;
+
+    protected AbstractXMPPConnection connection = null;
+    protected ChatManager chatMgr = null;
+
+    protected MultiUserChatManager mucManager = null;
+
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(SERVER);
+        descriptors.add(PORT);
+        descriptors.add(JID);
+        descriptors.add(PASSWORD);
+
+        relationships = new HashSet<>();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnStopped
+    public void onStopped() {
+        if (connection != null) {
+            connection.disconnect();
+        }
+    }
+
+    protected void prepareCommunication(final ProcessContext context) {
+        getLogger()
+                .debug(
+                        "Preapring connection for {} on server {}",
+                        context.getProperty(JID).getValue(),
+                        context.getProperty(SERVER).getValue());
+        XMPPTCPConnectionConfiguration config;
+        try {
+            config = XMPPTCPConnectionConfiguration.builder()
+                    .setXmppAddressAndPassword(
+                            context.getProperty(JID).getValue(),
+                            context.getProperty(PASSWORD).getValue())
+                    .setHost(context.getProperty(SERVER).getValue())
+                    .build();
+
+            connection = new XMPPTCPConnection(config);
+            connection.connect();
+            connection.login(); // Logs in
+
+            mucManager = MultiUserChatManager.getInstanceFor(connection);
+            mucManager.setAutoJoinOnReconnect(true);
+            mucManager.setAutoJoinSuccessCallback(new AutoJoinSuccessCallback() {
+
+                @Override
+                public void autoJoinSuccess(MultiUserChat muc, Resourcepart nickname) {
+                    getLogger().info("Autojoin to {} succeeded", muc.getRoom().asEntityBareJidString());
+                }
+            });
+        } catch (XmppStringprepException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (SmackException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (XMPPException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } // Establishes a connection to the server
+
+        chatMgr = ChatManager.getInstanceFor(connection);
+    }
 }

+ 95 - 84
nifi-xmpp-processors/src/main/java/fr/chickenkiller/nifi/processors/xmpp/ConsumeXMPP.java

@@ -16,6 +16,7 @@
  */
 package fr.chickenkiller.nifi.processors.xmpp;
 
+import fr.chickenkiller.nifi.processors.xmpp.domain.IncomingMessage;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.time.ZonedDateTime;
@@ -26,7 +27,6 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -48,93 +48,104 @@ import org.jivesoftware.smack.packet.Message;
 import org.jivesoftware.smack.packet.Message.Body;
 import org.jxmpp.jid.EntityBareJid;
 
-import fr.chickenkiller.nifi.processors.xmpp.domain.IncomingMessage;
-
-@Tags({ "xmpp", "conversation", "chat.getMessage", "social media", "team", "text", "unstructured", "read" })
+@Tags({"xmpp", "conversation", "chat.getMessage", "social media", "team", "text", "unstructured", "read"})
 @CapabilityDescription("Consumes messages from a XMPP server")
 @TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@WritesAttributes({ @WritesAttribute(attribute = "xmpp.sender.jid", description = "Message Sender JID"),
-		@WritesAttribute(attribute = "xmpp.received.date", description = "Date when message was received") })
+@WritesAttributes({
+    @WritesAttribute(attribute = "xmpp.sender.jid", description = "Message Sender JID"),
+    @WritesAttribute(attribute = "xmpp.received.date", description = "Date when message was received")
+})
 @SeeAlso(PublishXMPP.class)
 public class ConsumeXMPP extends AbstractXMPPProcessor {
 
-	public static final Relationship SUCCESS = new Relationship.Builder().name("success")
-			.description("Message received and successfully parsed").build();
-
-	public static final Relationship FAILURE = new Relationship.Builder().name("failure")
-			.description("Problem when receiving message").build();
-
-	private Queue<IncomingMessage> messages = new ConcurrentLinkedDeque<>();
-
-	public ConsumeXMPP() {
-		super();
-	}
-
-	@Override
-	protected void init(final ProcessorInitializationContext context) {
-		super.init(context);
-		relationships.add(SUCCESS);
-		relationships.add(FAILURE);
-		relationships = Collections.unmodifiableSet(relationships);
-	}
-
-	@OnScheduled
-	public void onScheduled(final ProcessContext context) {
-		prepareCommunication(context);
-		chatMgr.addIncomingListener(new IncomingChatMessageListener() {
-			@Override
-			public void newIncomingMessage(EntityBareJid from, Message message, Chat chat) {
-				messages.add(new IncomingMessage(from, message));
-			}
-		});
-	}
-
-	@Override
-	public void onTrigger(final ProcessContext context, final ProcessSession session) {
-
-		final long start = System.nanoTime();
-
-		if (messages.isEmpty()) {
-			context.yield();
-		} else {
-			while (!messages.isEmpty()) {
-				IncomingMessage m = messages.poll();
-
-				getLogger().info("Received {} messages from {}",
-						new Object[] { m.getMessage().getBodies().size(), m.getFrom().asEntityBareJidString() });
-
-				for (Body body : m.getMessage().getBodies()) {
-					FlowFile flowFile = session.create();
-
-					try (final OutputStream out = session.write(flowFile)) {
-
-						out.write(body.getMessage().getBytes());
-						out.close();
-						
-						final Map<String, String> attributes = new HashMap<>();
-						attributes.put("xmpp.sender.jid", m.getFrom().asEntityBareJidString());
-						attributes.put("xmpp.sending.date",
-								DateTimeFormatter.ISO_DATE_TIME.format(ZonedDateTime.now()));
-						attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
-						
-						flowFile = session.putAllAttributes(flowFile, attributes);
-
-						final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-						session.getProvenanceReporter().receive(flowFile, "xmpp://" + context.getProperty(SERVER),
-								"Received XMPP message from " + m.getFrom().asEntityBareJidString(), millis);
-						getLogger().info("Successfully received {} from {}",
-								new Object[] { flowFile, m.getFrom().asEntityBareJidString() });
-						session.transfer(flowFile, SUCCESS);
-
-						session.commit();
-					} catch (final IOException e) {
-						getLogger().error("Failed to write out the message received", e);
-						session.remove(flowFile);
-						session.rollback();
-					}
-				}
-			}
-		}
-	}
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Message received and successfully parsed")
+            .build();
+
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Problem when receiving message")
+            .build();
+
+    private Queue<IncomingMessage> messages = new ConcurrentLinkedDeque<>();
+
+    public ConsumeXMPP() {
+        super();
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+        relationships.add(SUCCESS);
+        relationships.add(FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        prepareCommunication(context);
+        chatMgr.addIncomingListener(new IncomingChatMessageListener() {
+            @Override
+            public void newIncomingMessage(EntityBareJid from, Message message, Chat chat) {
+                messages.add(new IncomingMessage(from, message));
+            }
+        });
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+
+        final long start = System.nanoTime();
+
+        if (messages.isEmpty()) {
+            context.yield();
+        } else {
+            while (!messages.isEmpty()) {
+                IncomingMessage m = messages.poll();
+
+                getLogger().info("Received {} messages from {}", new Object[] {
+                    m.getMessage().getBodies().size(), m.getFrom().asEntityBareJidString()
+                });
+
+                for (Body body : m.getMessage().getBodies()) {
+                    FlowFile flowFile = session.create();
+
+                    try (final OutputStream out = session.write(flowFile)) {
+
+                        out.write(body.getMessage().getBytes());
+                        out.close();
+
+                        final Map<String, String> attributes = new HashMap<>();
+                        attributes.put("xmpp.sender.jid", m.getFrom().asEntityBareJidString());
+                        attributes.put(
+                                "xmpp.sending.date", DateTimeFormatter.ISO_DATE_TIME.format(ZonedDateTime.now()));
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+                        flowFile = session.putAllAttributes(flowFile, attributes);
+
+                        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                        session.getProvenanceReporter()
+                                .receive(
+                                        flowFile,
+                                        "xmpp://" + context.getProperty(SERVER),
+                                        "Received XMPP message from "
+                                                + m.getFrom().asEntityBareJidString(),
+                                        millis);
+                        getLogger().info("Successfully received {} from {}", new Object[] {
+                            flowFile, m.getFrom().asEntityBareJidString()
+                        });
+                        session.transfer(flowFile, SUCCESS);
+
+                        session.commit();
+                    } catch (final IOException e) {
+                        getLogger().error("Failed to write out the message received", e);
+                        session.remove(flowFile);
+                        session.rollback();
+                    }
+                }
+            }
+        }
+    }
 }

+ 167 - 134
nifi-xmpp-processors/src/main/java/fr/chickenkiller/nifi/processors/xmpp/PublishXMPP.java

@@ -18,7 +18,6 @@ package fr.chickenkiller.nifi.processors.xmpp;
 
 import java.io.InputStream;
 import java.util.Collections;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -41,141 +40,175 @@ import org.jxmpp.jid.EntityBareJid;
 import org.jxmpp.jid.impl.JidCreate;
 import org.jxmpp.stringprep.XmppStringprepException;
 
-@Tags({ "xmpp", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "read" })
+@Tags({"xmpp", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "read"})
 @CapabilityDescription("Send messages to a XMPP server")
 @InputRequirement(Requirement.INPUT_ALLOWED)
-@ReadsAttributes({ @ReadsAttribute(attribute = "xmpp.recipient.jid", description = "Message recipient JID"), })
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "xmpp.recipient.jid", description = "Message recipient JID"),
+})
 public class PublishXMPP extends AbstractXMPPProcessor {
 
-	/** Default subject value */
-	public static String DEFAULT_SUBJECT = "Nifi message";
-
-	/** Default language */
-	public static String DEFAULT_LANGUAGE = "en";
-
-	public static final PropertyDescriptor RECIPIENT = new PropertyDescriptor.Builder().name("Recipient JID")
-			.description("User JID used by this processor to send messages to").required(true)
-			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-	public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder().name("Message subject")
-			.description("Message subject that will be shown").required(false).defaultValue(PublishXMPP.DEFAULT_SUBJECT)
-			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-	public static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder().name("Language used for message")
-			.description("Language used in the message").required(true).defaultValue(PublishXMPP.DEFAULT_LANGUAGE)
-			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-	public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("Message sent")
-			.build();
-
-	public static final Relationship FAILURE = new Relationship.Builder().name("failure")
-			.description("Problem when sending message").build();
-
-	/** Recipient JID */
-	private EntityBareJid recipientJid;
-
-	public PublishXMPP() {
-		super();
-	}
-
-	@Override
-	protected void init(final ProcessorInitializationContext context) {
-		super.init(context);
-		descriptors.add(RECIPIENT);
-		descriptors = Collections.unmodifiableList(descriptors);
-
-		relationships.add(SUCCESS);
-		relationships.add(FAILURE);
-		relationships = Collections.unmodifiableSet(relationships);
-	}
-
-	@OnScheduled
-	public void onScheduled(final ProcessContext context) {
-		prepareCommunication(context);
-		// Build the recipient ID
-		try {
-			recipientJid = JidCreate.entityBareFrom(context.getProperty(RECIPIENT).toString());
-		} catch (XmppStringprepException e) {
-			getLogger().error("Impossible to build recepient Jid from {} property with value {}",
-					new Object[] { RECIPIENT.getName(), context.getProperty(RECIPIENT).toString() });
-		}
-	}
-
-	@Override
-	public void onTrigger(final ProcessContext context, final ProcessSession session) {
-
-		FlowFile flowFile = session.get();
-
-		if (flowFile == null) {
-			context.yield();
-			return;
-		}
-
-		Chat chat = chatMgr.chatWith(recipientJid);
-		getLogger().info("Ready to send to " + recipientJid.asEntityBareJidString());
-
-		MessageBuilder builder = StanzaBuilder.buildMessage();
-		getLogger().debug("MessageBuilder created");
-
-		getLogger().debug("Adding Subject");
-		String subject = context.getProperty(SUBJECT).getValue() != null ? context.getProperty(SUBJECT).getValue()
-				: DEFAULT_SUBJECT;
-		String language = context.getProperty(LANGUAGE).getValue() != null ? context.getProperty(LANGUAGE).getValue()
-				: DEFAULT_LANGUAGE;
-		builder.addSubject(language, subject);
-		builder.setSubject(subject);
-
-		// session.read(flowfile, new InputStreamCallback() {
-		//
-		// @Override
-		// public void process(InputStream in) throws IOException {
-		// ByteBuffer byteBufer = ByteBuffer.wrap(in.readAllBytes());
-		// CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBufer);
-		// builder.addBody("", charBuffer.toString());
-		// }
-		// });
-
-		getLogger().debug("Setting language");
-		builder.setLanguage(language);
-
-		getLogger().debug("Reading flowfile");
-
-		final String messageText;
-		final byte[] buffer = new byte[(int) flowFile.getSize()];
-		try (final InputStream in = session.read(flowFile)) {
-			// Reading the content
-			in.read(buffer);
-			in.close();
-
-			messageText = new String(buffer);
-
-			builder.addBody(language, messageText);
-
-			getLogger().debug("Subject set to \"{}\" with language {}, ready to send", builder.getSubject(),
-					builder.getLanguage());
-			chat.send(builder.build());
-			final long sent = System.nanoTime();
-			session.getProvenanceReporter().send(flowFile, "xmpp://" + context.getProperty(SERVER),
-					"Sent XMPP message to " + recipientJid.asEntityBareJidString(), sent);
-			getLogger().info("Successfully send {} to {}",
-					new Object[] { flowFile, recipientJid.asEntityBareJidString() });
-			session.transfer(flowFile, SUCCESS);
-			getLogger().debug("Flowfile {} sent to success", flowFile);
-		} catch (NotConnectedException e) {
-			getLogger().error("Message to {} impossible to send, not connected: {}",
-					recipientJid.asEntityBareJidString(), e.getLocalizedMessage());
-			session.penalize(flowFile);
-			session.transfer(flowFile, FAILURE);
-			session.rollback();
-		} catch (InterruptedException e) {
-			getLogger().error("Message to {} impossible to send, interrupted: {}", recipientJid.asEntityBareJidString(),
-					e.getLocalizedMessage());
-			session.penalize(flowFile);
-			session.transfer(flowFile, FAILURE);
-			session.rollback();
-		} catch (Exception e) {
-			getLogger().error("Got an exception", e);
-			session.rollback();
-		}
-	}
+    /** Default subject value */
+    public static String DEFAULT_SUBJECT = "Nifi message";
+
+    /** Default language */
+    public static String DEFAULT_LANGUAGE = "en";
+
+    public static final PropertyDescriptor RECIPIENT = new PropertyDescriptor.Builder()
+            .name("Recipient JID")
+            .description("User JID used by this processor to send messages to")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
+            .name("Message subject")
+            .description("Message subject that will be shown")
+            .required(false)
+            .defaultValue(PublishXMPP.DEFAULT_SUBJECT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder()
+            .name("Language used for message")
+            .description("Language used in the message")
+            .required(true)
+            .defaultValue(PublishXMPP.DEFAULT_LANGUAGE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Message sent")
+            .build();
+
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Problem when sending message")
+            .build();
+
+    /** Recipient JID */
+    private EntityBareJid recipientJid;
+
+    public PublishXMPP() {
+        super();
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+        descriptors.add(RECIPIENT);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships.add(SUCCESS);
+        relationships.add(FAILURE);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        prepareCommunication(context);
+        // Build the recipient ID
+        try {
+            recipientJid =
+                    JidCreate.entityBareFrom(context.getProperty(RECIPIENT).toString());
+        } catch (XmppStringprepException e) {
+            getLogger().error("Impossible to build recepient Jid from {} property with value {}", new Object[] {
+                RECIPIENT.getName(), context.getProperty(RECIPIENT).toString()
+            });
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+
+        FlowFile flowFile = session.get();
+
+        if (flowFile == null) {
+            context.yield();
+            return;
+        }
+
+        Chat chat = chatMgr.chatWith(recipientJid);
+        getLogger().info("Ready to send to " + recipientJid.asEntityBareJidString());
+
+        MessageBuilder builder = StanzaBuilder.buildMessage();
+        getLogger().debug("MessageBuilder created");
+
+        getLogger().debug("Adding Subject");
+        String subject = context.getProperty(SUBJECT).getValue() != null
+                ? context.getProperty(SUBJECT).getValue()
+                : DEFAULT_SUBJECT;
+        String language = context.getProperty(LANGUAGE).getValue() != null
+                ? context.getProperty(LANGUAGE).getValue()
+                : DEFAULT_LANGUAGE;
+        builder.addSubject(language, subject);
+        builder.setSubject(subject);
+
+        // session.read(flowfile, new InputStreamCallback() {
+        //
+        // @Override
+        // public void process(InputStream in) throws IOException {
+        // ByteBuffer byteBufer = ByteBuffer.wrap(in.readAllBytes());
+        // CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBufer);
+        // builder.addBody("", charBuffer.toString());
+        // }
+        // });
+
+        getLogger().debug("Setting language");
+        builder.setLanguage(language);
+
+        getLogger().debug("Reading flowfile");
+
+        final String messageText;
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        try (final InputStream in = session.read(flowFile)) {
+            // Reading the content
+            in.read(buffer);
+            in.close();
+
+            messageText = new String(buffer);
+
+            builder.addBody(language, messageText);
+
+            getLogger()
+                    .debug(
+                            "Subject set to \"{}\" with language {}, ready to send",
+                            builder.getSubject(),
+                            builder.getLanguage());
+            chat.send(builder.build());
+            final long sent = System.nanoTime();
+            session.getProvenanceReporter()
+                    .send(
+                            flowFile,
+                            "xmpp://" + context.getProperty(SERVER),
+                            "Sent XMPP message to " + recipientJid.asEntityBareJidString(),
+                            sent);
+            getLogger()
+                    .info("Successfully send {} to {}", new Object[] {flowFile, recipientJid.asEntityBareJidString()});
+            session.transfer(flowFile, SUCCESS);
+            getLogger().debug("Flowfile {} sent to success", flowFile);
+        } catch (NotConnectedException e) {
+            getLogger()
+                    .error(
+                            "Message to {} impossible to send, not connected: {}",
+                            recipientJid.asEntityBareJidString(),
+                            e.getLocalizedMessage());
+            session.penalize(flowFile);
+            session.transfer(flowFile, FAILURE);
+            session.rollback();
+        } catch (InterruptedException e) {
+            getLogger()
+                    .error(
+                            "Message to {} impossible to send, interrupted: {}",
+                            recipientJid.asEntityBareJidString(),
+                            e.getLocalizedMessage());
+            session.penalize(flowFile);
+            session.transfer(flowFile, FAILURE);
+            session.rollback();
+        } catch (Exception e) {
+            getLogger().error("Got an exception", e);
+            session.rollback();
+        }
+    }
 }