An Apache Kafka and Apache NiFi Experience
Russell Bateman
This is an initial pump-priming exercise in using Apache NiFi to interact with Kafka. I'm using Apache NiFi 1.19.1 and Apache Kafka 3.3.2 (the Scala 2.13 build, kafka_2.13-3.3.2).
russ@tirion ~/dev/ $ mkdir kafka
russ@tirion ~/dev/ $ cd kafka
russ@tirion ~/dev/kafka $ tar -zxf kafka_2.13-3.3.2.tgz
russ@tirion ~/dev/kafka $ ll
russ@tirion ~/dev/kafka $ rm *.tgz
russ@tirion ~/dev/kafka $ java --version
openjdk 11.0.17 2022-10-18
OpenJDK Runtime Environment (build 11.0.17+8-post-Ubuntu-1ubuntu220.04)
OpenJDK 64-Bit Server VM (build 11.0.17+8-post-Ubuntu-1ubuntu220.04, mixed mode, sharing)
russ@tirion ~/dev/kafka $ ln -s kafka_2.13-3.3.2 kafka
russ@tirion ~/dev/kafka $ ll
total 12
drwxrwxr-x  3 russ russ 4096 Jan 27 14:35 .
drwxrwxr-x 59 russ russ 4096 Jan 27 14:32 ..
lrwxrwxrwx  1 russ russ   16 Jan 27 14:35 kafka -> kafka_2.13-3.3.2
drwxr-xr-x  8 russ russ 4096 Jan 27 14:36 kafka_2.13-3.3.2
russ@tirion ~/dev/kafka $ ./kafka/bin/zookeeper-server-start.sh ./kafka/config/zookeeper.properties
[2023-01-27 14:52:18,116] INFO Reading configuration from: ./kafka/config/zookeeper.properties \
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-01-27 14:52:18,121] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
russ@tirion ~/dev/kafka $ ./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties
[2023-01-27 14:54:02,744] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-01-27 14:54:02,970] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation \
(org.apache.zookeeper.common.X509Util)
[2023-01-27 14:54:03,031] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2023-01-27 14:54:03,032] INFO starting (kafka.server.KafkaServer)
[2023-01-27 14:54:03,032] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2023-01-27 14:54:03,041] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2023-01-27 14:54:03,045] INFO Client environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, \
built on 04/08/2021 16:35 GMT (org.apache.zookeeper.ZooKeeper)
[2023-01-27 14:54:03,045] INFO Client environment:host.name=tirion (org.apache.zookeeper.ZooKeeper)
[2023-01-27 14:54:03,045] INFO Client environment:java.version=11.0.10 (org.apache.zookeeper.ZooKeeper)
[2023-01-27 14:54:03,045] INFO Client environment:java.vendor=AdoptOpenJDK (org.apache.zookeeper.ZooKeeper)
[2023-01-27 14:54:03,045] INFO Client environment:java.home=/home/russ/dev/jdk-11.0.10+9 (org.apache.zookeeper.ZooKeeper)
...
[2023-01-27 14:54:03,968] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, \
from now on will use node tirion:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
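The broker log shows Kafka connecting to ZooKeeper on localhost:2181 and then registering itself on port 9092. Before wiring up NiFi, it can be handy to confirm that both ports are accepting connections. What follows is only a minimal sketch, assuming bash (it leans on bash's /dev/tcp redirection); the function names port_open and report are made up for illustration and are not part of Kafka or NiFi.

```shell
#!/usr/bin/env bash
# Sketch: succeed if something accepts TCP connections on host:port.
# Uses bash's /dev/tcp pseudo-device, so it needs bash rather than plain sh.
port_open() {
  local host=$1 port=$2
  # Try to open a connection inside a subshell so the descriptor is
  # closed again as soon as the subshell exits; hide any error text.
  (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null
}

# Hypothetical helper: print a one-line status for a named service.
report() {
  local name=$1 host=$2 port=$3
  if port_open "$host" "$port"; then
    echo "$name is listening on $host:$port"
  else
    echo "$name is NOT listening on $host:$port"
  fi
}

# Usage (ports taken from the log output above):
#   report ZooKeeper localhost 2181
#   report Kafka     localhost 9092
```

If ZooKeeper reports as not listening, there is little point in starting the broker yet, since the broker's first act is to open a session to localhost:2181.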
+------------------+        +------------------+        +------+
| GenerateFlowFile | -----→ | PublishKafka_2_6 | -----→ | NoOp |
+------------------+        +------------------+        +------+
russ@tirion ~/dev/kafka/kafka $ ./bin/kafka-console-consumer.sh --topic FHIR --from-beginning --bootstrap-server localhost:9092
<record>
<HL7>
<Bundle xmlns="http://hl7.org/fhir">
<type value="transaction"></type>
<entry>
<fullUrl value="urn:uuid:5cbc121b-cd71-4428-b8b7-31e53eba8184"></fullUrl>
<resource>
<Patient xmlns="http://hl7.org/fhir">
<text>
<status value="generated"></status>
.
.
.
</amount>
</payment>
</ExplanationOfBenefit>
</resource>
<request>
<method value="POST"></method>
<url value="ExplanationOfBenefit"></url>
</request>
</entry>
</Bundle>
</HL7>
<MessageReceived>2021-01-13 21:44:41.0</MessageReceived>
<MessageDeliveredtoMariaDB>2021-01-13 21:44:33.0</MessageDeliveredtoMariaDB>
<MessageTransactionId>13fb854b-b6da-4bf9-9738-ba06ce41735a~8</MessageTransactionId>
<MessageId>1</MessageId>
<MessageInterfaceId>IB_Stage_Bundle</MessageInterfaceId>
<MessageStatus>PR</MessageStatus>
<MessageOrganizationId>HeC</MessageOrganizationId>
<MessageMimeType>XML</MessageMimeType>
<MessageStandardVersion>R4</MessageStandardVersion>
</record>
I think that, as soon as this command-line utility is run, a flowfile appears in the success queue between PublishKafka_2_6 and the NoOp processor, but it may already have been there from the moment I started PublishKafka_2_6.
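One way to settle that question is to ask Kafka directly whether the topic already holds a message, before starting any long-running consumer. The sketch below wraps the same kafka-console-consumer.sh used above, adding its --max-messages and --timeout-ms options so that it returns instead of waiting forever. KAFKA_HOME and the function name topic_has_messages are assumptions made for illustration; adjust the path to wherever the distribution was unpacked.

```shell
#!/usr/bin/env bash
# Assumption: KAFKA_HOME points at the unpacked Kafka distribution
# (hypothetical variable; the default mirrors the symlink created earlier).
KAFKA_HOME=${KAFKA_HOME:-$HOME/dev/kafka/kafka}

# Read at most one message from the start of a topic, give up after
# five seconds, and succeed only if something came back.
topic_has_messages() {
  local topic=$1 server=${2:-localhost:9092}
  local first
  first=$("$KAFKA_HOME/bin/kafka-console-consumer.sh" \
            --topic "$topic" --from-beginning \
            --bootstrap-server "$server" \
            --max-messages 1 --timeout-ms 5000 2>/dev/null)
  [ -n "$first" ]
}

# Usage:
#   topic_has_messages FHIR && echo "FHIR has messages" || echo "FHIR looks empty"
```

If this succeeds while no other consumer has run yet, the flowfile in the success queue owes nothing to the console consumer: PublishKafka_2_6 had already delivered the message.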
+------------------+        +------+
| ConsumeKafka_2_6 | -----→ | NoOp |
+------------------+        +------+
Let the learning commence!
Let's check out "How to Read and Write to Apache Kafka with Apache NiFi" to see if it can tell us what we did wrong.