|  | @@ -0,0 +1,181 @@
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | + * Licensed to the Apache Software Foundation (ASF) under one or more
 | 
	
		
			
				|  |  | + * contributor license agreements.  See the NOTICE file distributed with
 | 
	
		
			
				|  |  | + * this work for additional information regarding copyright ownership.
 | 
	
		
			
				|  |  | + * The ASF licenses this file to You under the Apache License, Version 2.0
 | 
	
		
			
				|  |  | + * (the "License"); you may not use this file except in compliance with
 | 
	
		
			
				|  |  | + * the License.  You may obtain a copy of the License at
 | 
	
		
			
				|  |  | + *
 | 
	
		
			
				|  |  | + *     http://www.apache.org/licenses/LICENSE-2.0
 | 
	
		
			
				|  |  | + *
 | 
	
		
			
				|  |  | + * Unless required by applicable law or agreed to in writing, software
 | 
	
		
			
				|  |  | + * distributed under the License is distributed on an "AS IS" BASIS,
 | 
	
		
			
				|  |  | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
	
		
			
				|  |  | + * See the License for the specific language governing permissions and
 | 
	
		
			
				|  |  | + * limitations under the License.
 | 
	
		
			
				|  |  | + */
 | 
	
		
			
				|  |  | +package fr.chickenkiller.nifi.processors.xmpp;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import java.io.InputStream;
 | 
	
		
			
				|  |  | +import java.util.Collections;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import org.apache.nifi.annotation.behavior.InputRequirement;
 | 
	
		
			
				|  |  | +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 | 
	
		
			
				|  |  | +import org.apache.nifi.annotation.behavior.ReadsAttribute;
 | 
	
		
			
				|  |  | +import org.apache.nifi.annotation.behavior.ReadsAttributes;
 | 
	
		
			
				|  |  | +import org.apache.nifi.annotation.documentation.CapabilityDescription;
 | 
	
		
			
				|  |  | +import org.apache.nifi.annotation.documentation.Tags;
 | 
	
		
			
				|  |  | +import org.apache.nifi.annotation.lifecycle.OnScheduled;
 | 
	
		
			
				|  |  | +import org.apache.nifi.components.PropertyDescriptor;
 | 
	
		
			
				|  |  | +import org.apache.nifi.flowfile.FlowFile;
 | 
	
		
			
				|  |  | +import org.apache.nifi.processor.ProcessContext;
 | 
	
		
			
				|  |  | +import org.apache.nifi.processor.ProcessSession;
 | 
	
		
			
				|  |  | +import org.apache.nifi.processor.ProcessorInitializationContext;
 | 
	
		
			
				|  |  | +import org.apache.nifi.processor.Relationship;
 | 
	
		
			
				|  |  | +import org.apache.nifi.processor.util.StandardValidators;
 | 
	
		
			
				|  |  | +import org.jivesoftware.smack.SmackException.NotConnectedException;
 | 
	
		
			
				|  |  | +import org.jivesoftware.smack.chat2.Chat;
 | 
	
		
			
				|  |  | +import org.jivesoftware.smack.packet.MessageBuilder;
 | 
	
		
			
				|  |  | +import org.jivesoftware.smack.packet.StanzaBuilder;
 | 
	
		
			
				|  |  | +import org.jxmpp.jid.EntityBareJid;
 | 
	
		
			
				|  |  | +import org.jxmpp.jid.impl.JidCreate;
 | 
	
		
			
				|  |  | +import org.jxmpp.stringprep.XmppStringprepException;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +@Tags({ "xmpp", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "read" })
 | 
	
		
			
				|  |  | +@CapabilityDescription("Send messages to a XMPP server")
 | 
	
		
			
				|  |  | +@InputRequirement(Requirement.INPUT_ALLOWED)
 | 
	
		
			
				|  |  | +@ReadsAttributes({ @ReadsAttribute(attribute = "xmpp.recipient.jid", description = "Message recipient JID"), })
 | 
	
		
			
				|  |  | +public class PublishXMPP extends AbstractXMPPProcessor {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	/** Default subject value */
 | 
	
		
			
				|  |  | +	public static String DEFAULT_SUBJECT = "Nifi message";
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	/** Default language */
 | 
	
		
			
				|  |  | +	public static String DEFAULT_LANGUAGE = "en";
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	public static final PropertyDescriptor RECIPIENT = new PropertyDescriptor.Builder().name("Recipient JID")
 | 
	
		
			
				|  |  | +			.description("User JID used by this processor to send messages to").required(true)
 | 
	
		
			
				|  |  | +			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder().name("Message subject")
 | 
	
		
			
				|  |  | +			.description("Message subject that will be shown").required(false).defaultValue(PublishXMPP.DEFAULT_SUBJECT)
 | 
	
		
			
				|  |  | +			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	public static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder().name("Language used for message")
 | 
	
		
			
				|  |  | +			.description("Language used in the message").required(true).defaultValue(PublishXMPP.DEFAULT_LANGUAGE)
 | 
	
		
			
				|  |  | +			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("Message sent")
 | 
	
		
			
				|  |  | +			.build();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	public static final Relationship FAILURE = new Relationship.Builder().name("failure")
 | 
	
		
			
				|  |  | +			.description("Problem when sending message").build();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	/** Recipient JID */
 | 
	
		
			
				|  |  | +	private EntityBareJid recipientJid;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	public PublishXMPP() {
 | 
	
		
			
				|  |  | +		super();
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	@Override
 | 
	
		
			
				|  |  | +	protected void init(final ProcessorInitializationContext context) {
 | 
	
		
			
				|  |  | +		super.init(context);
 | 
	
		
			
				|  |  | +		descriptors.add(RECIPIENT);
 | 
	
		
			
				|  |  | +		descriptors = Collections.unmodifiableList(descriptors);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		relationships.add(SUCCESS);
 | 
	
		
			
				|  |  | +		relationships.add(FAILURE);
 | 
	
		
			
				|  |  | +		relationships = Collections.unmodifiableSet(relationships);
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	@OnScheduled
 | 
	
		
			
				|  |  | +	public void onScheduled(final ProcessContext context) {
 | 
	
		
			
				|  |  | +		prepareCommunication(context);
 | 
	
		
			
				|  |  | +		// Build the recipient ID
 | 
	
		
			
				|  |  | +		try {
 | 
	
		
			
				|  |  | +			recipientJid = JidCreate.entityBareFrom(context.getProperty(RECIPIENT).toString());
 | 
	
		
			
				|  |  | +		} catch (XmppStringprepException e) {
 | 
	
		
			
				|  |  | +			getLogger().error("Impossible to build recepient Jid from {} property with value {}",
 | 
	
		
			
				|  |  | +					new Object[] { RECIPIENT.getName(), context.getProperty(RECIPIENT).toString() });
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	@Override
 | 
	
		
			
				|  |  | +	public void onTrigger(final ProcessContext context, final ProcessSession session) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		FlowFile flowFile = session.get();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		if (flowFile == null) {
 | 
	
		
			
				|  |  | +			context.yield();
 | 
	
		
			
				|  |  | +			return;
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		Chat chat = chatMgr.chatWith(recipientJid);
 | 
	
		
			
				|  |  | +		getLogger().info("Ready to send to " + recipientJid.asEntityBareJidString());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		MessageBuilder builder = StanzaBuilder.buildMessage();
 | 
	
		
			
				|  |  | +		getLogger().debug("MessageBuilder created");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		getLogger().debug("Adding Subject");
 | 
	
		
			
				|  |  | +		String subject = context.getProperty(SUBJECT).getValue() != null ? context.getProperty(SUBJECT).getValue()
 | 
	
		
			
				|  |  | +				: DEFAULT_SUBJECT;
 | 
	
		
			
				|  |  | +		String language = context.getProperty(LANGUAGE).getValue() != null ? context.getProperty(LANGUAGE).getValue()
 | 
	
		
			
				|  |  | +				: DEFAULT_LANGUAGE;
 | 
	
		
			
				|  |  | +		builder.addSubject(language, subject);
 | 
	
		
			
				|  |  | +		builder.setSubject(subject);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		// session.read(flowfile, new InputStreamCallback() {
 | 
	
		
			
				|  |  | +		//
 | 
	
		
			
				|  |  | +		// @Override
 | 
	
		
			
				|  |  | +		// public void process(InputStream in) throws IOException {
 | 
	
		
			
				|  |  | +		// ByteBuffer byteBufer = ByteBuffer.wrap(in.readAllBytes());
 | 
	
		
			
				|  |  | +		// CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBufer);
 | 
	
		
			
				|  |  | +		// builder.addBody("", charBuffer.toString());
 | 
	
		
			
				|  |  | +		// }
 | 
	
		
			
				|  |  | +		// });
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		getLogger().debug("Setting language");
 | 
	
		
			
				|  |  | +		builder.setLanguage(language);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		getLogger().debug("Reading flowfile");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		final String messageText;
 | 
	
		
			
				|  |  | +		final byte[] buffer = new byte[(int) flowFile.getSize()];
 | 
	
		
			
				|  |  | +		try (final InputStream in = session.read(flowFile)) {
 | 
	
		
			
				|  |  | +			// Reading the content
 | 
	
		
			
				|  |  | +			in.read(buffer);
 | 
	
		
			
				|  |  | +			in.close();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +			messageText = new String(buffer);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +			builder.addBody(language, messageText);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +			getLogger().debug("Subject set to \"{}\" with language {}, ready to send", builder.getSubject(),
 | 
	
		
			
				|  |  | +					builder.getLanguage());
 | 
	
		
			
				|  |  | +			chat.send(builder.build());
 | 
	
		
			
				|  |  | +			final long sent = System.nanoTime();
 | 
	
		
			
				|  |  | +			session.getProvenanceReporter().send(flowFile, "xmpp://" + context.getProperty(SERVER),
 | 
	
		
			
				|  |  | +					"Sent XMPP message to " + recipientJid.asEntityBareJidString(), sent);
 | 
	
		
			
				|  |  | +			getLogger().info("Successfully send {} to {}",
 | 
	
		
			
				|  |  | +					new Object[] { flowFile, recipientJid.asEntityBareJidString() });
 | 
	
		
			
				|  |  | +			session.transfer(flowFile, SUCCESS);
 | 
	
		
			
				|  |  | +			getLogger().debug("Flowfile {} sent to success", flowFile);
 | 
	
		
			
				|  |  | +		} catch (NotConnectedException e) {
 | 
	
		
			
				|  |  | +			getLogger().error("Message to {} impossible to send, not connected: {}",
 | 
	
		
			
				|  |  | +					recipientJid.asEntityBareJidString(), e.getLocalizedMessage());
 | 
	
		
			
				|  |  | +			session.penalize(flowFile);
 | 
	
		
			
				|  |  | +			session.transfer(flowFile, FAILURE);
 | 
	
		
			
				|  |  | +			session.rollback();
 | 
	
		
			
				|  |  | +		} catch (InterruptedException e) {
 | 
	
		
			
				|  |  | +			getLogger().error("Message to {} impossible to send, interrupted: {}", recipientJid.asEntityBareJidString(),
 | 
	
		
			
				|  |  | +					e.getLocalizedMessage());
 | 
	
		
			
				|  |  | +			session.penalize(flowFile);
 | 
	
		
			
				|  |  | +			session.transfer(flowFile, FAILURE);
 | 
	
		
			
				|  |  | +			session.rollback();
 | 
	
		
			
				|  |  | +		} catch (Exception e) {
 | 
	
		
			
				|  |  | +			getLogger().error("Got an exception", e);
 | 
	
		
			
				|  |  | +			session.rollback();
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +}
 |