|
@@ -18,7 +18,6 @@ package fr.chickenkiller.nifi.processors.xmpp;
|
|
|
|
|
|
import java.io.InputStream;
|
|
|
import java.util.Collections;
|
|
|
-
|
|
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
|
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
|
|
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
|
@@ -41,141 +40,175 @@ import org.jxmpp.jid.EntityBareJid;
|
|
|
import org.jxmpp.jid.impl.JidCreate;
|
|
|
import org.jxmpp.stringprep.XmppStringprepException;
|
|
|
|
|
|
-@Tags({ "xmpp", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "read" })
|
|
|
+@Tags({"xmpp", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "read"})
|
|
|
@CapabilityDescription("Send messages to a XMPP server")
|
|
|
@InputRequirement(Requirement.INPUT_ALLOWED)
|
|
|
-@ReadsAttributes({ @ReadsAttribute(attribute = "xmpp.recipient.jid", description = "Message recipient JID"), })
|
|
|
+@ReadsAttributes({
|
|
|
+ @ReadsAttribute(attribute = "xmpp.recipient.jid", description = "Message recipient JID"),
|
|
|
+})
|
|
|
public class PublishXMPP extends AbstractXMPPProcessor {
|
|
|
|
|
|
- /** Default subject value */
|
|
|
- public static String DEFAULT_SUBJECT = "Nifi message";
|
|
|
-
|
|
|
- /** Default language */
|
|
|
- public static String DEFAULT_LANGUAGE = "en";
|
|
|
-
|
|
|
- public static final PropertyDescriptor RECIPIENT = new PropertyDescriptor.Builder().name("Recipient JID")
|
|
|
- .description("User JID used by this processor to send messages to").required(true)
|
|
|
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
|
|
-
|
|
|
- public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder().name("Message subject")
|
|
|
- .description("Message subject that will be shown").required(false).defaultValue(PublishXMPP.DEFAULT_SUBJECT)
|
|
|
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
|
|
-
|
|
|
- public static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder().name("Language used for message")
|
|
|
- .description("Language used in the message").required(true).defaultValue(PublishXMPP.DEFAULT_LANGUAGE)
|
|
|
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
|
|
-
|
|
|
- public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("Message sent")
|
|
|
- .build();
|
|
|
-
|
|
|
- public static final Relationship FAILURE = new Relationship.Builder().name("failure")
|
|
|
- .description("Problem when sending message").build();
|
|
|
-
|
|
|
- /** Recipient JID */
|
|
|
- private EntityBareJid recipientJid;
|
|
|
-
|
|
|
- public PublishXMPP() {
|
|
|
- super();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected void init(final ProcessorInitializationContext context) {
|
|
|
- super.init(context);
|
|
|
- descriptors.add(RECIPIENT);
|
|
|
- descriptors = Collections.unmodifiableList(descriptors);
|
|
|
-
|
|
|
- relationships.add(SUCCESS);
|
|
|
- relationships.add(FAILURE);
|
|
|
- relationships = Collections.unmodifiableSet(relationships);
|
|
|
- }
|
|
|
-
|
|
|
- @OnScheduled
|
|
|
- public void onScheduled(final ProcessContext context) {
|
|
|
- prepareCommunication(context);
|
|
|
- // Build the recipient ID
|
|
|
- try {
|
|
|
- recipientJid = JidCreate.entityBareFrom(context.getProperty(RECIPIENT).toString());
|
|
|
- } catch (XmppStringprepException e) {
|
|
|
- getLogger().error("Impossible to build recepient Jid from {} property with value {}",
|
|
|
- new Object[] { RECIPIENT.getName(), context.getProperty(RECIPIENT).toString() });
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
|
|
-
|
|
|
- FlowFile flowFile = session.get();
|
|
|
-
|
|
|
- if (flowFile == null) {
|
|
|
- context.yield();
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- Chat chat = chatMgr.chatWith(recipientJid);
|
|
|
- getLogger().info("Ready to send to " + recipientJid.asEntityBareJidString());
|
|
|
-
|
|
|
- MessageBuilder builder = StanzaBuilder.buildMessage();
|
|
|
- getLogger().debug("MessageBuilder created");
|
|
|
-
|
|
|
- getLogger().debug("Adding Subject");
|
|
|
- String subject = context.getProperty(SUBJECT).getValue() != null ? context.getProperty(SUBJECT).getValue()
|
|
|
- : DEFAULT_SUBJECT;
|
|
|
- String language = context.getProperty(LANGUAGE).getValue() != null ? context.getProperty(LANGUAGE).getValue()
|
|
|
- : DEFAULT_LANGUAGE;
|
|
|
- builder.addSubject(language, subject);
|
|
|
- builder.setSubject(subject);
|
|
|
-
|
|
|
- // session.read(flowfile, new InputStreamCallback() {
|
|
|
- //
|
|
|
- // @Override
|
|
|
- // public void process(InputStream in) throws IOException {
|
|
|
- // ByteBuffer byteBufer = ByteBuffer.wrap(in.readAllBytes());
|
|
|
- // CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBufer);
|
|
|
- // builder.addBody("", charBuffer.toString());
|
|
|
- // }
|
|
|
- // });
|
|
|
-
|
|
|
- getLogger().debug("Setting language");
|
|
|
- builder.setLanguage(language);
|
|
|
-
|
|
|
- getLogger().debug("Reading flowfile");
|
|
|
-
|
|
|
- final String messageText;
|
|
|
- final byte[] buffer = new byte[(int) flowFile.getSize()];
|
|
|
- try (final InputStream in = session.read(flowFile)) {
|
|
|
- // Reading the content
|
|
|
- in.read(buffer);
|
|
|
- in.close();
|
|
|
-
|
|
|
- messageText = new String(buffer);
|
|
|
-
|
|
|
- builder.addBody(language, messageText);
|
|
|
-
|
|
|
- getLogger().debug("Subject set to \"{}\" with language {}, ready to send", builder.getSubject(),
|
|
|
- builder.getLanguage());
|
|
|
- chat.send(builder.build());
|
|
|
- final long sent = System.nanoTime();
|
|
|
- session.getProvenanceReporter().send(flowFile, "xmpp://" + context.getProperty(SERVER),
|
|
|
- "Sent XMPP message to " + recipientJid.asEntityBareJidString(), sent);
|
|
|
- getLogger().info("Successfully send {} to {}",
|
|
|
- new Object[] { flowFile, recipientJid.asEntityBareJidString() });
|
|
|
- session.transfer(flowFile, SUCCESS);
|
|
|
- getLogger().debug("Flowfile {} sent to success", flowFile);
|
|
|
- } catch (NotConnectedException e) {
|
|
|
- getLogger().error("Message to {} impossible to send, not connected: {}",
|
|
|
- recipientJid.asEntityBareJidString(), e.getLocalizedMessage());
|
|
|
- session.penalize(flowFile);
|
|
|
- session.transfer(flowFile, FAILURE);
|
|
|
- session.rollback();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- getLogger().error("Message to {} impossible to send, interrupted: {}", recipientJid.asEntityBareJidString(),
|
|
|
- e.getLocalizedMessage());
|
|
|
- session.penalize(flowFile);
|
|
|
- session.transfer(flowFile, FAILURE);
|
|
|
- session.rollback();
|
|
|
- } catch (Exception e) {
|
|
|
- getLogger().error("Got an exception", e);
|
|
|
- session.rollback();
|
|
|
- }
|
|
|
- }
|
|
|
+ /** Default subject value */
|
|
|
+ public static String DEFAULT_SUBJECT = "Nifi message";
|
|
|
+
|
|
|
+ /** Default language */
|
|
|
+ public static String DEFAULT_LANGUAGE = "en";
|
|
|
+
|
|
|
+ public static final PropertyDescriptor RECIPIENT = new PropertyDescriptor.Builder()
|
|
|
+ .name("Recipient JID")
|
|
|
+ .description("User JID used by this processor to send messages to")
|
|
|
+ .required(true)
|
|
|
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
|
|
|
+ .name("Message subject")
|
|
|
+ .description("Message subject that will be shown")
|
|
|
+ .required(false)
|
|
|
+ .defaultValue(PublishXMPP.DEFAULT_SUBJECT)
|
|
|
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ public static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder()
|
|
|
+ .name("Language used for message")
|
|
|
+ .description("Language used in the message")
|
|
|
+ .required(true)
|
|
|
+ .defaultValue(PublishXMPP.DEFAULT_LANGUAGE)
|
|
|
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ public static final Relationship SUCCESS = new Relationship.Builder()
|
|
|
+ .name("success")
|
|
|
+ .description("Message sent")
|
|
|
+ .build();
|
|
|
+
|
|
|
+ public static final Relationship FAILURE = new Relationship.Builder()
|
|
|
+ .name("failure")
|
|
|
+ .description("Problem when sending message")
|
|
|
+ .build();
|
|
|
+
|
|
|
+ /** Recipient JID */
|
|
|
+ private EntityBareJid recipientJid;
|
|
|
+
|
|
|
+ public PublishXMPP() {
|
|
|
+ super();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void init(final ProcessorInitializationContext context) {
|
|
|
+ super.init(context);
|
|
|
+ descriptors.add(RECIPIENT);
|
|
|
+ descriptors = Collections.unmodifiableList(descriptors);
|
|
|
+
|
|
|
+ relationships.add(SUCCESS);
|
|
|
+ relationships.add(FAILURE);
|
|
|
+ relationships = Collections.unmodifiableSet(relationships);
|
|
|
+ }
|
|
|
+
|
|
|
+ @OnScheduled
|
|
|
+ public void onScheduled(final ProcessContext context) {
|
|
|
+ prepareCommunication(context);
|
|
|
+ // Build the recipient ID
|
|
|
+ try {
|
|
|
+ recipientJid =
|
|
|
+ JidCreate.entityBareFrom(context.getProperty(RECIPIENT).toString());
|
|
|
+ } catch (XmppStringprepException e) {
|
|
|
+ getLogger().error("Impossible to build recepient Jid from {} property with value {}", new Object[] {
|
|
|
+ RECIPIENT.getName(), context.getProperty(RECIPIENT).toString()
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
|
|
+
|
|
|
+ FlowFile flowFile = session.get();
|
|
|
+
|
|
|
+ if (flowFile == null) {
|
|
|
+ context.yield();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Chat chat = chatMgr.chatWith(recipientJid);
|
|
|
+ getLogger().info("Ready to send to " + recipientJid.asEntityBareJidString());
|
|
|
+
|
|
|
+ MessageBuilder builder = StanzaBuilder.buildMessage();
|
|
|
+ getLogger().debug("MessageBuilder created");
|
|
|
+
|
|
|
+ getLogger().debug("Adding Subject");
|
|
|
+ String subject = context.getProperty(SUBJECT).getValue() != null
|
|
|
+ ? context.getProperty(SUBJECT).getValue()
|
|
|
+ : DEFAULT_SUBJECT;
|
|
|
+ String language = context.getProperty(LANGUAGE).getValue() != null
|
|
|
+ ? context.getProperty(LANGUAGE).getValue()
|
|
|
+ : DEFAULT_LANGUAGE;
|
|
|
+ builder.addSubject(language, subject);
|
|
|
+ builder.setSubject(subject);
|
|
|
+
|
|
|
+ // session.read(flowfile, new InputStreamCallback() {
|
|
|
+ //
|
|
|
+ // @Override
|
|
|
+ // public void process(InputStream in) throws IOException {
|
|
|
+ // ByteBuffer byteBufer = ByteBuffer.wrap(in.readAllBytes());
|
|
|
+ // CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBufer);
|
|
|
+ // builder.addBody("", charBuffer.toString());
|
|
|
+ // }
|
|
|
+ // });
|
|
|
+
|
|
|
+ getLogger().debug("Setting language");
|
|
|
+ builder.setLanguage(language);
|
|
|
+
|
|
|
+ getLogger().debug("Reading flowfile");
|
|
|
+
|
|
|
+ final String messageText;
|
|
|
+ final byte[] buffer = new byte[(int) flowFile.getSize()];
|
|
|
+ try (final InputStream in = session.read(flowFile)) {
|
|
|
+ // Reading the content
|
|
|
+ in.read(buffer);
|
|
|
+ in.close();
|
|
|
+
|
|
|
+ messageText = new String(buffer);
|
|
|
+
|
|
|
+ builder.addBody(language, messageText);
|
|
|
+
|
|
|
+ getLogger()
|
|
|
+ .debug(
|
|
|
+ "Subject set to \"{}\" with language {}, ready to send",
|
|
|
+ builder.getSubject(),
|
|
|
+ builder.getLanguage());
|
|
|
+ chat.send(builder.build());
|
|
|
+ final long sent = System.nanoTime();
|
|
|
+ session.getProvenanceReporter()
|
|
|
+ .send(
|
|
|
+ flowFile,
|
|
|
+ "xmpp://" + context.getProperty(SERVER),
|
|
|
+ "Sent XMPP message to " + recipientJid.asEntityBareJidString(),
|
|
|
+ sent);
|
|
|
+ getLogger()
|
|
|
+ .info("Successfully send {} to {}", new Object[] {flowFile, recipientJid.asEntityBareJidString()});
|
|
|
+ session.transfer(flowFile, SUCCESS);
|
|
|
+ getLogger().debug("Flowfile {} sent to success", flowFile);
|
|
|
+ } catch (NotConnectedException e) {
|
|
|
+ getLogger()
|
|
|
+ .error(
|
|
|
+ "Message to {} impossible to send, not connected: {}",
|
|
|
+ recipientJid.asEntityBareJidString(),
|
|
|
+ e.getLocalizedMessage());
|
|
|
+ session.penalize(flowFile);
|
|
|
+ session.transfer(flowFile, FAILURE);
|
|
|
+ session.rollback();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ getLogger()
|
|
|
+ .error(
|
|
|
+ "Message to {} impossible to send, interrupted: {}",
|
|
|
+ recipientJid.asEntityBareJidString(),
|
|
|
+ e.getLocalizedMessage());
|
|
|
+ session.penalize(flowFile);
|
|
|
+ session.transfer(flowFile, FAILURE);
|
|
|
+ session.rollback();
|
|
|
+ } catch (Exception e) {
|
|
|
+ getLogger().error("Got an exception", e);
|
|
|
+ session.rollback();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|