Apache Kafka Integration With Storm
1. Introduction
This is an in-depth article on integrating Apache Kafka with Apache Storm. Apache Kafka is an open-source project of the Apache Software Foundation. It was originally created at LinkedIn. The Kafka framework is written in Java and Scala. It supports publish-subscribe messaging and is fault-tolerant. It is scalable and performs well for high-volume messaging. ZooKeeper is the component that coordinates the Apache Kafka server. Kafka's features include reliability, scalability, performance, distributed logging, and durability.
2. Apache Kafka – Storm Integration
Apache Storm was developed by the BackType team led by Nathan Marz. Storm is used for real-time data processing. It can process on the order of 1 million tuples per second on a single node. Spouts are the data sources and bolts are the data processing units. A topology consists of spouts and bolts. Data is streamed through Storm topologies, with Kafka providing the publish-subscribe messaging.
2.1 Prerequisites
Java 7 or 8 is required on Linux, Windows, or macOS. Apache Storm 0.9.5 and Kafka 0.9.0.0 (the Scala 2.11 build, kafka_2.11-0.9.0.0) are needed for this article.
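As a quick way to check the Java prerequisite, the sketch below extracts the major version from a Java version string. The helper name `java_major` is hypothetical, and the parsing assumes the usual `java -version` banner format (for example `java version "1.8.0_101"`), so treat it as an illustration rather than a robust check.

```shell
# Hypothetical helper: print the major version for a Java version string.
# Handles the legacy scheme (1.8.0_101 -> 8) and the newer one (11.0.2 -> 11).
java_major() {
  v=$1
  case "$v" in
    1.*) v=${v#1.} ;;        # legacy scheme: drop the leading "1."
  esac
  echo "${v%%[._-]*}"        # keep everything before the first ., _ or -
}

# If a JVM is on PATH, read its version banner and compare it with the
# versions this article assumes (7 or 8).
if command -v java >/dev/null 2>&1; then
  raw=$(java -version 2>&1 | sed -n 's/.* version "\([^"]*\)".*/\1/p' | head -n 1)
  major=$(java_major "$raw")
  case "$major" in
    7|8) echo "Java $major detected: matches the article's prerequisites" ;;
    *)   echo "Java ${major:-unknown} detected: the article assumes Java 7 or 8" ;;
  esac
fi
```

A newer JDK may still run these tools, but the versions used in this article were released for Java 7 and 8.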
2.2 Download
Java 8 can be downloaded from the Oracle website. The latest Kafka releases are available from the Apache Kafka website. Apache Storm can be downloaded from the Apache Storm website.
2.3 Setup
You can set the environment variables for JAVA_HOME and PATH. They can be set as shown below:
Setup
JAVA_HOME="/desktop/jdk1.8.0_73"
export JAVA_HOME
PATH=$JAVA_HOME/bin:$PATH
export PATH
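If these settings live in a shell profile that is sourced more than once, PATH can pick up duplicate entries. The following is a minimal sketch of an idempotent variant; the helper name `prepend_path` is hypothetical, and the default JDK location is simply the one used in this article, so adjust it to your own install.

```shell
# Hypothetical helper: prepend a directory to PATH only if it is not there yet.
prepend_path() {
  case ":$PATH:" in
    *":$1:"*) ;;               # already present, leave PATH unchanged
    *) PATH="$1:$PATH" ;;
  esac
}

# Keep an existing JAVA_HOME; otherwise fall back to the article's location.
JAVA_HOME="${JAVA_HOME:-/desktop/jdk1.8.0_73}"
export JAVA_HOME
prepend_path "$JAVA_HOME/bin"
export PATH

# Sanity check: warn if the directory does not actually contain a JVM.
if [ -x "$JAVA_HOME/bin/java" ]; then
  echo "JAVA_HOME looks valid: $JAVA_HOME"
else
  echo "warning: no java executable under $JAVA_HOME/bin" >&2
fi
```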
2.4 How to download and install Apache Kafka & Storm
The latest Apache Kafka releases are available from the Apache Kafka website. After downloading, extract the archive into a folder. The Storm archive can likewise be extracted into a folder after downloading it from the Storm site.
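The extraction step can be sketched as a small helper that handles both `.zip` and `.tgz`/`.tar.gz` downloads. The function name `extract_archive` and the archive file names in the comments are assumptions for illustration; use whatever file names you actually downloaded.

```shell
# Hypothetical helper: unpack a downloaded archive into a target folder.
extract_archive() {
  archive=$1
  dest=$2
  case "$archive" in
    *.tgz|*.tar.gz)
      mkdir -p "$dest" && tar -xzf "$archive" -C "$dest" ;;
    *.zip)
      mkdir -p "$dest" && unzip -q "$archive" -d "$dest" ;;
    *)
      echo "unsupported archive type: $archive" >&2
      return 1 ;;
  esac
}

# Example calls (file names are assumptions, not taken from the article):
#   extract_archive kafka_2.11-0.9.0.0.tgz "$HOME/desktop"
#   extract_archive apache-storm-0.9.5.zip "$HOME/desktop"
```

After extraction, the commands in the rest of this section are run from inside the Kafka folder (kafka_2.11-0.9.0.0 in the output shown here).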
To start ZooKeeper, you can use the command below:
ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
The output of the above command is shown below:
ZooKeeper Output
apples-MacBook-Air:kafka_2.11-0.9.0.0 bhagvan.kommadi$ bin/zookeeper-server-start.sh config/zookeeper.properties [2021-05-27 00:39:27,510] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2021-05-27 00:39:27,514] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager) [2021-05-27 00:39:27,514] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager) [2021-05-27 00:39:27,514] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager) [2021-05-27 00:39:27,514] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain) [2021-05-27 00:39:27,559] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2021-05-27 00:39:27,561] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain) [2021-05-27 00:39:27,607] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.version=1.8.0_101 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server 
environment:java.class.path=:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-http-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-client-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-core-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zkclient-0.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-databind-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-media-jaxb-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-api-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-tools-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-guava-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/argparse4j-0.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-scaladoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-security-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-xml_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-log4j12-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-javadoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-api-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/log4j-1.2.17.jar:
/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-api-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/validation-api-1.1.0.Final.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-sources.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javassist-3.18.1-GA.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-test.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.annotation-api-1.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-library-2.11.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-json-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/metrics-core-2.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-clients-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/lz4-1.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-locator-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-io-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jopt-simple-3.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/snappy-java-1.1.1.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-annotations-2.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zookeeper-3.4.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.1
1-0.9.0.0/bin/../libs/jetty-util-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-server-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-file-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-runtime-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-utils-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-server-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-common-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-base-2.5.4.jar (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. 
(org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/ (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:java.compiler= (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:os.name=Mac OS X (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:os.arch=x86_64 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:os.version=10.16 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:user.name=bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:user.home=/Users/bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,610] INFO Server environment:user.dir=/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,631] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,631] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,632] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,676] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
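The last line of the output shows ZooKeeper binding to port 2181. Since the Kafka broker connects to ZooKeeper when it starts, it can help to wait until that port accepts connections. The sketch below is one way to do this; the function name `wait_for_port` is hypothetical, and it relies on bash's `/dev/tcp` redirection, so it assumes the script is run with bash.

```shell
# Hypothetical helper: poll until host:port accepts a TCP connection.
# Arguments: host, port, number of one-second attempts (default 30).
# Returns 0 once a connection succeeds, 1 if every attempt fails.
wait_for_port() {
  host=$1
  port=$2
  tries=${3:-30}
  while [ "$tries" -gt 0 ]; do
    # The subshell opens and immediately closes a connection (bash feature).
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    tries=$((tries - 1))
    sleep 1
  done
  return 1
}

# Example use, run from the Kafka folder (port taken from the log above):
#   wait_for_port localhost 2181 30 || echo "ZooKeeper is not reachable" >&2
```

The same helper could be pointed at the broker's own port once it is running.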
You can now start the Apache Kafka server using the command below:
Apache Kafka Server Run Command
bin/kafka-server-start.sh config/server.properties
The output of the above command is shown below:
Apache Kafka Server Output
apples-MacBook-Air:kafka_2.11-0.9.0.0 bhagvan.kommadi$ bin/kafka-server-start.sh config/server.properties [2021-05-27 00:42:40,482] INFO KafkaConfig values: advertised.host.name = null metric.reporters = [] quota.producer.default = 9223372036854775807 offsets.topic.num.partitions = 50 log.flush.interval.messages = 9223372036854775807 auto.create.topics.enable = true controller.socket.timeout.ms = 30000 log.flush.interval.ms = null principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder replica.socket.receive.buffer.bytes = 65536 min.insync.replicas = 1 replica.fetch.wait.max.ms = 500 num.recovery.threads.per.data.dir = 1 ssl.keystore.type = JKS default.replication.factor = 1 ssl.truststore.password = null log.preallocate = false sasl.kerberos.principal.to.local.rules = [DEFAULT] fetch.purgatory.purge.interval.requests = 1000 ssl.endpoint.identification.algorithm = null replica.socket.timeout.ms = 30000 message.max.bytes = 1000012 num.io.threads = 8 offsets.commit.required.acks = -1 log.flush.offset.checkpoint.interval.ms = 60000 delete.topic.enable = false quota.window.size.seconds = 1 ssl.truststore.type = JKS offsets.commit.timeout.ms = 5000 quota.window.num = 11 zookeeper.connect = localhost:2181 authorizer.class.name = num.replica.fetchers = 1 log.retention.ms = null log.roll.jitter.hours = 0 log.cleaner.enable = false offsets.load.buffer.size = 5242880 log.cleaner.delete.retention.ms = 86400000 ssl.client.auth = none controlled.shutdown.max.retries = 3 queued.max.requests = 500 offsets.topic.replication.factor = 3 log.cleaner.threads = 1 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 socket.request.max.bytes = 104857600 ssl.trustmanager.algorithm = PKIX zookeeper.session.timeout.ms = 6000 log.retention.bytes = -1 sasl.kerberos.min.time.before.relogin = 60000 zookeeper.set.acl = false connections.max.idle.ms = 600000 offsets.retention.minutes = 1440 replica.fetch.backoff.ms = 1000 
inter.broker.protocol.version = 0.9.0.X log.retention.hours = 168 num.partitions = 1 listeners = PLAINTEXT://0.0.0.0:9092 ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] log.roll.ms = null log.flush.scheduler.interval.ms = 9223372036854775807 ssl.cipher.suites = null log.index.size.max.bytes = 10485760 ssl.keymanager.algorithm = SunX509 security.inter.broker.protocol = PLAINTEXT replica.fetch.max.bytes = 1048576 advertised.port = null log.cleaner.dedupe.buffer.size = 524288000 replica.high.watermark.checkpoint.interval.ms = 5000 log.cleaner.io.buffer.size = 524288 sasl.kerberos.ticket.renew.window.factor = 0.8 zookeeper.connection.timeout.ms = 6000 controlled.shutdown.retry.backoff.ms = 5000 log.roll.hours = 168 log.cleanup.policy = delete host.name = log.roll.jitter.ms = null max.connections.per.ip = 2147483647 offsets.topic.segment.bytes = 104857600 background.threads = 10 quota.consumer.default = 9223372036854775807 request.timeout.ms = 30000 log.index.interval.bytes = 4096 log.dir = /tmp/kafka-logs log.segment.bytes = 1073741824 log.cleaner.backoff.ms = 15000 offset.metadata.max.bytes = 4096 ssl.truststore.location = null group.max.session.timeout.ms = 30000 ssl.keystore.password = null zookeeper.sync.time.ms = 2000 port = 9092 log.retention.minutes = null log.segment.delete.delay.ms = 60000 log.dirs = /tmp/kafka-logs controlled.shutdown.enable = true compression.type = producer max.connections.per.ip.overrides = sasl.kerberos.kinit.cmd = /usr/bin/kinit log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 auto.leader.rebalance.enable = true leader.imbalance.check.interval.seconds = 300 log.cleaner.min.cleanable.ratio = 0.5 replica.lag.time.max.ms = 10000 num.network.threads = 3 ssl.key.password = null reserved.broker.max.id = 1000 metrics.num.samples = 2 socket.send.buffer.bytes = 102400 ssl.protocol = TLS socket.receive.buffer.bytes = 102400 ssl.keystore.location = null replica.fetch.min.bytes = 1 unclean.leader.election.enable = 
true group.min.session.timeout.ms = 6000 log.cleaner.io.buffer.load.factor = 0.9 offsets.retention.check.interval.ms = 600000 producer.purgatory.purge.interval.requests = 1000 metrics.sample.window.ms = 30000 broker.id = 0 offsets.topic.compression.codec = 0 log.retention.check.interval.ms = 300000 advertised.listeners = null leader.imbalance.per.broker.percentage = 10 (kafka.server.KafkaConfig) [2021-05-27 00:42:40,633] INFO starting (kafka.server.KafkaServer) [2021-05-27 00:42:40,641] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) [2021-05-27 00:42:40,660] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) [2021-05-27 00:42:40,672] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,672] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,675] INFO Client environment:java.version=1.8.0_101 (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,675] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,675] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,675] INFO Client 
environment:java.class.path=:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-http-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-client-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-core-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zkclient-0.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-databind-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-media-jaxb-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-api-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-tools-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-guava-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/argparse4j-0.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-scaladoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-security-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-xml_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-log4j12-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-javadoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-api-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/log4j-1.2.17.jar:
/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-api-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/validation-api-1.1.0.Final.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-sources.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javassist-3.18.1-GA.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-test.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.annotation-api-1.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-library-2.11.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-json-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/metrics-core-2.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-clients-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/lz4-1.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-locator-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-io-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jopt-simple-3.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/snappy-java-1.1.1.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-annotations-2.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zookeeper-3.4.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.1
1-0.9.0.0/bin/../libs/jetty-util-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-server-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-file-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-runtime-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-utils-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-server-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-common-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-base-2.5.4.jar (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. 
(org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/ (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:java.compiler= (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:os.name=Mac OS X (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:os.arch=x86_64 (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:os.version=10.16 (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:user.name=bhagvan.kommadi (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:user.home=/Users/bhagvan.kommadi (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:user.dir=/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0 (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,677] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@23f7d05d (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,698] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient) [2021-05-27 00:42:40,705] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2021-05-27 00:42:40,827] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2021-05-27 00:42:40,928] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x179aa14e9770000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2021-05-27 00:42:40,929] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2021-05-27 00:42:41,229] INFO Loading logs. 
(kafka.log.LogManager) [2021-05-27 00:42:41,301] INFO Completed load of log my-topic-11 with log end offset 2 (kafka.log.Log) [2021-05-27 00:42:41,312] INFO Completed load of log kafkatopic-2 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,316] INFO Completed load of log kafkatopic-5 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,321] INFO Completed load of log my-topic-10 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,329] INFO Completed load of log kafka_topic_sentences-0 with log end offset 30 (kafka.log.Log) [2021-05-27 00:42:41,333] INFO Completed load of log kafkatopic-4 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,339] INFO Completed load of log kafkatopic-3 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,345] INFO Completed load of log my-topic-3 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,354] INFO Completed load of log my-topic-4 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,358] INFO Completed load of log kafkatopic-11 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,363] INFO Completed load of log my-topic-5 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,367] INFO Completed load of log my-topic-2 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,375] INFO Completed load of log my-first-topic-0 with log end offset 1006174 (kafka.log.Log) [2021-05-27 00:42:41,380] INFO Completed load of log kafkatopic-10 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,391] INFO Completed load of log kafka_topic_phrases-0 with log end offset 20 (kafka.log.Log) [2021-05-27 00:42:41,396] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,404] INFO Completed load of log newtopic-0 with log end offset 14 (kafka.log.Log) [2021-05-27 00:42:41,408] INFO Completed load of log kafkatopic-6 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,416] INFO Completed load of log kafkatopic-1 with log end offset 0 
(kafka.log.Log) [2021-05-27 00:42:41,420] INFO Completed load of log kafkatopic-8 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,424] INFO Completed load of log my-topic-12 with log end offset 2 (kafka.log.Log) [2021-05-27 00:42:41,428] INFO Completed load of log kafkatopic-9 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,432] INFO Completed load of log kafkatopic-0 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,438] INFO Completed load of log kafkatopic-7 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,441] INFO Completed load of log kafkatopic-12 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,445] INFO Completed load of log my-topic-7 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,449] INFO Completed load of log my-topic-0 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,456] INFO Completed load of log my-topic-9 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,463] INFO Completed load of log my-topic-8 with log end offset 2 (kafka.log.Log) [2021-05-27 00:42:41,468] INFO Completed load of log my-topic-1 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,473] INFO Completed load of log my-topic-6 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,477] INFO Completed load of log ExampleTopic-0 with log end offset 10 (kafka.log.Log) [2021-05-27 00:42:41,482] INFO Logs loading complete. (kafka.log.LogManager) [2021-05-27 00:42:41,483] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2021-05-27 00:42:41,512] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2021-05-27 00:42:48,658] INFO Awaiting socket connections on 0.0.0.0:9092. 
(kafka.network.Acceptor) [2021-05-27 00:42:48,663] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer) [2021-05-27 00:42:48,705] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2021-05-27 00:42:48,705] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2021-05-27 00:42:48,786] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2021-05-27 00:42:48,801] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2021-05-27 00:42:48,801] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2021-05-27 00:42:49,494] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) [2021-05-27 00:42:49,511] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator) [2021-05-27 00:42:49,513] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2021-05-27 00:42:49,516] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator) [2021-05-27 00:42:49,517] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2021-05-27 00:42:49,527] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager) [2021-05-27 00:42:49,562] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2021-05-27 00:42:49,563] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2021-05-27 00:42:49,571] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2021-05-27 00:42:49,589] INFO Creating /brokers/ids/0 (is it secure? 
false) (kafka.utils.ZKCheckedEphemeral) [2021-05-27 00:42:49,594] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2021-05-27 00:42:49,597] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(0.0.0.0,9092,PLAINTEXT) (kafka.utils.ZkUtils) [2021-05-27 00:42:49,613] INFO Kafka version : 0.9.0.0 (org.apache.kafka.common.utils.AppInfoParser) [2021-05-27 00:42:49,613] INFO Kafka commitId : fc7243c2af4b2b4a (org.apache.kafka.common.utils.AppInfoParser) [2021-05-27 00:42:49,614] INFO [Kafka Server 0], started (kafka.server.KafkaServer) [2021-05-27 00:42:49,986] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-topic,5],[test,0],[kafkatopic,11],[my-topic,0],[my-topic,7],[kafkatopic,3],[kafkatopic,1],[kafkatopic,6],[kafkatopic,12],[kafkatopic,7],[kafkatopic,4],[kafkatopic,2],[kafka_topic_phrases,0],[kafkatopic,8],[kafka_topic_sentences,0],[my-topic,4],[kafkatopic,9],[my-topic,12],[my-topic,6],[my-topic,11],[my-topic,10],[my-topic,9],[my-topic,2],[my-first-topic,0],[ExampleTopic,0],[kafkatopic,5],[kafkatopic,0],[my-topic,3],[newtopic,0],[my-topic,1],[kafkatopic,10],[my-topic,8] (kafka.server.ReplicaFetcherManager) [2021-05-27 00:42:50,209] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-topic,5],[test,0],[kafkatopic,11],[my-topic,0],[my-topic,7],[kafkatopic,3],[kafkatopic,1],[kafkatopic,6],[kafkatopic,12],[kafkatopic,7],[kafkatopic,4],[kafkatopic,2],[kafka_topic_phrases,0],[kafkatopic,8],[kafka_topic_sentences,0],[my-topic,4],[kafkatopic,9],[my-topic,12],[my-topic,6],[my-topic,11],[my-topic,10],[my-topic,9],[my-topic,2],[my-first-topic,0],[ExampleTopic,0],[kafkatopic,5],[kafkatopic,0],[my-topic,3],[newtopic,0],[my-topic,1],[kafkatopic,10],[my-topic,8] (kafka.server.ReplicaFetcherManager)
2.5 Apache Kafka – Storm API
In Storm, spouts are the data sources and bolts are the processing steps. A spout consumes the messages sent to a Kafka topic, and a bolt receives that data from the spout as a stream of tuples. Bolts can run functions, filter tuples, aggregate streams, perform joins, and read from other data sources. A topology is the graph of spouts and bolts. The integration of Kafka and Storm is based on the BrokerHosts, KafkaConfig, SpoutConfig, SchemeAsMultiScheme, and KafkaSpout APIs. ZkHosts and StaticHosts are implementations of the BrokerHosts interface. ZkHosts tracks the Kafka brokers through Zookeeper, where the broker details are stored. StaticHosts is set up with a fixed list of Kafka brokers and their details. ZkHosts is usually the better choice because it follows the broker-to-partition mapping dynamically, whereas StaticHosts has to be updated by hand. KafkaConfig holds the settings for the Kafka cluster. SpoutConfig extends KafkaConfig and stores the Zookeeper information. SchemeAsMultiScheme helps in transforming the raw message bytes into Storm tuples. KafkaSpout is the spout implementation that connects Storm to Kafka.
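To make the scheme step concrete, here is a minimal plain-Java sketch of turning a message payload into a single-field tuple. The class StringSchemeSketch and its deserialize method are hypothetical names used only for illustration; they are not part of the storm-kafka library, and the sketch assumes the payload is UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Storm scheme. It is NOT the storm-kafka class,
// only an illustration of the bytes-to-tuple step.
class StringSchemeSketch {

    // Decode the raw Kafka message payload as UTF-8 (an assumption) and wrap
    // it in a one-element list, mirroring a tuple with a single string field.
    static List<Object> deserialize(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        return Arrays.<Object>asList(text);
    }

    public static void main(String[] args) {
        byte[] payload = "kafka test".getBytes(StandardCharsets.UTF_8);
        List<Object> tuple = deserialize(payload);
        // The first (and only) field holds the decoded phrase.
        System.out.println(tuple.get(0));
    }
}
```

A bolt downstream of the spout would read this single field with a call such as input.getString(0), which is what the bolts in the example below do.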
2.6 Apache Kafka – Storm Integration Example
In this example, you can see the topology setup with spout and bolt. Let us look at the topology class first.
Apache Kafka Storm Integration – Topology
import java.util.UUID;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaStormSample {

    public static void main(String[] args) throws Exception {
        Config stormConfig = new Config();
        stormConfig.setDebug(true);
        stormConfig.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Zookeeper used by Kafka, and the topic the spout reads from
        String zkConnString = "localhost:2181";
        String topic = "kafka_topic_phrases";
        BrokerHosts hosts = new ZkHosts(zkConnString);

        // Offsets are kept under "/<topic>" in Zookeeper with a random consumer id
        SpoutConfig spoutConfiguration =
                new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        spoutConfiguration.bufferSizeBytes = 1024 * 1024 * 4;
        spoutConfiguration.fetchSizeBytes = 1024 * 1024 * 4;
        spoutConfiguration.forceFromStart = true;
        spoutConfiguration.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Wire the spout to the two bolts defined in the listings below
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("kafka-spout", new KafkaSpout(spoutConfiguration));
        topologyBuilder.setBolt("word-spitter", new PhraseSplitBolt()).shuffleGrouping("kafka-spout");
        topologyBuilder.setBolt("word-counter", new WordCountBolt()).shuffleGrouping("word-spitter");

        // Run in an in-process cluster for ten seconds, then shut down
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("KafkaStormSample", stormConfig, topologyBuilder.createTopology());
        Thread.sleep(10000);
        localCluster.shutdown();
    }
}
The code below shows the PhraseSplitBolt class, which splits the phrases into words.
Apache Kafka Storm Integration – PhraseSplitBolt
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class PhraseSplitBolt implements IRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // The first field of the incoming tuple holds the phrase
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                // Emit each non-empty word in lower case
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public void cleanup() {}

    @Override
    public Map getComponentConfiguration() {
        return null;
    }
}
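The splitting rule inside execute can be checked without a Storm cluster. The sketch below repeats the same steps (split on a single space, trim, skip empty strings, lower-case) in plain Java. PhraseSplitSketch and splitPhrase are hypothetical names introduced here for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the word-splitting rule; no Storm classes involved.
class PhraseSplitSketch {

    // Split on single spaces, drop empty pieces, and lower-case each word.
    static List<String> splitPhrase(String phrase) {
        List<String> words = new ArrayList<String>();
        for (String word : phrase.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // The double space produces an empty piece, which is skipped.
        System.out.println(splitPhrase("Kafka  Test")); // prints [kafka, test]
    }
}
```

Because the split is on a single space, consecutive spaces yield empty strings, which is why the isEmpty check is needed before emitting a word.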
Now let us look at the WordCountBolt class.
Apache Kafka Storm Integration – WordCountBolt
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCountBolt implements IRichBolt {

    // Running count per word; typed so that the increment below compiles
    Map<String, Integer> counters;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        collector.ack(input);
    }

    @Override
    public void cleanup() {
        // Print the final counts when the topology shuts down
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is the last step and emits nothing
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }
}
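The counting logic can also be tried in isolation. The following is a minimal sketch, assuming a single-threaded caller, that keeps a count per word in a HashMap in the same way as the bolt. WordCountSketch, count, and get are hypothetical names used only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of the per-word counter; no Storm classes involved.
class WordCountSketch {

    private final Map<String, Integer> counters = new HashMap<String, Integer>();

    // Insert the word with a count of 1, or increment its existing count.
    void count(String word) {
        Integer current = counters.get(word);
        counters.put(word, current == null ? 1 : current + 1);
    }

    // Return the count seen so far, or 0 for a word that was never counted.
    int get(String word) {
        Integer current = counters.get(word);
        return current == null ? 0 : current;
    }

    public static void main(String[] args) {
        WordCountSketch sketch = new WordCountSketch();
        sketch.count("kafka");
        sketch.count("storm");
        sketch.count("kafka");
        System.out.println("kafka : " + sketch.get("kafka")); // prints kafka : 2
    }
}
```

The map lives in memory inside one object, so the counts are lost when the process ends. The bolt behaves the same way, which is why it prints the totals in cleanup.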
On the Kafka side, a simple producer sends the messages. The source code of the SimpleProducerExample class is shown below.
Apache Kafka Storm Integration – SimpleProducerExample
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducerExample {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Enter the topic to be created ");
            return;
        }
        String exampleTopicName = args[0];

        // Connection and delivery settings for the producer
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        String[] strings = new String[10];
        strings[0] = "hi";
        strings[1] = "kafka test";
        strings[2] = "storm check";
        strings[3] = "spark job";
        strings[4] = "message";
        strings[5] = "operator";
        strings[6] = "modulo";
        strings[7] = "remainder";
        strings[8] = "backtype";
        strings[9] = "utility";

        // Send each phrase as the value of a record with no key
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(exampleTopicName, strings[i]));
            System.out.println("Message sent successfully " + strings[i]);
            Thread.sleep(100);
        }
        producer.close();
    }
}
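If the same settings are needed in more than one place, they can be collected in a small helper. The sketch below is one possible way to do this using only java.util.Properties, so it runs without the Kafka jars. ProducerSettingsSketch and producerProperties are hypothetical names, and the values are simply the ones used in the listing above.

```java
import java.util.Properties;

// Illustration only: gathers the producer settings from the listing in one method.
class ProducerSettingsSketch {

    // Build the settings for a given broker address; the other values are fixed.
    static Properties producerProperties(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = producerProperties("localhost:9092");
        // Print the broker address the producer would connect to.
        System.out.println(properties.get("bootstrap.servers"));
    }
}
```

The returned object could then be passed to the KafkaProducer constructor in the same way as in SimpleProducerExample.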
You can compile the code using the command below:
Kafka Simple Producer Compilation Command
javac -cp "$KAFKA_HOME/libs/*" SimpleProducerExample.java
First, start Zookeeper and the Kafka server. You can then compile all the Storm classes using the command below. Ensure that log4j-over-slf4j-1.6.6.jar is not part of the classpath in that command. curator-client-2.9.1.jar, curator-framework-2.9.1.jar, and guava-11.0.2.jar can be included in the classpath.
Storm side code – Compilation
apples-MacBook-Air:apache_stom bhagvan.kommadi$ javac -cp "/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/*:/users/bhagvan.kommadi/downloads/apache_stom/lib/*:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/*:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/*:." *.java
Then you can execute the Storm side code using the command below:
Storm side code – Execution
apples-MacBook-Air:apache_stom bhagvan.kommadi$ java -cp "/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/*:/users/bhagvan.kommadi/downloads/apache_stom/lib/*:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/*:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/*:." KafkaStormSample
Now you can start sending messages to the Kafka topic after creating the topic.
Kafka Simple Producer Execution Command
java -cp "$KAFKA_HOME/libs/*:." SimpleProducerExample kafka_topic_phrases
The output of the command executed above is shown below:
Kafka Simple Producer Output
apples-MacBook-Air:apachekafkaproducer bhagvan.kommadi$ java -cp "$KAFKA_HOME/libs/*:." SimpleProducerExample kafka_topic_phrases
Message sent successfully hi
Message sent successfully kafka test
Message sent successfully storm check
Message sent successfully spark job
Message sent successfully message
Message sent successfully operator
Message sent successfully modulo
Message sent successfully remainder
Message sent successfully backtype
Message sent successfully utility
On the Storm side, the output of the executed command is shown below.
Storm Topology Output
apples-MacBook-Air:apache_stom bhagvan.kommadi$ java -cp "/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/*:/users/bhagvan.kommadi/downloads/apache_stom/lib/*:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/*:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/*:." KafkaStormSample SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/bhagvan.kommadi/Desktop/apache-storm-0.9.5/lib/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/bhagvan.kommadi/Downloads/apache_stom/lib/log4j-slf4j-impl-2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder] 3345 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:host.name=localhost 3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_101 3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation 3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre 3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client 
environment:java.class.path=/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/storm-core-0.9.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-stacktrace-0.2.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/disruptor-2.10.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/minlog-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/compojure-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/reflectasm-1.07-shaded.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-core-1.1.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clout-1.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-logging-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/objenesis-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-jetty-adapter-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/slf4j-api-1.7.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jgrapht-core-0.9.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.macro-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/hiccup-0.3.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/servlet-api-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/json-simple-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/math.numeric-tower-0.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-util-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/carbonite-1.4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-core-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/chill-java-0.3.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/joda-time-2.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.cli-0.2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-io-2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-codec-1.6.jar:/users/bhagv
an.kommadi/desktop/apache-storm-0.9.5/lib/tools.logging-0.2.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-servlet-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/snakeyaml-1.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-exec-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/asm-4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-lang-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/core.incubator-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jline-2.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/kryo-2.21.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-fileupload-1.2.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-devel-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-classic-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-time-0.4.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clojure-1.5.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-jcl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-1.2-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-core-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-client-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-framework-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-slf4j-impl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/guava-11.0.2.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/kafka-clients-0.9.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/storm-kafka-0.9.5.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-http-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-log4j-appender-
0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-client-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-core-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zkclient-0.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-databind-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-media-jaxb-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-api-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-tools-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-guava-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/argparse4j-0.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-scaladoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-security-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-xml_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-parser-combinators_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-javadoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-servlet-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-api-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.servlet-api-3.1.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-json-provider-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/log4j-1.2.17.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-module-jaxb-annotations-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-api-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/validation-api-1.1.0.Final.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-sources.jar:/u
sers/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javassist-3.18.1-GA.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-test.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.annotation-api-1.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-library-2.11.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-json-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/metrics-core-2.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-clients-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/lz4-1.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-locator-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-io-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/osgi-resource-locator-1.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/aopalliance-repackaged-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jopt-simple-3.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/snappy-java-1.1.1.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-annotations-2.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zookeeper-3.4.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-util-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-server-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-file-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-runtime-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.ws.rs-api-2.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-utils-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11
-0.9.0.0/libs/jersey-container-servlet-core-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-server-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-common-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-base-2.5.4.jar:. 3372 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. 3372 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/ 3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.compiler= 3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:os.name=Mac OS X 3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:os.arch=x86_64 3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:os.version=10.16 3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:user.name=bhagvan.kommadi 3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:user.home=/Users/bhagvan.kommadi 3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:user.dir=/Users/bhagvan.kommadi/Downloads/apache_stom 3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:host.name=localhost 3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.version=1.8.0_101 3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server 
environment:java.vendor=Oracle Corporation 3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre 3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.class.path=/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/storm-core-0.9.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-stacktrace-0.2.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/disruptor-2.10.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/minlog-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/compojure-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/reflectasm-1.07-shaded.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-core-1.1.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clout-1.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-logging-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/objenesis-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-jetty-adapter-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/slf4j-api-1.7.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jgrapht-core-0.9.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.macro-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/hiccup-0.3.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/servlet-api-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/json-simple-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/math.numeric-tower-0.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-util-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/carbonite-1.4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-core-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/chill-java-0.3.5.jar:/users/bhagvan.kommad
i/desktop/apache-storm-0.9.5/lib/joda-time-2.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.cli-0.2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-io-2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-codec-1.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.logging-0.2.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-servlet-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/snakeyaml-1.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-exec-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/asm-4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-lang-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/core.incubator-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jline-2.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/kryo-2.21.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-fileupload-1.2.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-devel-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-classic-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-time-0.4.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clojure-1.5.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-jcl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-1.2-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-core-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-client-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-framework-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-slf4j-impl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/guava-11.0.2.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/
lib/kafka-clients-0.9.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/storm-kafka-0.9.5.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-http-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-log4j-appender-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-client-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-core-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zkclient-0.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-databind-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-media-jaxb-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-api-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-tools-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-guava-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/argparse4j-0.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-scaladoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-security-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-xml_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-parser-combinators_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-javadoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-servlet-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-api-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.servlet-api-3.1.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-json-provider-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/log4j-1.2.17.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jacks
on-module-jaxb-annotations-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-api-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/validation-api-1.1.0.Final.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-sources.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javassist-3.18.1-GA.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-test.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.annotation-api-1.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-library-2.11.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-json-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/metrics-core-2.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-clients-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/lz4-1.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-locator-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-io-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/osgi-resource-locator-1.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/aopalliance-repackaged-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jopt-simple-3.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/snappy-java-1.1.1.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-annotations-2.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zookeeper-3.4.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-util-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-server-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-file-0.9.0.0.ja
r:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-runtime-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.ws.rs-api-2.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-utils-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-core-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-server-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-common-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-base-2.5.4.jar:. 3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. 
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.compiler=
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.name=Mac OS X
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.arch=x86_64
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.version=10.16
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.name=bhagvan.kommadi
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.home=/Users/bhagvan.kommadi
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.dir=/Users/bhagvan.kommadi/Downloads/apache_stom
4499 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/a38c5a4c-6ac0-423b-913e-097a53145a3f/version-2 snapdir /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/a38c5a4c-6ac0-423b-913e-097a53145a3f/version-2
4532 [main] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:2000
4541 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper at port 2000 and dir /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//a38c5a4c-6ac0-423b-913e-097a53145a3f
4817 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true,
"storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//d9c56f79-8690-4ca4-856f-38179dcd654d", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, 
"storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil} 4836 [main] INFO backtype.storm.daemon.nimbus - Using default scheduler 4863 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5] 4959 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - 
Starting
4962 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@2416498e
4998 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
5096 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
5096 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50490
5111 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50490
5114 [SyncThread:0] INFO org.apache.storm.zookeeper.server.persistence.FileTxnLog - Creating new log file: log.1
5126 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700000 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50490
5127 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700000, negotiated timeout = 20000
5130 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
5134 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
6193 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700000
6194 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700000 closed
6194 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6196 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50490 which had sessionid 0x179a924e7700000
6197 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
6197 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
6198 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@43f0c2d1
6199 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
6200 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
6200 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50491
6200 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50491
6202 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700001 with negotiated timeout 20000 for client /127.0.0.1:50491
6202 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700001, negotiated timeout = 20000
6202 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
6239 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
6240 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
6240 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@23592946
6242 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
6243 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
6243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50492
6243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50492
6245 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700002 with negotiated timeout 20000 for client /127.0.0.1:50492
6245 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700002, negotiated timeout = 20000
6245 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
6246 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
7255 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700002
7257 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700002 closed
7257 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7257 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7258 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50492 which had sessionid 0x179a924e7700002
7258 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7258 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@5c60b0a0
7260 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7261 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7261 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7261 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7261 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50493
7262 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50493
7262 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@23f3dbf0
7264 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7264 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700003 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50493
7264 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700003, negotiated timeout = 20000
7264 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7264 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7264 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50494
7265 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50494
7266 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700004 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50494
7266 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700004, negotiated timeout = 20000
7267 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7267 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
7269 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700004
7270 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700004 closed
7284 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7284 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7284 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7286 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@31ff6309
7286 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50494 which had sessionid 0x179a924e7700004
7289 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
7290 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
7290 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50495
7291 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50495
7296 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700005 with negotiated timeout 20000 for client /127.0.0.1:50495
7296 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700005, negotiated timeout = 20000
7297 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7318 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf
{"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//4a2ac468-1fa6-4abd-9863-0b9babd83b88", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" 
"-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil} 7345 
[main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7345 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7346 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@46731692
7348 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7349 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7349 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50496
7349 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50496
7351 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700006 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50496
7351 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700006, negotiated timeout = 20000
7354 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7354 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
7359 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700006
7360 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700006 closed
7360 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7360 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7361 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7361 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50496 which had sessionid 0x179a924e7700006
7362 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@ad9e63e
7364 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
7365 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
7365 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50497
7365 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50497
7367 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700007 with negotiated timeout 20000 for client /127.0.0.1:50497
7367 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700007, negotiated timeout = 20000
7367 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7404 [main] INFO
backtype.storm.daemon.supervisor - Starting supervisor with id bb3e0fa6-6811-4f10-ba4e-34f018909fa6 at host localhost 7407 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, 
"worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" 
false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
7410 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7411 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7411 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@b0fc838
7413 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7413 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7413 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50498
7414 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50498
7415 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700008 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50498
7415 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700008, negotiated timeout = 20000
7415 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7415 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
7418 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700008
7419 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700008 closed
7420 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7420 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50498 which had sessionid 0x179a924e7700008
7420 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7422 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7422 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@5fbdc49b
7424 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
7425 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
7425 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50499
7425 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50499
7426 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700009 with negotiated timeout 20000 for client /127.0.0.1:50499
7427 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700009, negotiated timeout = 20000
7427 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7436 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 at host localhost
7508 [main] INFO backtype.storm.daemon.nimbus - Received topology submission for KafkaStormSample with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register" nil, "topology.kryo.decorators" (), "topology.name" "KafkaStormSample", "storm.id" "KafkaStormSample-1-1622040441", "topology.debug" true, "topology.max.spout.pending" 1}
7540 [main] INFO backtype.storm.daemon.nimbus - Activating KafkaStormSample: KafkaStormSample-1-1622040441
7650 [main] INFO backtype.storm.scheduler.EvenScheduler - Available slots: (["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1028] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1029] ["bb3e0fa6-6811-4f10-ba4e-34f018909fa6" 1024] ["bb3e0fa6-6811-4f10-ba4e-34f018909fa6" 1025]
["bb3e0fa6-6811-4f10-ba4e-34f018909fa6" 1026]) 7680 [main] INFO backtype.storm.daemon.nimbus - Setting new assignment for topology id KafkaStormSample-1-1622040441: #backtype.storm.daemon.common.Assignment{:master-code-dir "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//d9c56f79-8690-4ca4-856f-38179dcd654d/nimbus/stormdist/KafkaStormSample-1-1622040441", :node->host {"563a836c-55ba-4b7e-9f4c-beaf44aea5a3" "localhost"}, :executor->node+port {[4 4] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027], [3 3] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027], [2 2] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027], [1 1] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027]}, :executor->start-time-secs {[4 4] 1622040441, [3 3] 1622040441, [2 2] 1622040441, [1 1] 1622040441}} 8921 [Thread-5] INFO backtype.storm.daemon.supervisor - Extracting resources from jar at /users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-javadoc.jar to /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df/supervisor/stormdist/KafkaStormSample-1-1622040441/resources 8962 [Thread-6] INFO backtype.storm.daemon.supervisor - Launching worker with assignment #backtype.storm.daemon.supervisor.LocalAssignment{:storm-id "KafkaStormSample-1-1622040441", :executors ([4 4] [3 3] [2 2] [1 1])} for this supervisor 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 on port 1027 with id 39344513-9c18-41e7-bf9b-fd894180922f 8969 [Thread-6] INFO backtype.storm.daemon.worker - Launching worker for KafkaStormSample-1-1622040441 on 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 with id 39344513-9c18-41e7-bf9b-fd894180922f and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" 
"-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
"topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil} 8970 [Thread-6] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5] 8970 [Thread-6] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting 8972 [Thread-6] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, 
connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@559169a4 8978 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error) 8979 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session 8979 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50500 8979 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50500 8981 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e770000a with negotiated timeout 20000 for client /127.0.0.1:50500 8981 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e770000a, negotiated timeout = 20000 8982 [Thread-6-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED 8982 [Thread-6-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none 8985 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e770000a 8987 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50500 which had sessionid 0x179a924e770000a 8987 [Thread-6] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e770000a closed 8987 [Thread-6-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 8987 [Thread-6] INFO 
backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5] 8989 [Thread-6] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting 8989 [Thread-6] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@2a7cbfce 8992 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error) 8992 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session 8992 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50501 8993 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50501 8994 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e770000b, negotiated timeout = 20000 8994 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e770000b with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50501 8995 [Thread-6-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED 9004 [Thread-6] INFO backtype.storm.daemon.worker - Reading Assignments. 
9074 [Thread-6] INFO backtype.storm.daemon.worker - Launching receive-thread for 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 9083 [Thread-7-worker-receiver-thread-0] INFO backtype.storm.messaging.loader - Starting receive-thread: [stormId: KafkaStormSample-1-1622040441, port: 1027, thread-id: 0 ] 9410 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor kafka-spout:[2 2] 9429 [Thread-6] INFO backtype.storm.daemon.task - Emitting: kafka-spout __system ["startup"] 9429 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks kafka-spout:[2 2] 9443 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor kafka-spout:[2 2] 9459 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor word-counter:[3 3] 9460 [Thread-6] INFO backtype.storm.daemon.task - Emitting: word-counter __system ["startup"] 9461 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks word-counter:[3 3] 9469 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor word-counter:[3 3] 9479 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor word-spitter:[4 4] 9480 [Thread-6] INFO backtype.storm.daemon.task - Emitting: word-spitter __system ["startup"] 9481 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks word-spitter:[4 4] 9483 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor word-spitter:[4 4] 9491 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor __system:[-1 -1] 9492 [Thread-6] INFO backtype.storm.daemon.task - Emitting: __system __system ["startup"] 9492 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks __system:[-1 -1] 9495 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor __system:[-1 -1] 9502 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor __acker:[1 1] 9506 [Thread-6] INFO backtype.storm.daemon.task - Emitting: __acker __system ["startup"] 9506 [Thread-6] INFO 
backtype.storm.daemon.executor - Loaded executor tasks __acker:[1 1] 9509 [Thread-6] INFO backtype.storm.daemon.executor - Timeouts disabled for executor __acker:[1 1] 9509 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor __acker:[1 1] 9518 [Thread-6] INFO backtype.storm.daemon.worker - Worker has topology config {"storm.id" "KafkaStormSample-1-1622040441", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" 
["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" (), "topology.name" "KafkaStormSample", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" 1, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.environment" nil, "topology.debug" true, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 
60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil} 9520 [Thread-6] INFO backtype.storm.daemon.worker - Worker 39344513-9c18-41e7-bf9b-fd894180922f for storm KafkaStormSample-1-1622040441 on 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 has finished loading 10039 [refresh-active-timer] INFO backtype.storm.daemon.worker - All connections are ready for worker 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 with id 39344513-9c18-41e7-bf9b-fd894180922f 10062 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Opening spout kafka-spout:(2) 10087 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Preparing bolt word-counter:(3) 10095 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Prepared bolt word-counter:(3) 10107 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Preparing bolt word-spitter:(4) 10108 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Prepared bolt word-spitter:(4) 10113 [Thread-15-__system] INFO backtype.storm.daemon.executor - Preparing bolt __system:(-1) 10116 [Thread-15-__system] INFO backtype.storm.daemon.executor - Prepared bolt __system:(-1) 10132 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Preparing bolt __acker:(1) 10135 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Prepared bolt __acker:(1) 10197 [Thread-9-kafka-spout] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting 10220 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from 
/0:0:0:0:0:0:0:1:50502 10222 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50502 10223 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e770000c with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50502 10230 [Thread-9-kafka-spout-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED 10232 [Thread-9-kafka-spout] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting 10245 [Thread-9-kafka-spout-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED 10283 [Thread-9-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=0.0.0.0:9092}} 10284 [Thread-9-kafka-spout] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting 10288 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Opened spout kafka-spout:(2) 10293 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Activating spout kafka-spout:(2) 10293 [Thread-9-kafka-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections 10297 [Thread-9-kafka-spout-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED 10301 [Thread-9-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=0.0.0.0:9092}} 10303 [Thread-9-kafka-spout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=0.0.0.0:9092, partition=0}] 10303 [Thread-9-kafka-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: [] 10303 [Thread-9-kafka-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=0.0.0.0:9092, partition=0}] 10526 
[Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - Read partition information from: /kafka_topic_phrases/8b2a044f-29f9-483a-8f2d-3c868d6434db/partition_0 --> null ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. 10836 [Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset 10836 [Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0 10838 [Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2 10839 [Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - Starting Kafka 0.0.0.0:0 from offset 0 10843 [Thread-9-kafka-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing 10943 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [hi] 10945 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {-9145905182196884541=-200665344721218530}, [hi] 10946 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [hi] 10946 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [hi] 10946 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [-9145905182196884541 -200665344721218530] 10946 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [-9145905182196884541 -200665344721218530] 10948 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [-9145905182196884541 -200665344721218530 2] 10950 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: 
kafka-spout:2, stream: __ack_init, id: {}, [-9145905182196884541 -200665344721218530 2] 10951 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [-9145905182196884541] 10954 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-9145905182196884541] 10955 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@749098b6 10955 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [kafka test] 10956 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [3361071049515033607 -3553068181479967910 2] 10956 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {3361071049515033607=-3553068181479967910}, [kafka test] 10956 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [kafka] 10956 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [3361071049515033607 -3553068181479967910 2] 10956 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [test] 10956 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [3361071049515033607 -3553068181479967910] 10956 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [kafka] 10956 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [test] 10958 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [3361071049515033607 -3553068181479967910] 10958 
[Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [3361071049515033607] 10958 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [3361071049515033607] 10959 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@2c88dd7e 10959 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [storm check] 10959 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [5656882679628994049 -2821302035063205381 2] 10959 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {5656882679628994049=-2821302035063205381}, [storm check] 10960 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [storm] 10960 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [check] 10960 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [5656882679628994049 -2821302035063205381] 10960 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [storm] 10960 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [check] 10960 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [5656882679628994049 -2821302035063205381] 10961 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [5656882679628994049 -2821302035063205381 2] 10961 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack 
[5656882679628994049] 10962 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [5656882679628994049] 10963 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@55ffcecb 10963 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [spark job] 10963 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {7133074069827401185=7517764075602487290}, [spark job] 10963 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [spark] 10963 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [job] 10964 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [spark] 10964 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [7133074069827401185 7517764075602487290] 10964 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [job] 10964 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [7133074069827401185 7517764075602487290] 10964 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [7133074069827401185 7517764075602487290 2] 10964 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [7133074069827401185 7517764075602487290 2] 10965 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [7133074069827401185] 10966 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message 
source: __acker:1, stream: __ack_ack, id: {}, [7133074069827401185] 10967 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@79baf4fe 10967 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [message] 10969 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {6343398862224861484=-4414870605500453828}, [message] 10969 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [6343398862224861484 -4414870605500453828 2] 10970 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [message] 10970 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [6343398862224861484 -4414870605500453828] 10970 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [6343398862224861484 -4414870605500453828 2] 10970 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [message] 10972 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [6343398862224861484 -4414870605500453828] 10972 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [6343398862224861484] 10974 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [6343398862224861484] 10974 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@135f554d 10974 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [operator] 10974 [Thread-13-word-spitter] INFO 
backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {-3129322591382918852=-740436277297000921}, [operator] 10974 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [operator] 10974 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [-3129322591382918852 -740436277297000921 2] 10975 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [-3129322591382918852 -740436277297000921] 10975 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [operator] 10975 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [-3129322591382918852 -740436277297000921] 10975 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [-3129322591382918852 -740436277297000921 2] 10975 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [-3129322591382918852] 10977 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-3129322591382918852] 10977 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@46556960 10977 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [modulo] 10978 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {3640793944618690795=8106108915378369655}, [modulo] 10978 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [modulo] 10978 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack 
[3640793944618690795 8106108915378369655] 10978 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [modulo] 10978 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [3640793944618690795 8106108915378369655] 10979 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [3640793944618690795 8106108915378369655 2] 10979 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [3640793944618690795 8106108915378369655 2] 10979 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [3640793944618690795] 10979 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [3640793944618690795] 10979 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@28df1f28 10980 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [remainder] 10980 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {2596062899289317771=2634515203207894354}, [remainder] 10980 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [remainder] 10980 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [2596062899289317771 2634515203207894354] 10980 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [remainder] 10981 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [2596062899289317771 
2634515203207894354] 10981 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [2596062899289317771 2634515203207894354 2] 10981 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [2596062899289317771 2634515203207894354 2] 10981 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [2596062899289317771] 10983 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2596062899289317771] 10983 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@70d7b03d 10983 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [backtype] 10984 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [2514155816374762264 -1773960649746285671 2] 10984 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {2514155816374762264=-1773960649746285671}, [backtype] 10984 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [backtype] 10984 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [2514155816374762264 -1773960649746285671] 10984 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [2514155816374762264 -1773960649746285671 2] 10984 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [backtype] 10984 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [2514155816374762264 -1773960649746285671] 10984 
[Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [2514155816374762264] 10986 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2514155816374762264] 10986 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@194b6657 10986 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [utility] 10987 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {2895993183130256304=2076802810368494976}, [utility] 10987 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [utility] 10987 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [2895993183130256304 2076802810368494976] 10987 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [utility] 10987 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [2895993183130256304 2076802810368494976] 10989 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [2895993183130256304 2076802810368494976 2] 10990 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [2895993183130256304 2076802810368494976 2] 10990 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [2895993183130256304] 10991 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2895993183130256304] 10992 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message 
storm.kafka.PartitionManager$KafkaMessageId@74380277 12988 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x179a924e770000c type:create cxid:0x3 zxid:0x22 txntype:-1 reqpath:n/a Error Path:/kafka_topic_phrases/8b2a044f-29f9-483a-8f2d-3c868d6434db Error:KeeperErrorCode = NoNode for /kafka_topic_phrases/8b2a044f-29f9-483a-8f2d-3c868d6434db 17697 [main] INFO backtype.storm.daemon.nimbus - Shutting down master 17699 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700001 17700 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700001 closed 17700 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 17700 [main] INFO backtype.storm.daemon.nimbus - Shut down master 17700 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50491 which had sessionid 0x179a924e7700001 17703 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700003 17704 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50493 which had sessionid 0x179a924e7700003 17704 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700003 closed 17705 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 17706 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700005 17707 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700005 closed 17707 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO 
org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50495 which had sessionid 0x179a924e7700005 17707 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 17708 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor bb3e0fa6-6811-4f10-ba4e-34f018909fa6 17709 [Thread-3] INFO backtype.storm.event - Event manager interrupted 17710 [Thread-4] INFO backtype.storm.event - Event manager interrupted 17712 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700007 17712 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700007 closed 17713 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 17713 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50497 which had sessionid 0x179a924e7700007 17714 [main] INFO backtype.storm.daemon.supervisor - Shutting down 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:39344513-9c18-41e7-bf9b-fd894180922f 17714 [main] INFO backtype.storm.process-simulator - Killing process 9e419fa3-7aeb-4b37-982a-7ff15ea211fb 17715 [main] INFO backtype.storm.daemon.worker - Shutting down worker KafkaStormSample-1-1622040441 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 1027 17715 [main] INFO backtype.storm.daemon.worker - Shutting down receive thread 17715 [main] INFO backtype.storm.messaging.loader - Shutting down receiving-thread: [KafkaStormSample-1-1622040441, 1027] 17715 [main] INFO backtype.storm.messaging.loader - Waiting for receiving-thread:[KafkaStormSample-1-1622040441, 1027] to die 17716 [Thread-7-worker-receiver-thread-0] INFO backtype.storm.messaging.loader - Receiving-thread:[KafkaStormSample-1-1622040441, 1027] received shutdown notice 17717 [main] INFO backtype.storm.messaging.loader - Shutdown receiving-thread: 
[KafkaStormSample-1-1622040441, 1027] 17717 [main] INFO backtype.storm.daemon.worker - Shut down receive thread 17717 [main] INFO backtype.storm.daemon.worker - Terminating messaging context 17717 [main] INFO backtype.storm.daemon.worker - Shutting down executors 17718 [main] INFO backtype.storm.daemon.executor - Shutting down executor kafka-spout:[2 2] 17719 [Thread-9-kafka-spout] INFO backtype.storm.util - Async loop interrupted! 17719 [Thread-8-disruptor-executor[2 2]-send-queue] INFO backtype.storm.util - Async loop interrupted! 17725 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e770000c 17726 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50502 which had sessionid 0x179a924e770000c 17727 [main] INFO backtype.storm.daemon.executor - Shut down executor kafka-spout:[2 2] 17727 [main] INFO backtype.storm.daemon.executor - Shutting down executor word-counter:[3 3] 17728 [Thread-11-word-counter] INFO backtype.storm.util - Async loop interrupted! 17728 [Thread-10-disruptor-executor[3 3]-send-queue] INFO backtype.storm.util - Async loop interrupted! hi : 1 storm : 1 test : 1 utility : 1 check : 1 message : 1 operator : 1 spark : 1 kafka : 1 backtype : 1 job : 1 modulo : 1 remainder : 1 17729 [main] INFO backtype.storm.daemon.executor - Shut down executor word-counter:[3 3] 17729 [main] INFO backtype.storm.daemon.executor - Shutting down executor word-spitter:[4 4] 17730 [Thread-13-word-spitter] INFO backtype.storm.util - Async loop interrupted! 17730 [Thread-12-disruptor-executor[4 4]-send-queue] INFO backtype.storm.util - Async loop interrupted! 
17731 [main] INFO backtype.storm.daemon.executor - Shut down executor word-spitter:[4 4] 17732 [main] INFO backtype.storm.daemon.executor - Shutting down executor __system:[-1 -1] 17733 [Thread-15-__system] INFO backtype.storm.util - Async loop interrupted! 17734 [Thread-14-disruptor-executor[-1 -1]-send-queue] INFO backtype.storm.util - Async loop interrupted! 17736 [main] INFO backtype.storm.daemon.executor - Shut down executor __system:[-1 -1] 17736 [main] INFO backtype.storm.daemon.executor - Shutting down executor __acker:[1 1] 17736 [Thread-17-__acker] INFO backtype.storm.util - Async loop interrupted! 17737 [Thread-16-disruptor-executor[1 1]-send-queue] INFO backtype.storm.util - Async loop interrupted! 17737 [main] INFO backtype.storm.daemon.executor - Shut down executor __acker:[1 1] 17737 [main] INFO backtype.storm.daemon.worker - Shut down executors 17738 [main] INFO backtype.storm.daemon.worker - Shutting down transfer thread 17738 [Thread-18-disruptor-worker-transfer-queue] INFO backtype.storm.util - Async loop interrupted! 
17740 [main] INFO backtype.storm.daemon.worker - Shut down transfer thread 17748 [main] INFO backtype.storm.daemon.worker - Shutting down default resources 17748 [main] INFO backtype.storm.daemon.worker - Shut down default resources 17762 [main] INFO backtype.storm.daemon.worker - Disconnecting from storm cluster state context 17763 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e770000b 17763 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50501 which had sessionid 0x179a924e770000b 17764 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e770000b closed 17764 [Thread-6-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 17764 [main] INFO backtype.storm.daemon.worker - Shut down worker KafkaStormSample-1-1622040441 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 1027 17772 [main] INFO backtype.storm.daemon.supervisor - Shut down 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:39344513-9c18-41e7-bf9b-fd894180922f 17773 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 17778 [Thread-5] INFO backtype.storm.event - Event manager interrupted 17779 [Thread-6] INFO backtype.storm.event - Event manager interrupted 17780 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700009 17780 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700009 closed 17781 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50499 which had sessionid 0x179a924e7700009 17781 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down 17781 [main] INFO backtype.storm.testing - 
Shutting down in process zookeeper 17782 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited run method 17782 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - shutting down 17782 [main] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - Shutting down 17782 [main] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Shutting down 17782 [main] INFO org.apache.storm.zookeeper.server.SyncRequestProcessor - Shutting down 17782 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited loop! 17782 [SyncThread:0] INFO org.apache.storm.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited! 17783 [main] INFO org.apache.storm.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete 17783 [main] INFO backtype.storm.testing - Done shutting down in process zookeeper 17783 [main] INFO backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//d9c56f79-8690-4ca4-856f-38179dcd654d 17789 [main] INFO backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//a38c5a4c-6ac0-423b-913e-097a53145a3f 17791 [main] INFO backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//4a2ac468-1fa6-4abd-9863-0b9babd83b88 17797 [main] INFO backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df 17916 [SessionTracker] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!
The snippet below isolates the relevant part of the output above: the word counts that the Storm spout and bolts produced for the messages sent to the Kafka topic.
Output from the Storm side code
hi : 1
storm : 1
test : 1
utility : 1
check : 1
message : 1
operator : 1
spark : 1
kafka : 1
backtype : 1
job : 1
modulo : 1
remainder : 1
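In the log, each phrase emitted by kafka-spout (for example [spark job]) is split into words by word-spitter and then tallied by word-counter. The following is a minimal sketch of that split-and-count logic in plain Java. It is not the article's bolt code and does not use the Storm API. The class name WordCountSketch and the method count are hypothetical names chosen for illustration, and the input phrases are only the ones visible in the log excerpt above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone class; it only imitates what the two bolts do.
public class WordCountSketch {

    // Splits each phrase on whitespace (the word-spitter step) and
    // tallies every word (the word-counter step).
    static Map<String, Integer> count(List<String> phrases) {
        // LinkedHashMap keeps words in first-seen order.
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String phrase : phrases) {
            for (String word : phrase.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Phrases taken from the "Emitting: kafka-spout default [...]" log lines.
        List<String> phrases = Arrays.asList(
                "spark job", "message", "operator", "modulo",
                "remainder", "backtype", "utility");
        // Print in the same "word : count" format as the output above.
        count(phrases).forEach((word, n) -> System.out.println(word + " : " + n));
    }
}
```

In a real topology the two steps run as separate bolts connected by a stream, and the counts are typically printed when the word-counter bolt is cleaned up at shutdown, which matches where the counts appear in the log.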
3. Download the Source Code
You can download the full source code of this example here: Apache Kafka Integration With Storm