|
@@ -16,49 +16,48 @@
|
|
*/
|
|
*/
|
|
package fr.chickenkiller.nifi.processors.xmpp;
|
|
package fr.chickenkiller.nifi.processors.xmpp;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.io.OutputStream;
|
|
|
|
+import java.time.ZonedDateTime;
|
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
-import java.util.HashSet;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Set;
|
|
|
|
|
|
+import java.util.HashMap;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.Queue;
|
|
|
|
+import java.util.concurrent.ConcurrentLinkedDeque;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
|
|
|
+import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
|
import org.apache.nifi.annotation.documentation.Tags;
|
|
import org.apache.nifi.annotation.documentation.Tags;
|
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
|
-import org.apache.nifi.components.PropertyDescriptor;
|
|
|
|
import org.apache.nifi.flowfile.FlowFile;
|
|
import org.apache.nifi.flowfile.FlowFile;
|
|
-import org.apache.nifi.processor.AbstractProcessor;
|
|
|
|
|
|
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
|
import org.apache.nifi.processor.ProcessContext;
|
|
import org.apache.nifi.processor.ProcessContext;
|
|
import org.apache.nifi.processor.ProcessSession;
|
|
import org.apache.nifi.processor.ProcessSession;
|
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
|
import org.apache.nifi.processor.Relationship;
|
|
import org.apache.nifi.processor.Relationship;
|
|
-import org.apache.nifi.processor.util.StandardValidators;
|
|
|
|
|
|
+import org.jivesoftware.smack.chat2.Chat;
|
|
|
|
+import org.jivesoftware.smack.chat2.IncomingChatMessageListener;
|
|
|
|
+import org.jivesoftware.smack.packet.Message;
|
|
|
|
+import org.jivesoftware.smack.packet.Message.Body;
|
|
|
|
+import org.jxmpp.jid.EntityBareJid;
|
|
|
|
|
|
-@Tags({ "xmpp", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "read" })
|
|
|
|
|
|
+import fr.chickenkiller.nifi.processors.xmpp.domain.IncomingMessage;
|
|
|
|
+
|
|
|
|
+@Tags({ "xmpp", "conversation", "chat.getMessage", "social media", "team", "text", "unstructured", "read" })
|
|
@CapabilityDescription("Consumes messages from a XMPP server")
|
|
@CapabilityDescription("Consumes messages from a XMPP server")
|
|
-@SeeAlso({})
|
|
|
|
|
|
+@TriggerSerially
|
|
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
|
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
|
@WritesAttributes({ @WritesAttribute(attribute = "xmpp.sender.jid", description = "Message Sender JID"),
|
|
@WritesAttributes({ @WritesAttribute(attribute = "xmpp.sender.jid", description = "Message Sender JID"),
|
|
- @WritesAttribute(attribute = "xmpp.sending.date", description = "Sending date of the message") })
|
|
|
|
-public class ConsumeXMPP extends AbstractProcessor {
|
|
|
|
-
|
|
|
|
- public static final PropertyDescriptor SERVER = new PropertyDescriptor.Builder().name("XMPP Server")
|
|
|
|
- .description("jabber.org").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
|
|
|
-
|
|
|
|
- public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("PORT")
|
|
|
|
- .displayName("XMPP Server port").description("5222").defaultValue("5222")
|
|
|
|
- .addValidator(StandardValidators.PORT_VALIDATOR).build();
|
|
|
|
-
|
|
|
|
- public static final PropertyDescriptor JID = new PropertyDescriptor.Builder().name("Receiving JID").required(true)
|
|
|
|
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
|
|
|
-
|
|
|
|
- public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("JID Password")
|
|
|
|
- .required(true).sensitive(true).build();
|
|
|
|
|
|
+ @WritesAttribute(attribute = "xmpp.received.date", description = "Date when message was received") })
|
|
|
|
+@SeeAlso(PublishXMPP.class)
|
|
|
|
+public class ConsumeXMPP extends AbstractXMPPProcessor {
|
|
|
|
|
|
public static final Relationship SUCCESS = new Relationship.Builder().name("success")
|
|
public static final Relationship SUCCESS = new Relationship.Builder().name("success")
|
|
.description("Message received and successfully parsed").build();
|
|
.description("Message received and successfully parsed").build();
|
|
@@ -66,46 +65,76 @@ public class ConsumeXMPP extends AbstractProcessor {
|
|
public static final Relationship FAILURE = new Relationship.Builder().name("failure")
|
|
public static final Relationship FAILURE = new Relationship.Builder().name("failure")
|
|
.description("Problem when receiving message").build();
|
|
.description("Problem when receiving message").build();
|
|
|
|
|
|
- private List<PropertyDescriptor> descriptors;
|
|
|
|
|
|
+ private Queue<IncomingMessage> messages = new ConcurrentLinkedDeque<>();
|
|
|
|
|
|
- private Set<Relationship> relationships;
|
|
|
|
|
|
+ public ConsumeXMPP() {
|
|
|
|
+ super();
|
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected void init(final ProcessorInitializationContext context) {
|
|
protected void init(final ProcessorInitializationContext context) {
|
|
- descriptors = new ArrayList<>();
|
|
|
|
- descriptors.add(SERVER);
|
|
|
|
- descriptors.add(PORT);
|
|
|
|
- descriptors.add(JID);
|
|
|
|
- descriptors.add(PASSWORD);
|
|
|
|
- descriptors = Collections.unmodifiableList(descriptors);
|
|
|
|
-
|
|
|
|
- relationships = new HashSet<>();
|
|
|
|
|
|
+ super.init(context);
|
|
relationships.add(SUCCESS);
|
|
relationships.add(SUCCESS);
|
|
relationships.add(FAILURE);
|
|
relationships.add(FAILURE);
|
|
relationships = Collections.unmodifiableSet(relationships);
|
|
relationships = Collections.unmodifiableSet(relationships);
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- public Set<Relationship> getRelationships() {
|
|
|
|
- return this.relationships;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
|
|
|
- return descriptors;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@OnScheduled
|
|
@OnScheduled
|
|
public void onScheduled(final ProcessContext context) {
|
|
public void onScheduled(final ProcessContext context) {
|
|
-
|
|
|
|
|
|
+ prepareCommunication(context);
|
|
|
|
+ chatMgr.addIncomingListener(new IncomingChatMessageListener() {
|
|
|
|
+ @Override
|
|
|
|
+ public void newIncomingMessage(EntityBareJid from, Message message, Chat chat) {
|
|
|
|
+ messages.add(new IncomingMessage(from, message));
|
|
|
|
+ }
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
|
- FlowFile flowFile = session.get();
|
|
|
|
- if (flowFile == null) {
|
|
|
|
- return;
|
|
|
|
|
|
+
|
|
|
|
+ final long start = System.nanoTime();
|
|
|
|
+
|
|
|
|
+ if (messages.isEmpty()) {
|
|
|
|
+ context.yield();
|
|
|
|
+ } else {
|
|
|
|
+ while (!messages.isEmpty()) {
|
|
|
|
+ IncomingMessage m = messages.poll();
|
|
|
|
+
|
|
|
|
+ getLogger().info("Received {} messages from {}",
|
|
|
|
+ new Object[] { m.getMessage().getBodies().size(), m.getFrom().asEntityBareJidString() });
|
|
|
|
+
|
|
|
|
+ for (Body body : m.getMessage().getBodies()) {
|
|
|
|
+ FlowFile flowFile = session.create();
|
|
|
|
+
|
|
|
|
+ try (final OutputStream out = session.write(flowFile)) {
|
|
|
|
+
|
|
|
|
+ out.write(body.getMessage().getBytes());
|
|
|
|
+ out.close();
|
|
|
|
+
|
|
|
|
+ final Map<String, String> attributes = new HashMap<>();
|
|
|
|
+ attributes.put("xmpp.sender.jid", m.getFrom().asEntityBareJidString());
|
|
|
|
+ attributes.put("xmpp.sending.date",
|
|
|
|
+ DateTimeFormatter.ISO_DATE_TIME.format(ZonedDateTime.now()));
|
|
|
|
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
|
|
|
|
+
|
|
|
|
+ flowFile = session.putAllAttributes(flowFile, attributes);
|
|
|
|
+
|
|
|
|
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
|
+ session.getProvenanceReporter().receive(flowFile, "xmpp://" + context.getProperty(SERVER),
|
|
|
|
+ "Received XMPP message from " + m.getFrom().asEntityBareJidString(), millis);
|
|
|
|
+ getLogger().info("Successfully received {} from {}",
|
|
|
|
+ new Object[] { flowFile, m.getFrom().asEntityBareJidString() });
|
|
|
|
+ session.transfer(flowFile, SUCCESS);
|
|
|
|
+
|
|
|
|
+ session.commit();
|
|
|
|
+ } catch (final IOException e) {
|
|
|
|
+ getLogger().error("Failed to write out the message received", e);
|
|
|
|
+ session.remove(flowFile);
|
|
|
|
+ session.rollback();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- session.transfer(flowFile, SUCCESS);
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|