Russell Bateman
November 2020
last update:
Controller services are a little more confusing to write and to use than custom processors your first time. I'm going to show setting this up (I am using NiFi 1.11.4). A good piece of documentation to look at also is Adding Controller Services for Dataflows. What I explain here is only a little different.
I have written two controllers for what I need—StandardAuthToken and CanonicalConnection. I used to have a different connection controller, but I got tire of configuring it to use the first which I also had to configure. So, I merged the functionality and now only need the one I show here.
This page tells you how to configure a controller service in NiFi even if you didn't write it.
Remember, once you set up a controller, you can use it in support of different, consuming processors if the configuration remains the same. You don't have to create a new instance of the controller for every processor using it.
Next, you're going to configure your processor's consumption of the controller service you just set up.
Note that, when implementing and deploying, that is, consuming a controller in a NiFi processor property, there are careful rules to follow and often these get overlooked. At all of the highlighted points below, you'll find interface IConnectionService being used.
package com.windofkeltia.controller; import org.apache.nifi.controller.ControllerService; public interface IConnectionService extends ControllerService { // connection-proper details... String getProtocol(); String getHostname(); String getPort(); int getTimeout(); boolean getDisableSsl(); String getDefaultAccept(); URL getServerUrl(); String getUsername(); String getToken(); }
package com.windofkeltia.controller; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; public class CanonicalConnection extends AbstractControllerService implements IConnectionService { private String protocol; @Override public String getProtocol() { return protocol; } private String hostname; @Override public String getHostname() { return hostname; } private String port; @Override public String getPort() { return port; } private int timeout; @Override public int getTimeout() { return timeout; } private Boolean disableSsl; @Override public boolean getDisableSsl() { return disableSsl; } private String useDefaultAccept; @Override public String getDefaultAccept() { return useDefaultAccept; } private URL serverUrl; @Override public URL getServerUrl() { return serverUrl; } private String username; @Override public String getUsername() { return username; } private String token; @Override public String getToken() { return token; } @OnEnabled public void onConfigured( final ConfigurationContext context ) throws InitializationException { protocol = context.getProperty( PROTOCOL_PROPERTY ).getValue().toLowerCase(); hostname = context.getProperty( HOSTNAME_PROPERTY ).getValue(); port = context.getProperty( HOSTPORT_PROPERTY ).getValue(); disableSsl = context.getProperty( DISABLE_SSL_PROPERTY ).asBoolean(); useDefaultAccept = context.getProperty( DEFAULT_ACCEPT ).getValue(); username = context.getProperty( USERNAME_PROPERTY ).getValue(); token = context.getProperty( TOKEN_PROPERTY ).getValue(); try { String timeoutString = context.getProperty( CONNECTION_TIMEOUT ).getValue(); int timeoutSeconds = Integer.parseInt( timeoutString ); if( timeoutSeconds < 0 ) throw new NumberFormatException( "Number is negative" ); timeout = timeoutSeconds * 1000; } catch( NumberFormatException e ) { throw new InitializationException( "Time-out value must be 0 or a positive number of seconds" ); } StringBuilder url = new StringBuilder( protocol ); url.append( "://" ).append( hostname ); if( !StringUtilities.isEmpty( port ) ) url.append( ':' ).append( port ); try { serverUrl = new URL( url.toString() ); } catch( MalformedURLException e ) { throw new InitializationException( String.format( "Host and port configuration created invalid URL '%s'", url ), e ); } } //<editor-fold desc="Individual property definitions"> //</editor-fold> private static final List< PropertyDescriptor > serviceProperties; static { final List< PropertyDescriptor > properties = new ArrayList<>(); properties.add( PROTOCOL_PROPERTY ); properties.add( HOSTNAME_PROPERTY ); properties.add( HOSTPORT_PROPERTY ); properties.add( DISABLE_SSL_PROPERTY ); properties.add( CONNECTION_TIMEOUT ); properties.add( DEFAULT_ACCEPT ); properties.add( USERNAME_PROPERTY ); properties.add( TOKEN_PROPERTY ); serviceProperties = Collections.unmodifiableList( properties ); } @Override final protected List< PropertyDescriptor > getSupportedPropertyDescriptors() { return serviceProperties; } }
First, the processor's configuration property that consumes the controller indicates the controller by its interface name. (I always do this at the end because once defined, I want them out of the way).
public static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() .name( CONNECTION_SERVICE_NAME ) .displayName( CONNECTION_SERVICE_DNAME ) .description( "Controller service that connects to Middleware." ) .required( true ) .identifiesControllerService( IConnectionService.class ) .build(); . . . private List< PropertyDescriptor > properties; . . . @Override public void init( final ProcessorInitializationContext context ) { List< PropertyDescriptor > properties = new ArrayList<>(); properties.add( CONNECTION_SERVICE ); . . . this.properties = Collections.unmodifiableList( properties ); . . . }
package com.windofkeltia.controller; import com.windofkeltia.controller.interfaces.IConnectionService; public class ExtractHl7v4 extends AbstractProcessor { @Override public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException { final CanonicalConnection connectionService = context.getProperty( CONNECTION_SERVICE ) .asControllerService( IConnectionService.class ); String hostname = connectionService.getHostname(); ...etc. } . . .
Failure to do this will result in a controller server that cannot be found from the NiFi UI.
Something like this. It should get you close. I don't have the time to vet this fully as an example. The code I quickly scraped and modified to give this example worked. The important bits not to forget are highlighted. Maybe I'll get back here to do a proper job.
Obvious, the four lines of properties.put() below must match what your controller need as configuation.
package com.windofkeltia.controller; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import com.windofkeltia.controller.CanonicalConnection; import com.windofkeltia.controller.IConnectionService; public class ExtractHl7v4Test { @After public void tearDown() { } @Before public void setUp() { connectionService = new CanonicalConnection(); runner = TestRunners.newTestRunner( processor = new ExtractHl7v4() ); runner.setValidateExpressionUsage( false ); } private static final int ONE = 1; private static final String EXTRACT_FROM = "<Bundle xmlns="http://hl7.org/fhir">\n"; + " <type value="transaction" />\n" + " <entry>\n" + " <fullUrl value="urn:uuid:38826a3f-8e5c-4846-95c3-7f12a233447d" />\n" + " <resource>\n" + " <Patient xmlns="http://hl7.org/fhir">\n" + " ...\n" + " </Patient>\n" + " </resource>\n" + " </entry>\n" + "</Bundle>\n"; private ExtractHl7v4 processor; private IConnectionService connectionService; private TestRunner runner; @Test public void test() throws InitializationException { Map< String, String > properties = new HashMap<>(); properties.put( CanonicalImatConnection.PROTOCOL_PROPERTY.getName(), "https" ); properties.put( CanonicalImatConnection.HOSTNAME_PROPERTY.getName(), "10.10.11.192" ); properties.put( CanonicalImatConnection.USERNAME_PROPERTY.getName(), "rbateman" ); properties.put( CanonicalImatConnection.TOKEN_PROPERTY .getName(), "WPES9E49...VPMAZ" ); runner.addControllerService( "connectionService", connectionService, properties ); runner.enableControllerService( connectionService ); runner.setProperty( EmitSearchServer.CONNECTION_SERVICE, "connectionService" ); runner.enqueue( new ByteArrayInputStream( EXTRACT_FROM.getBytes() ) ); runner.run( ONE ); runner.assertQueueEmpty(); List< MockFlowFile > successes = runner.getFlowFilesForRelationship( ExtractHl7v4.SUCCESS ); MockFlowFile flowfile = successes.get( 0 ); ... } }
Here's a little simpler version to follow:
package com.windofkeltia.controller; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import com.windofkeltia.controller.CanonicalConnection; import com.windofkeltia.controller.IConnectionService; public class ExtractHl7v4Test { private static final int ONE = 1; private static final String EXTRACT_FROM = "<Bundle xmlns="http://hl7.org/fhir">\n"; + " <type value="transaction" />\n" + " <entry>\n" + " ...\n" + " </entry>\n" + "</Bundle>\n"; @Test public void test() throws InitializationException { TestRunner runner = TestRunners.newTestRunner( new ExtractHl7v4() ); runner.setValidateExpressionUsage( false ); IConnectionSerice connectionService = new CanonicalConnection(); Map< String, String > properties = new HashMap<>(); properties.put( CanonicalConnection.PROTOCOL_PROPERTY.getName(), "https" ); properties.put( CanonicalConnection.HOSTNAME_PROPERTY.getName(), "10.10.11.192" ); properties.put( CanonicalConnection.USERNAME_PROPERTY.getName(), "rbateman" ); properties.put( CanonicalConnection.TOKEN_PROPERTY .getName(), "WPES9E49...VPMAZ" ); runner.addControllerService( "connectionService", connectionService, properties ); runner.enableControllerService( connectionService ); runner.setProperty( ExtractHl7v4.CONNECTION_SERVICE, "connectionService" ); runner.enqueue( new ByteArrayInputStream( EXTRACT_FROM.getBytes() ) ); runner.run( ONE ); runner.assertQueueEmpty(); List< MockFlowFile > successes = runner.getFlowFilesForRelationship( ExtractHl7v4.SUCCESS ); MockFlowFile flowfile = successes.get( 0 ); ... } }