|  | @@ -16,49 +16,48 @@
 | 
											
												
													
														|  |   */
 |  |   */
 | 
											
												
													
														|  |  package fr.chickenkiller.nifi.processors.xmpp;
 |  |  package fr.chickenkiller.nifi.processors.xmpp;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -import java.util.ArrayList;
 |  | 
 | 
											
												
													
														|  | 
 |  | +import java.io.IOException;
 | 
											
												
													
														|  | 
 |  | +import java.io.OutputStream;
 | 
											
												
													
														|  | 
 |  | +import java.time.ZonedDateTime;
 | 
											
												
													
														|  | 
 |  | +import java.time.format.DateTimeFormatter;
 | 
											
												
													
														|  |  import java.util.Collections;
 |  |  import java.util.Collections;
 | 
											
												
													
														|  | -import java.util.HashSet;
 |  | 
 | 
											
												
													
														|  | -import java.util.List;
 |  | 
 | 
											
												
													
														|  | -import java.util.Set;
 |  | 
 | 
											
												
													
														|  | 
 |  | +import java.util.HashMap;
 | 
											
												
													
														|  | 
 |  | +import java.util.Map;
 | 
											
												
													
														|  | 
 |  | +import java.util.Queue;
 | 
											
												
													
														|  | 
 |  | +import java.util.concurrent.ConcurrentLinkedDeque;
 | 
											
												
													
														|  | 
 |  | +import java.util.concurrent.TimeUnit;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  import org.apache.nifi.annotation.behavior.InputRequirement;
 |  |  import org.apache.nifi.annotation.behavior.InputRequirement;
 | 
											
												
													
														|  |  import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 |  |  import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 | 
											
												
													
														|  | 
 |  | +import org.apache.nifi.annotation.behavior.TriggerSerially;
 | 
											
												
													
														|  |  import org.apache.nifi.annotation.behavior.WritesAttribute;
 |  |  import org.apache.nifi.annotation.behavior.WritesAttribute;
 | 
											
												
													
														|  |  import org.apache.nifi.annotation.behavior.WritesAttributes;
 |  |  import org.apache.nifi.annotation.behavior.WritesAttributes;
 | 
											
												
													
														|  |  import org.apache.nifi.annotation.documentation.CapabilityDescription;
 |  |  import org.apache.nifi.annotation.documentation.CapabilityDescription;
 | 
											
												
													
														|  |  import org.apache.nifi.annotation.documentation.SeeAlso;
 |  |  import org.apache.nifi.annotation.documentation.SeeAlso;
 | 
											
												
													
														|  |  import org.apache.nifi.annotation.documentation.Tags;
 |  |  import org.apache.nifi.annotation.documentation.Tags;
 | 
											
												
													
														|  |  import org.apache.nifi.annotation.lifecycle.OnScheduled;
 |  |  import org.apache.nifi.annotation.lifecycle.OnScheduled;
 | 
											
												
													
														|  | -import org.apache.nifi.components.PropertyDescriptor;
 |  | 
 | 
											
												
													
														|  |  import org.apache.nifi.flowfile.FlowFile;
 |  |  import org.apache.nifi.flowfile.FlowFile;
 | 
											
												
													
														|  | -import org.apache.nifi.processor.AbstractProcessor;
 |  | 
 | 
											
												
													
														|  | 
 |  | +import org.apache.nifi.flowfile.attributes.CoreAttributes;
 | 
											
												
													
														|  |  import org.apache.nifi.processor.ProcessContext;
 |  |  import org.apache.nifi.processor.ProcessContext;
 | 
											
												
													
														|  |  import org.apache.nifi.processor.ProcessSession;
 |  |  import org.apache.nifi.processor.ProcessSession;
 | 
											
												
													
														|  |  import org.apache.nifi.processor.ProcessorInitializationContext;
 |  |  import org.apache.nifi.processor.ProcessorInitializationContext;
 | 
											
												
													
														|  |  import org.apache.nifi.processor.Relationship;
 |  |  import org.apache.nifi.processor.Relationship;
 | 
											
												
													
														|  | -import org.apache.nifi.processor.util.StandardValidators;
 |  | 
 | 
											
												
													
														|  | 
 |  | +import org.jivesoftware.smack.chat2.Chat;
 | 
											
												
													
														|  | 
 |  | +import org.jivesoftware.smack.chat2.IncomingChatMessageListener;
 | 
											
												
													
														|  | 
 |  | +import org.jivesoftware.smack.packet.Message;
 | 
											
												
													
														|  | 
 |  | +import org.jivesoftware.smack.packet.Message.Body;
 | 
											
												
													
														|  | 
 |  | +import org.jxmpp.jid.EntityBareJid;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -@Tags({ "xmpp", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "read" })
 |  | 
 | 
											
												
													
														|  | 
 |  | +import fr.chickenkiller.nifi.processors.xmpp.domain.IncomingMessage;
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +@Tags({ "xmpp", "conversation", "chat.getMessage", "social media", "team", "text", "unstructured", "read" })
 | 
											
												
													
														|  |  @CapabilityDescription("Consumes messages from a XMPP server")
 |  |  @CapabilityDescription("Consumes messages from a XMPP server")
 | 
											
												
													
														|  | -@SeeAlso({})
 |  | 
 | 
											
												
													
														|  | 
 |  | +@TriggerSerially
 | 
											
												
													
														|  |  @InputRequirement(Requirement.INPUT_FORBIDDEN)
 |  |  @InputRequirement(Requirement.INPUT_FORBIDDEN)
 | 
											
												
													
														|  |  @WritesAttributes({ @WritesAttribute(attribute = "xmpp.sender.jid", description = "Message Sender JID"),
 |  |  @WritesAttributes({ @WritesAttribute(attribute = "xmpp.sender.jid", description = "Message Sender JID"),
 | 
											
												
													
														|  | -		@WritesAttribute(attribute = "xmpp.sending.date", description = "Sending date of the message") })
 |  | 
 | 
											
												
													
														|  | -public class ConsumeXMPP extends AbstractProcessor {
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -	public static final PropertyDescriptor SERVER = new PropertyDescriptor.Builder().name("XMPP Server")
 |  | 
 | 
											
												
													
														|  | -			.description("jabber.org").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -	public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("PORT")
 |  | 
 | 
											
												
													
														|  | -			.displayName("XMPP Server port").description("5222").defaultValue("5222")
 |  | 
 | 
											
												
													
														|  | -			.addValidator(StandardValidators.PORT_VALIDATOR).build();
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -	public static final PropertyDescriptor JID = new PropertyDescriptor.Builder().name("Receiving JID").required(true)
 |  | 
 | 
											
												
													
														|  | -			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -	public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("JID Password")
 |  | 
 | 
											
												
													
														|  | -			.required(true).sensitive(true).build();
 |  | 
 | 
											
												
													
														|  | 
 |  | +		@WritesAttribute(attribute = "xmpp.received.date", description = "Date when message was received") })
 | 
											
												
													
														|  | 
 |  | +@SeeAlso(PublishXMPP.class)
 | 
											
												
													
														|  | 
 |  | +public class ConsumeXMPP extends AbstractXMPPProcessor {
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  	public static final Relationship SUCCESS = new Relationship.Builder().name("success")
 |  |  	public static final Relationship SUCCESS = new Relationship.Builder().name("success")
 | 
											
												
													
														|  |  			.description("Message received and successfully parsed").build();
 |  |  			.description("Message received and successfully parsed").build();
 | 
											
										
											
												
													
														|  | @@ -66,46 +65,76 @@ public class ConsumeXMPP extends AbstractProcessor {
 | 
											
												
													
														|  |  	public static final Relationship FAILURE = new Relationship.Builder().name("failure")
 |  |  	public static final Relationship FAILURE = new Relationship.Builder().name("failure")
 | 
											
												
													
														|  |  			.description("Problem when receiving message").build();
 |  |  			.description("Problem when receiving message").build();
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -	private List<PropertyDescriptor> descriptors;
 |  | 
 | 
											
												
													
														|  | 
 |  | +	private Queue<IncomingMessage> messages = new ConcurrentLinkedDeque<>();
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -	private Set<Relationship> relationships;
 |  | 
 | 
											
												
													
														|  | 
 |  | +	public ConsumeXMPP() {
 | 
											
												
													
														|  | 
 |  | +		super();
 | 
											
												
													
														|  | 
 |  | +	}
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  	@Override
 |  |  	@Override
 | 
											
												
													
														|  |  	protected void init(final ProcessorInitializationContext context) {
 |  |  	protected void init(final ProcessorInitializationContext context) {
 | 
											
												
													
														|  | -		descriptors = new ArrayList<>();
 |  | 
 | 
											
												
													
														|  | -		descriptors.add(SERVER);
 |  | 
 | 
											
												
													
														|  | -		descriptors.add(PORT);
 |  | 
 | 
											
												
													
														|  | -		descriptors.add(JID);
 |  | 
 | 
											
												
													
														|  | -		descriptors.add(PASSWORD);
 |  | 
 | 
											
												
													
														|  | -		descriptors = Collections.unmodifiableList(descriptors);
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -		relationships = new HashSet<>();
 |  | 
 | 
											
												
													
														|  | 
 |  | +		super.init(context);
 | 
											
												
													
														|  |  		relationships.add(SUCCESS);
 |  |  		relationships.add(SUCCESS);
 | 
											
												
													
														|  |  		relationships.add(FAILURE);
 |  |  		relationships.add(FAILURE);
 | 
											
												
													
														|  |  		relationships = Collections.unmodifiableSet(relationships);
 |  |  		relationships = Collections.unmodifiableSet(relationships);
 | 
											
												
													
														|  |  	}
 |  |  	}
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -	@Override
 |  | 
 | 
											
												
													
														|  | -	public Set<Relationship> getRelationships() {
 |  | 
 | 
											
												
													
														|  | -		return this.relationships;
 |  | 
 | 
											
												
													
														|  | -	}
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -	@Override
 |  | 
 | 
											
												
													
														|  | -	public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 |  | 
 | 
											
												
													
														|  | -		return descriptors;
 |  | 
 | 
											
												
													
														|  | -	}
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  |  	@OnScheduled
 |  |  	@OnScheduled
 | 
											
												
													
														|  |  	public void onScheduled(final ProcessContext context) {
 |  |  	public void onScheduled(final ProcessContext context) {
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | 
 |  | +		prepareCommunication(context);
 | 
											
												
													
														|  | 
 |  | +		chatMgr.addIncomingListener(new IncomingChatMessageListener() {
 | 
											
												
													
														|  | 
 |  | +			@Override
 | 
											
												
													
														|  | 
 |  | +			public void newIncomingMessage(EntityBareJid from, Message message, Chat chat) {
 | 
											
												
													
														|  | 
 |  | +				messages.add(new IncomingMessage(from, message));
 | 
											
												
													
														|  | 
 |  | +			}
 | 
											
												
													
														|  | 
 |  | +		});
 | 
											
												
													
														|  |  	}
 |  |  	}
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  	@Override
 |  |  	@Override
 | 
											
												
													
														|  |  	public void onTrigger(final ProcessContext context, final ProcessSession session) {
 |  |  	public void onTrigger(final ProcessContext context, final ProcessSession session) {
 | 
											
												
													
														|  | -		FlowFile flowFile = session.get();
 |  | 
 | 
											
												
													
														|  | -		if (flowFile == null) {
 |  | 
 | 
											
												
													
														|  | -			return;
 |  | 
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +		final long start = System.nanoTime();
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +		if (messages.isEmpty()) {
 | 
											
												
													
														|  | 
 |  | +			context.yield();
 | 
											
												
													
														|  | 
 |  | +		} else {
 | 
											
												
													
														|  | 
 |  | +			while (!messages.isEmpty()) {
 | 
											
												
													
														|  | 
 |  | +				IncomingMessage m = messages.poll();
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +				getLogger().info("Received {} messages from {}",
 | 
											
												
													
														|  | 
 |  | +						new Object[] { m.getMessage().getBodies().size(), m.getFrom().asEntityBareJidString() });
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +				for (Body body : m.getMessage().getBodies()) {
 | 
											
												
													
														|  | 
 |  | +					FlowFile flowFile = session.create();
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +					try (final OutputStream out = session.write(flowFile)) {
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +						out.write(body.getMessage().getBytes());
 | 
											
												
													
														|  | 
 |  | +						out.close();
 | 
											
												
													
														|  | 
 |  | +						
 | 
											
												
													
														|  | 
 |  | +						final Map<String, String> attributes = new HashMap<>();
 | 
											
												
													
														|  | 
 |  | +						attributes.put("xmpp.sender.jid", m.getFrom().asEntityBareJidString());
 | 
											
												
													
														|  | 
 |  | +						attributes.put("xmpp.sending.date",
 | 
											
												
													
														|  | 
 |  | +								DateTimeFormatter.ISO_DATE_TIME.format(ZonedDateTime.now()));
 | 
											
												
													
														|  | 
 |  | +						attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 | 
											
												
													
														|  | 
 |  | +						
 | 
											
												
													
														|  | 
 |  | +						flowFile = session.putAllAttributes(flowFile, attributes);
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +						final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
 | 
											
												
													
														|  | 
 |  | +						session.getProvenanceReporter().receive(flowFile, "xmpp://" + context.getProperty(SERVER),
 | 
											
												
													
														|  | 
 |  | +								"Received XMPP message from " + m.getFrom().asEntityBareJidString(), millis);
 | 
											
												
													
														|  | 
 |  | +						getLogger().info("Successfully received {} from {}",
 | 
											
												
													
														|  | 
 |  | +								new Object[] { flowFile, m.getFrom().asEntityBareJidString() });
 | 
											
												
													
														|  | 
 |  | +						session.transfer(flowFile, SUCCESS);
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +						session.commit();
 | 
											
												
													
														|  | 
 |  | +					} catch (final IOException e) {
 | 
											
												
													
														|  | 
 |  | +						getLogger().error("Failed to write out the message received", e);
 | 
											
												
													
														|  | 
 |  | +						session.remove(flowFile);
 | 
											
												
													
														|  | 
 |  | +						session.rollback();
 | 
											
												
													
														|  | 
 |  | +					}
 | 
											
												
													
														|  | 
 |  | +				}
 | 
											
												
													
														|  | 
 |  | +			}
 | 
											
												
													
														|  |  		}
 |  |  		}
 | 
											
												
													
														|  | -		session.transfer(flowFile, SUCCESS);
 |  | 
 | 
											
												
													
														|  |  	}
 |  |  	}
 | 
											
												
													
														|  |  }
 |  |  }
 |