Enterprise Java

Apache Kafka Integration With Storm

1. Introduction

This is an in-depth article related to the Apache Kafka producer example. Apache Kafka is an Apache open-source project. It was initially created on Linkedin. Kafka framework was created in java and scala. It supports publish-subscribe messaging and is fault-tolerant. It is scalable and performs for high-volume messaging. Zookeeper is the basic component that manages the Apache Kafka Server. Kafka has features related to reliability, scalability, performance, distributed logging, and durability.

2. Apache Kafka – Storm Integration

Apache Storm was developed by the BackType team led by Nathan Marz. The storm is used for real-time data processing. It can perform at speeds of 1 million tuples per second on a single node. Spouts are the data sources and bolts ae the data processing pipelines. The topology consists of Bolts and Spouts. Data is streamed using the storm topologies using Kafka for publishing and subscribe messaging.

2.1 Prerequisites

Java 7 or 8 is required on the Linux, windows, or Mac operating system. Apache Storm 0.95 and Kafka 2.11-0.9.0.0 are needed for this article.

2.2 Download

You can download Java 8 can be downloaded from the Oracle website.  Kafka framework’s latest releases are available from this website. Apache storm can be downloaded from this site.

2.3 Setup

You can set the environment variables for JAVA_HOME and PATH. They can be set as shown below:

Setup

JAVA_HOME="/desktop/jdk1.8.0_73"
export JAVA_HOME
PATH=$JAVA_HOME/bin:$PATH
export PATH

2.4 How to download and install Apache Kafka & Storm

Apache Kafka’s latest releases are available from the Apache Kafka website. After downloading the zip file can be extracted to a folder. Storm zip file can be extracted into a folder after downloading from the site.

To start the zookeeper, you can use the command below:

Zoo keeper

bin/zookeeper-server-start.sh config/zookeeper.properties

The output of the above command is shown as below:

Zoo keeper Output

apples-MacBook-Air:kafka_2.11-0.9.0.0 bhagvan.kommadi$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2021-05-27 00:39:27,510] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-05-27 00:39:27,514] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-05-27 00:39:27,514] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-05-27 00:39:27,514] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-05-27 00:39:27,514] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2021-05-27 00:39:27,559] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-05-27 00:39:27,561] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2021-05-27 00:39:27,607] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:java.version=1.8.0_101 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:java.class.path=:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-http-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-client-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-core-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zkclient-0.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-databind-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-media-jaxb-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-api-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-tools-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-guava-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/argparse4j-0.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-scaladoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-security-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-xml_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-log4j12-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-javadoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-api-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/log4j-1.2.17.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-api-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/validation-api-1.1.0.Final.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-sources.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javassist-3.18.1-GA.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-test.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.annotation-api-1.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-library-2.11.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-json-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/metrics-core-2.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-clients-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/lz4-1.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-locator-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-io-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jopt-simple-3.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/snappy-java-1.1.1.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-annotations-2.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zookeeper-3.4.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-util-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-server-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-file-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-runtime-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-utils-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-server-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-common-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-base-2.5.4.jar (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/ (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:java.compiler= (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:os.name=Mac OS X (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:os.arch=x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:os.version=10.16 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:user.name=bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:user.home=/Users/bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,610] INFO Server environment:user.dir=/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,631] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,631] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,632] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,676] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)apples-MacBook-Air:kafka_2.11-0.9.0.0 bhagvan.kommadi$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2021-05-27 00:39:27,510] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-05-27 00:39:27,514] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-05-27 00:39:27,514] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-05-27 00:39:27,514] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-05-27 00:39:27,514] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2021-05-27 00:39:27,559] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-05-27 00:39:27,561] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2021-05-27 00:39:27,607] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:java.version=1.8.0_101 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,608] INFO Server environment:java.class.path=:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-http-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-client-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-core-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zkclient-0.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-databind-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-media-jaxb-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-api-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-tools-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-guava-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/argparse4j-0.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-scaladoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-security-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-xml_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-log4j12-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-javadoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-api-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/log4j-1.2.17.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-api-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/validation-api-1.1.0.Final.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-sources.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javassist-3.18.1-GA.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-test.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.annotation-api-1.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-library-2.11.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-json-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/metrics-core-2.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-clients-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/lz4-1.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-locator-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-io-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jopt-simple-3.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/snappy-java-1.1.1.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-annotations-2.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zookeeper-3.4.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-util-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-server-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-file-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-runtime-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-utils-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-server-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-common-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-base-2.5.4.jar (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/ (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:java.compiler= (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:os.name=Mac OS X (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:os.arch=x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:os.version=10.16 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:user.name=bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,609] INFO Server environment:user.home=/Users/bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,610] INFO Server environment:user.dir=/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,631] INFO tickTime set to 3000 (orgapache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,631] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,632] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-05-27 00:39:27,676] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

You can now start the apache kafka server using the command below

Apache Kafka Server Run Command

bin/kafka-server-start.sh config/server.properties

The output of the above command is shown as below:

Apache Kafka Server Output

apples-MacBook-Air:kafka_2.11-0.9.0.0 bhagvan.kommadi$ bin/kafka-server-start.sh config/server.properties
[2021-05-27 00:42:40,482] INFO KafkaConfig values: 
	advertised.host.name = null
	metric.reporters = []
	quota.producer.default = 9223372036854775807
	offsets.topic.num.partitions = 50
	log.flush.interval.messages = 9223372036854775807
	auto.create.topics.enable = true
	controller.socket.timeout.ms = 30000
	log.flush.interval.ms = null
	principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
	replica.socket.receive.buffer.bytes = 65536
	min.insync.replicas = 1
	replica.fetch.wait.max.ms = 500
	num.recovery.threads.per.data.dir = 1
	ssl.keystore.type = JKS
	default.replication.factor = 1
	ssl.truststore.password = null
	log.preallocate = false
	sasl.kerberos.principal.to.local.rules = [DEFAULT]
	fetch.purgatory.purge.interval.requests = 1000
	ssl.endpoint.identification.algorithm = null
	replica.socket.timeout.ms = 30000
	message.max.bytes = 1000012
	num.io.threads = 8
	offsets.commit.required.acks = -1
	log.flush.offset.checkpoint.interval.ms = 60000
	delete.topic.enable = false
	quota.window.size.seconds = 1
	ssl.truststore.type = JKS
	offsets.commit.timeout.ms = 5000
	quota.window.num = 11
	zookeeper.connect = localhost:2181
	authorizer.class.name = 
	num.replica.fetchers = 1
	log.retention.ms = null
	log.roll.jitter.hours = 0
	log.cleaner.enable = false
	offsets.load.buffer.size = 5242880
	log.cleaner.delete.retention.ms = 86400000
	ssl.client.auth = none
	controlled.shutdown.max.retries = 3
	queued.max.requests = 500
	offsets.topic.replication.factor = 3
	log.cleaner.threads = 1
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	socket.request.max.bytes = 104857600
	ssl.trustmanager.algorithm = PKIX
	zookeeper.session.timeout.ms = 6000
	log.retention.bytes = -1
	sasl.kerberos.min.time.before.relogin = 60000
	zookeeper.set.acl = false
	connections.max.idle.ms = 600000
	offsets.retention.minutes = 1440
	replica.fetch.backoff.ms = 1000
	inter.broker.protocol.version = 0.9.0.X
	log.retention.hours = 168
	num.partitions = 1
	listeners = PLAINTEXT://0.0.0.0:9092
	ssl.provider = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	log.roll.ms = null
	log.flush.scheduler.interval.ms = 9223372036854775807
	ssl.cipher.suites = null
	log.index.size.max.bytes = 10485760
	ssl.keymanager.algorithm = SunX509
	security.inter.broker.protocol = PLAINTEXT
	replica.fetch.max.bytes = 1048576
	advertised.port = null
	log.cleaner.dedupe.buffer.size = 524288000
	replica.high.watermark.checkpoint.interval.ms = 5000
	log.cleaner.io.buffer.size = 524288
	sasl.kerberos.ticket.renew.window.factor = 0.8
	zookeeper.connection.timeout.ms = 6000
	controlled.shutdown.retry.backoff.ms = 5000
	log.roll.hours = 168
	log.cleanup.policy = delete
	host.name = 
	log.roll.jitter.ms = null
	max.connections.per.ip = 2147483647
	offsets.topic.segment.bytes = 104857600
	background.threads = 10
	quota.consumer.default = 9223372036854775807
	request.timeout.ms = 30000
	log.index.interval.bytes = 4096
	log.dir = /tmp/kafka-logs
	log.segment.bytes = 1073741824
	log.cleaner.backoff.ms = 15000
	offset.metadata.max.bytes = 4096
	ssl.truststore.location = null
	group.max.session.timeout.ms = 30000
	ssl.keystore.password = null
	zookeeper.sync.time.ms = 2000
	port = 9092
	log.retention.minutes = null
	log.segment.delete.delay.ms = 60000
	log.dirs = /tmp/kafka-logs
	controlled.shutdown.enable = true
	compression.type = producer
	max.connections.per.ip.overrides = 
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
	auto.leader.rebalance.enable = true
	leader.imbalance.check.interval.seconds = 300
	log.cleaner.min.cleanable.ratio = 0.5
	replica.lag.time.max.ms = 10000
	num.network.threads = 3
	ssl.key.password = null
	reserved.broker.max.id = 1000
	metrics.num.samples = 2
	socket.send.buffer.bytes = 102400
	ssl.protocol = TLS
	socket.receive.buffer.bytes = 102400
	ssl.keystore.location = null
	replica.fetch.min.bytes = 1
	unclean.leader.election.enable = true
	group.min.session.timeout.ms = 6000
	log.cleaner.io.buffer.load.factor = 0.9
	offsets.retention.check.interval.ms = 600000
	producer.purgatory.purge.interval.requests = 1000
	metrics.sample.window.ms = 30000
	broker.id = 0
	offsets.topic.compression.codec = 0
	log.retention.check.interval.ms = 300000
	advertised.listeners = null
	leader.imbalance.per.broker.percentage = 10
 (kafka.server.KafkaConfig)
[2021-05-27 00:42:40,633] INFO starting (kafka.server.KafkaServer)
[2021-05-27 00:42:40,641] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2021-05-27 00:42:40,660] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2021-05-27 00:42:40,672] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,672] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,675] INFO Client environment:java.version=1.8.0_101 (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,675] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,675] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,675] INFO Client environment:java.class.path=:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-http-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-client-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-core-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zkclient-0.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-databind-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-media-jaxb-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-api-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-tools-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-guava-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/argparse4j-0.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-scaladoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-security-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-xml_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-log4j12-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-javadoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-api-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/log4j-1.2.17.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-api-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/validation-api-1.1.0.Final.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-sources.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javassist-3.18.1-GA.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-test.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.annotation-api-1.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-library-2.11.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-json-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/metrics-core-2.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-clients-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/lz4-1.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-locator-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-io-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jopt-simple-3.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/snappy-java-1.1.1.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-annotations-2.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zookeeper-3.4.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-util-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-server-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-file-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-runtime-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-utils-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-server-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-common-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-base-2.5.4.jar (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,676] INFO Client environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,676] INFO Client environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/ (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,676] INFO Client environment:java.compiler= (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,676] INFO Client environment:os.name=Mac OS X (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,676] INFO Client environment:os.arch=x86_64 (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,676] INFO Client environment:os.version=10.16 (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,676] INFO Client environment:user.name=bhagvan.kommadi (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,676] INFO Client environment:user.home=/Users/bhagvan.kommadi (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,676] INFO Client environment:user.dir=/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0 (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,677] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@23f7d05d (org.apache.zookeeper.ZooKeeper)
[2021-05-27 00:42:40,698] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2021-05-27 00:42:40,705] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-05-27 00:42:40,827] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2021-05-27 00:42:40,928] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x179aa14e9770000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2021-05-27 00:42:40,929] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2021-05-27 00:42:41,229] INFO Loading logs. (kafka.log.LogManager)
[2021-05-27 00:42:41,301] INFO Completed load of log my-topic-11 with log end offset 2 (kafka.log.Log)
[2021-05-27 00:42:41,312] INFO Completed load of log kafkatopic-2 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,316] INFO Completed load of log kafkatopic-5 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,321] INFO Completed load of log my-topic-10 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,329] INFO Completed load of log kafka_topic_sentences-0 with log end offset 30 (kafka.log.Log)
[2021-05-27 00:42:41,333] INFO Completed load of log kafkatopic-4 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,339] INFO Completed load of log kafkatopic-3 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,345] INFO Completed load of log my-topic-3 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,354] INFO Completed load of log my-topic-4 with log end offset 1 (kafka.log.Log)
[2021-05-27 00:42:41,358] INFO Completed load of log kafkatopic-11 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,363] INFO Completed load of log my-topic-5 with log end offset 1 (kafka.log.Log)
[2021-05-27 00:42:41,367] INFO Completed load of log my-topic-2 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,375] INFO Completed load of log my-first-topic-0 with log end offset 1006174 (kafka.log.Log)
[2021-05-27 00:42:41,380] INFO Completed load of log kafkatopic-10 with log end offset 1 (kafka.log.Log)
[2021-05-27 00:42:41,391] INFO Completed load of log kafka_topic_phrases-0 with log end offset 20 (kafka.log.Log)
[2021-05-27 00:42:41,396] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,404] INFO Completed load of log newtopic-0 with log end offset 14 (kafka.log.Log)
[2021-05-27 00:42:41,408] INFO Completed load of log kafkatopic-6 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,416] INFO Completed load of log kafkatopic-1 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,420] INFO Completed load of log kafkatopic-8 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,424] INFO Completed load of log my-topic-12 with log end offset 2 (kafka.log.Log)
[2021-05-27 00:42:41,428] INFO Completed load of log kafkatopic-9 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,432] INFO Completed load of log kafkatopic-0 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,438] INFO Completed load of log kafkatopic-7 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,441] INFO Completed load of log kafkatopic-12 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,445] INFO Completed load of log my-topic-7 with log end offset 1 (kafka.log.Log)
[2021-05-27 00:42:41,449] INFO Completed load of log my-topic-0 with log end offset 1 (kafka.log.Log)
[2021-05-27 00:42:41,456] INFO Completed load of log my-topic-9 with log end offset 1 (kafka.log.Log)
[2021-05-27 00:42:41,463] INFO Completed load of log my-topic-8 with log end offset 2 (kafka.log.Log)
[2021-05-27 00:42:41,468] INFO Completed load of log my-topic-1 with log end offset 1 (kafka.log.Log)
[2021-05-27 00:42:41,473] INFO Completed load of log my-topic-6 with log end offset 0 (kafka.log.Log)
[2021-05-27 00:42:41,477] INFO Completed load of log ExampleTopic-0 with log end offset 10 (kafka.log.Log)
[2021-05-27 00:42:41,482] INFO Logs loading complete. (kafka.log.LogManager)
[2021-05-27 00:42:41,483] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2021-05-27 00:42:41,512] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2021-05-27 00:42:48,658] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2021-05-27 00:42:48,663] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
[2021-05-27 00:42:48,705] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-05-27 00:42:48,705] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-05-27 00:42:48,786] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2021-05-27 00:42:48,801] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2021-05-27 00:42:48,801] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2021-05-27 00:42:49,494] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2021-05-27 00:42:49,511] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator)
[2021-05-27 00:42:49,513] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-05-27 00:42:49,516] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2021-05-27 00:42:49,517] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-05-27 00:42:49,527] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2021-05-27 00:42:49,562] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2021-05-27 00:42:49,563] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2021-05-27 00:42:49,571] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2021-05-27 00:42:49,589] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2021-05-27 00:42:49,594] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2021-05-27 00:42:49,597] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(0.0.0.0,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2021-05-27 00:42:49,613] INFO Kafka version : 0.9.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-05-27 00:42:49,613] INFO Kafka commitId : fc7243c2af4b2b4a (org.apache.kafka.common.utils.AppInfoParser)
[2021-05-27 00:42:49,614] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2021-05-27 00:42:49,986] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-topic,5],[test,0],[kafkatopic,11],[my-topic,0],[my-topic,7],[kafkatopic,3],[kafkatopic,1],[kafkatopic,6],[kafkatopic,12],[kafkatopic,7],[kafkatopic,4],[kafkatopic,2],[kafka_topic_phrases,0],[kafkatopic,8],[kafka_topic_sentences,0],[my-topic,4],[kafkatopic,9],[my-topic,12],[my-topic,6],[my-topic,11],[my-topic,10],[my-topic,9],[my-topic,2],[my-first-topic,0],[ExampleTopic,0],[kafkatopic,5],[kafkatopic,0],[my-topic,3],[newtopic,0],[my-topic,1],[kafkatopic,10],[my-topic,8] (kafka.server.ReplicaFetcherManager)
[2021-05-27 00:42:50,209] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-topic,5],[test,0],[kafkatopic,11],[my-topic,0],[my-topic,7],[kafkatopic,3],[kafkatopic,1],[kafkatopic,6],[kafkatopic,12],[kafkatopic,7],[kafkatopic,4],[kafkatopic,2],[kafka_topic_phrases,0],[kafkatopic,8],[kafka_topic_sentences,0],[my-topic,4],[kafkatopic,9],[my-topic,12],[my-topic,6],[my-topic,11],[my-topic,10],[my-topic,9],[my-topic,2],[my-first-topic,0],[ExampleTopic,0],[kafkatopic,5],[kafkatopic,0],[my-topic,3],[newtopic,0],[my-topic,1],[kafkatopic,10],[my-topic,8] (kafka.server.ReplicaFetcherManager)

2.5 Apache Kafka – Storm Api

In Storm, spout and bolts are used for data sources and pipelines respectively. Messages sent to a Kafka topic are consumed by the spout. Bolt takes the data as a stream from Spout. You can have different operations such as executing functions, tuple filters, aggregation of data as a stream, joins, and retrieving from data sources. The topology consists of spouts and bolts. Integration of Kafka and Storm is based on BrokerHosts, Kafka Config,SchemeAsMultiScheme, and SpoutConfig API. ZKHosts and StaticHosts are implementations of BrokerHosts Interface. ZKHosts track Kafka brokers using Zookeeper for storing the details. StaticHosts help in setting the Kafka Brokers and the details. ZkHosts is a better way as it is performant compared to StaticHosts. You can configure the Kafka cluster using KafkaConfig. SpoutConfig is used for storing the Zookeeper infomation. SchemeAsMultiScheme helps in transforming the data (ByteBuffer form) into Storm tuples. KafkaSpout helps in storm integration with Kafka.

2.6 Apache Kafka -Storm Integration Example

In this example, you can see the topology setup with spout and bolt. Let us look at the topology class first.

Apache Kafka Storm Integration – Topology

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormIntegrationExample {
   public static void main(String[] args) throws Exception{
      Config stormConfig = new Config();
      stormConfig.setDebug(true);
      stormConfig.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "kafka_topic_phrases";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig spoutConfiguration = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      spoutConfiguration.bufferSizeBytes = 1024 * 1024 * 4;
      spoutConfiguration.fetchSizeBytes = 1024 * 1024 * 4;
      spoutConfiguration.forceFromStart = true;
      spoutConfiguration.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder topologyBuilder = new TopologyBuilder();
      topologyBuilder.setSpout("kafka-spout", new KafkaSpout(spoutConfiguration));
      topologyBuilder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
      topologyBuilder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter");
         
      LocalCluster localCluster = new LocalCluster();
      localCluster.submitTopology("KafkaStormSample", stormConfig, topologyBuilder.createTopology());

      Thread.sleep(10000);
      
      localCluster.shutdown();
   }
}

The code below shows the PhraseSplitBolt which splits the phrases into words.

Apache Kafka Storm Integration – PhraseSplitBolt

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class PhraseSplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map getComponentConfiguration() {
      return null;
   }
   
}

Now let us look at WordCountBolt class.

Apache Kafka Storm Integration – WordCountBolt

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class WordCountBolt implements IRichBolt{
   Map counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map getComponentConfiguration() {
      return null;
   }
}

On kafka front, You can have a Simple producer sending messages. The source code of the SimpleProducer class is shown below.

Apache Kafka Storm Integration – WordCountBolt

import java.util.Properties;


import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducerExample {
   
   public static void main(String[] args) throws Exception{
      
 
      if(args.length == 0){
         System.out.println("Enter the topic to be created ");
         return;
      }
      
      
      String exampleTopicName = args[0].toString();
      
  
      Properties properties = new Properties();
      
      properties.put("bootstrap.servers", "localhost:9092");
      
      properties.put("acks", "all");
      
      properties.put("retries", 0);
      
      properties.put("batch.size", 16384);
      
      properties.put("linger.ms", 1);
      
      properties.put("buffer.memory", 33554432);
      
      properties.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
         
      properties.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer producer = new KafkaProducer
         (properties);

      String[] strings = new String[10];
      strings[0] = "hi";
      strings[1] = "kafka test";
      strings[2] = "storm check";
      strings[3] = "spark job";
      strings[4] = "message";
      strings[5] = "operator";
      strings[6] = "modulo";
      strings[7] = "remainder";           
      strings[8] = "backtype";           
      strings[9] = "utility";           
      for(int i = 0; i < 10; i++)
      {

          producer.send(new ProducerRecord(exampleTopicName,strings[i]));
          
             System.out.println("Message sent successfully "+ strings[i]);

            Thread.sleep(100);
      }      
          producer.close();
   }
}

You can compile the code using the command below

Kafka Simple Producer Compilation Command

 javac -cp "$KAFKA_HOME/libs/*" SimpleProducerExample.java

First, let us start with the zookeeper and Kafka server. You can compile all the storm classes using the command below. Ensure that the log4j-over-slf4j-1.6.6.jar is not part of the classpath in the command below. curator-client-2.9.1.jar,curator-framework-2.9.1, and guava-11.0.2.jar can be included in the classpath.

Storm side code -Compilation

apples-MacBook-Air:apache_stom bhagvan.kommadi$ javac -cp "/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/*:/users/bhagvan.kommadi/downloads/apache_stom/lib/*:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/*:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/*:." *.java

Then you can execute the storm side code using the command below

Storm side code – Execution

apples-MacBook-Air:apache_stom bhagvan.kommadi$ javac -cp "/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/*:/users/bhagvan.kommadi/downloads/apache_stom/lib/*:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/*:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/*:." *.java

Now you can start sending the messages to the kafka topic after creating the topic.

Kafka Simple Producer Compilation Command

 java -cp "$KAFKA_HOME/libs/*" SimpleProducerExample kafka_topic_phrases

The output of the command executed above is shown below:

Apache Kafka Server Output

apples-MacBook-Air:apachekafkaproducer bhagvan.kommadi$ java -cp "$KAFKA_HOME/libs/*:." SimpleProducerExample kafka_topic_phrases
Message sent successfully hi
Message sent successfully kafka test
Message sent successfully storm check
Message sent successfully spark job
Message sent successfully message
Message sent successfully operator
Message sent successfully modulo
Message sent successfully remainder
Message sent successfully backtype
Message sent successfully utility

On the storm side, the output of the executed command is shown below.

Apache Kafka Server Output

apples-MacBook-Air:apache_stom bhagvan.kommadi$ java -cp "/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/*:/users/bhagvan.kommadi/downloads/apache_stom/lib/*:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/*:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/*:." KafkaStormSample
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/bhagvan.kommadi/Desktop/apache-storm-0.9.5/lib/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/bhagvan.kommadi/Downloads/apache_stom/lib/log4j-slf4j-impl-2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
3345 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
3370 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:host.name=localhost
3370 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_101
3370 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation
3370 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre
3370 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.class.path=/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/storm-core-0.9.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-stacktrace-0.2.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/disruptor-2.10.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/minlog-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/compojure-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/reflectasm-1.07-shaded.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-core-1.1.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clout-1.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-logging-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/objenesis-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-jetty-adapter-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/slf4j-api-1.7.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jgrapht-core-0.9.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.macro-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/hiccup-0.3.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/servlet-api-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/json-simple-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/math.numeric-tower-0.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-util-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/carbonite-1.4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-core-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/chill-java-0.3.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/joda-time-2.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.cli-0.2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-io-2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-codec-1.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.logging-0.2.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-servlet-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/snakeyaml-1.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-exec-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/asm-4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-lang-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/core.incubator-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jline-2.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/kryo-2.21.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-fileupload-1.2.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-devel-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-classic-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-time-0.4.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clojure-1.5.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-jcl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-1.2-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-core-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-client-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-framework-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-slf4j-impl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/guava-11.0.2.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/kafka-clients-0.9.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/storm-kafka-0.9.5.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-http-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-log4j-appender-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-client-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-core-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zkclient-0.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-databind-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-media-jaxb-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-api-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-tools-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-guava-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/argparse4j-0.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-scaladoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-security-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-xml_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-parser-combinators_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-javadoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-servlet-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-api-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.servlet-api-3.1.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-json-provider-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/log4j-1.2.17.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-module-jaxb-annotations-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-api-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/validation-api-1.1.0.Final.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-sources.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javassist-3.18.1-GA.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-test.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.annotation-api-1.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-library-2.11.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-json-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/metrics-core-2.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-clients-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/lz4-1.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-locator-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-io-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/osgi-resource-locator-1.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/aopalliance-repackaged-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jopt-simple-3.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/snappy-java-1.1.1.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-annotations-2.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zookeeper-3.4.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-util-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-server-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-file-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-runtime-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.ws.rs-api-2.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-utils-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-core-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-server-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-common-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-base-2.5.4.jar:.
3372 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
3372 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/
3373 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.compiler=
3373 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:os.name=Mac OS X
3373 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:os.arch=x86_64
3373 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:os.version=10.16
3373 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:user.name=bhagvan.kommadi
3373 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:user.home=/Users/bhagvan.kommadi
3373 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:user.dir=/Users/bhagvan.kommadi/Downloads/apache_stom
3391 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
3391 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:host.name=localhost
3391 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.version=1.8.0_101
3391 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.vendor=Oracle Corporation
3391 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre
3391 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.class.path=/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/storm-core-0.9.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-stacktrace-0.2.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/disruptor-2.10.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/minlog-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/compojure-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/reflectasm-1.07-shaded.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-core-1.1.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clout-1.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-logging-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/objenesis-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-jetty-adapter-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/slf4j-api-1.7.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jgrapht-core-0.9.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.macro-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/hiccup-0.3.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/servlet-api-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/json-simple-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/math.numeric-tower-0.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-util-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/carbonite-1.4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-core-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/chill-java-0.3.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/joda-time-2.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.cli-0.2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-io-2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-codec-1.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.logging-0.2.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-servlet-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/snakeyaml-1.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-exec-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/asm-4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-lang-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/core.incubator-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jline-2.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/kryo-2.21.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-fileupload-1.2.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-devel-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-classic-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-time-0.4.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clojure-1.5.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-jcl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-1.2-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-core-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-client-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-framework-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-slf4j-impl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/guava-11.0.2.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/kafka-clients-0.9.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/storm-kafka-0.9.5.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-http-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-log4j-appender-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-client-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-core-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zkclient-0.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-databind-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-media-jaxb-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-api-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-tools-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-guava-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/argparse4j-0.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-scaladoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-security-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-xml_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-parser-combinators_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-javadoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-servlet-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-api-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.servlet-api-3.1.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-json-provider-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/log4j-1.2.17.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-module-jaxb-annotations-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-api-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/validation-api-1.1.0.Final.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-sources.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javassist-3.18.1-GA.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-test.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.annotation-api-1.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-library-2.11.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-json-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/metrics-core-2.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-clients-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/lz4-1.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-locator-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-io-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/osgi-resource-locator-1.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/aopalliance-repackaged-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jopt-simple-3.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/snappy-java-1.1.1.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-annotations-2.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zookeeper-3.4.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-util-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-server-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-file-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-runtime-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.ws.rs-api-2.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-utils-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-core-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-server-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-common-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-base-2.5.4.jar:.
3392 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
3392 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/
3392 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.compiler=
3392 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.name=Mac OS X
3392 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.arch=x86_64
3392 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.version=10.16
3392 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.name=bhagvan.kommadi
3392 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.home=/Users/bhagvan.kommadi
3392 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.dir=/Users/bhagvan.kommadi/Downloads/apache_stom
4499 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/a38c5a4c-6ac0-423b-913e-097a53145a3f/version-2 snapdir /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/a38c5a4c-6ac0-423b-913e-097a53145a3f/version-2
4532 [main] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:2000
4541 [main] INFO  backtype.storm.zookeeper - Starting inprocess zookeeper at port 2000 and dir /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//a38c5a4c-6ac0-423b-913e-097a53145a3f
4817 [main] INFO  backtype.storm.daemon.nimbus - Starting Nimbus with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//d9c56f79-8690-4ca4-856f-38179dcd654d", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
4836 [main] INFO  backtype.storm.daemon.nimbus - Using default scheduler
4863 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
4959 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
4962 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@2416498e
4998 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
5096 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
5096 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50490
5111 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50490
5114 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.persistence.FileTxnLog - Creating new log file: log.1
5126 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700000 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50490
5127 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700000, negotiated timeout = 20000
5130 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
5134 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
6193 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700000
6194 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700000 closed
6194 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6196 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50490 which had sessionid 0x179a924e7700000
6197 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
6197 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
6198 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@43f0c2d1
6199 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
6200 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
6200 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50491
6200 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50491
6202 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700001 with negotiated timeout 20000 for client /127.0.0.1:50491
6202 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700001, negotiated timeout = 20000
6202 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
6239 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
6240 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
6240 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@23592946
6242 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
6243 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
6243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50492
6243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50492
6245 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700002 with negotiated timeout 20000 for client /127.0.0.1:50492
6245 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700002, negotiated timeout = 20000
6245 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
6246 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
7255 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700002
7257 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700002 closed
7257 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7257 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7258 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50492 which had sessionid 0x179a924e7700002
7258 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7258 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@5c60b0a0
7260 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7261 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7261 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7261 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7261 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50493
7262 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50493
7262 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@23f3dbf0
7264 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7264 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700003 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50493
7264 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700003, negotiated timeout = 20000
7264 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7264 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7264 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50494
7265 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50494
7266 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700004 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50494
7266 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700004, negotiated timeout = 20000
7267 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7267 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
7269 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700004
7270 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700004 closed
7284 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7284 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7284 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7286 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@31ff6309
7286 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50494 which had sessionid 0x179a924e7700004
7289 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
7290 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
7290 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50495
7291 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50495
7296 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700005 with negotiated timeout 20000 for client /127.0.0.1:50495
7296 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700005, negotiated timeout = 20000
7297 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7318 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//4a2ac468-1fa6-4abd-9863-0b9babd83b88", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
7345 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7345 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7346 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@46731692
7348 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7349 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7349 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50496
7349 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50496
7351 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700006 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50496
7351 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700006, negotiated timeout = 20000
7354 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7354 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
7359 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700006
7360 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700006 closed
7360 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7360 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7361 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7361 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50496 which had sessionid 0x179a924e7700006
7362 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@ad9e63e
7364 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
7365 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
7365 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50497
7365 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50497
7367 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700007 with negotiated timeout 20000 for client /127.0.0.1:50497
7367 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700007, negotiated timeout = 20000
7367 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7404 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor with id bb3e0fa6-6811-4f10-ba4e-34f018909fa6 at host localhost
7407 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
7410 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7411 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7411 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@b0fc838
7413 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7413 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7413 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50498
7414 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50498
7415 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700008 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50498
7415 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700008, negotiated timeout = 20000
7415 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7415 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
7418 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700008
7419 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700008 closed
7420 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7420 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50498 which had sessionid 0x179a924e7700008
7420 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7422 [main] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7422 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@5fbdc49b
7424 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
7425 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
7425 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50499
7425 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50499
7426 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700009 with negotiated timeout 20000 for client /127.0.0.1:50499
7427 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700009, negotiated timeout = 20000
7427 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7436 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor with id 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 at host localhost
7508 [main] INFO  backtype.storm.daemon.nimbus - Received topology submission for KafkaStormSample with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register" nil, "topology.kryo.decorators" (), "topology.name" "KafkaStormSample", "storm.id" "KafkaStormSample-1-1622040441", "topology.debug" true, "topology.max.spout.pending" 1}
7540 [main] INFO  backtype.storm.daemon.nimbus - Activating KafkaStormSample: KafkaStormSample-1-1622040441
7650 [main] INFO  backtype.storm.scheduler.EvenScheduler - Available slots: (["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1028] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1029] ["bb3e0fa6-6811-4f10-ba4e-34f018909fa6" 1024] ["bb3e0fa6-6811-4f10-ba4e-34f018909fa6" 1025] ["bb3e0fa6-6811-4f10-ba4e-34f018909fa6" 1026])
7680 [main] INFO  backtype.storm.daemon.nimbus - Setting new assignment for topology id KafkaStormSample-1-1622040441: #backtype.storm.daemon.common.Assignment{:master-code-dir "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//d9c56f79-8690-4ca4-856f-38179dcd654d/nimbus/stormdist/KafkaStormSample-1-1622040441", :node->host {"563a836c-55ba-4b7e-9f4c-beaf44aea5a3" "localhost"}, :executor->node+port {[4 4] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027], [3 3] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027], [2 2] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027], [1 1] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027]}, :executor->start-time-secs {[4 4] 1622040441, [3 3] 1622040441, [2 2] 1622040441, [1 1] 1622040441}}
8921 [Thread-5] INFO  backtype.storm.daemon.supervisor - Extracting resources from jar at /users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-javadoc.jar to /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df/supervisor/stormdist/KafkaStormSample-1-1622040441/resources
8962 [Thread-6] INFO  backtype.storm.daemon.supervisor - Launching worker with assignment #backtype.storm.daemon.supervisor.LocalAssignment{:storm-id "KafkaStormSample-1-1622040441", :executors ([4 4] [3 3] [2 2] [1 1])} for this supervisor 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 on port 1027 with id 39344513-9c18-41e7-bf9b-fd894180922f
8969 [Thread-6] INFO  backtype.storm.daemon.worker - Launching worker for KafkaStormSample-1-1622040441 on 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 with id 39344513-9c18-41e7-bf9b-fd894180922f and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
8970 [Thread-6] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
8970 [Thread-6] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
8972 [Thread-6] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@559169a4
8978 [Thread-6-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
8979 [Thread-6-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
8979 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50500
8979 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50500
8981 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e770000a with negotiated timeout 20000 for client /127.0.0.1:50500
8981 [Thread-6-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e770000a, negotiated timeout = 20000
8982 [Thread-6-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
8982 [Thread-6-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
8985 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e770000a
8987 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50500 which had sessionid 0x179a924e770000a
8987 [Thread-6] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e770000a closed
8987 [Thread-6-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
8987 [Thread-6] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
8989 [Thread-6] INFO  org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
8989 [Thread-6] INFO  org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@2a7cbfce
8992 [Thread-6-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
8992 [Thread-6-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
8992 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50501
8993 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50501
8994 [Thread-6-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e770000b, negotiated timeout = 20000
8994 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e770000b with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50501
8995 [Thread-6-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
9004 [Thread-6] INFO  backtype.storm.daemon.worker - Reading Assignments.
9074 [Thread-6] INFO  backtype.storm.daemon.worker - Launching receive-thread for 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027
9083 [Thread-7-worker-receiver-thread-0] INFO  backtype.storm.messaging.loader - Starting receive-thread: [stormId: KafkaStormSample-1-1622040441, port: 1027, thread-id: 0 ]
9410 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor kafka-spout:[2 2]
9429 [Thread-6] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __system ["startup"]
9429 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks kafka-spout:[2 2]
9443 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor kafka-spout:[2 2]
9459 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor word-counter:[3 3]
9460 [Thread-6] INFO  backtype.storm.daemon.task - Emitting: word-counter __system ["startup"]
9461 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks word-counter:[3 3]
9469 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor word-counter:[3 3]
9479 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor word-spitter:[4 4]
9480 [Thread-6] INFO  backtype.storm.daemon.task - Emitting: word-spitter __system ["startup"]
9481 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks word-spitter:[4 4]
9483 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor word-spitter:[4 4]
9491 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor __system:[-1 -1]
9492 [Thread-6] INFO  backtype.storm.daemon.task - Emitting: __system __system ["startup"]
9492 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks __system:[-1 -1]
9495 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor __system:[-1 -1]
9502 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor __acker:[1 1]
9506 [Thread-6] INFO  backtype.storm.daemon.task - Emitting: __acker __system ["startup"]
9506 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks __acker:[1 1]
9509 [Thread-6] INFO  backtype.storm.daemon.executor - Timeouts disabled for executor __acker:[1 1]
9509 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor __acker:[1 1]
9518 [Thread-6] INFO  backtype.storm.daemon.worker - Worker has topology config {"storm.id" "KafkaStormSample-1-1622040441", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" (), "topology.name" "KafkaStormSample", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" 1, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.environment" nil, "topology.debug" true, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
9520 [Thread-6] INFO  backtype.storm.daemon.worker - Worker 39344513-9c18-41e7-bf9b-fd894180922f for storm KafkaStormSample-1-1622040441 on 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 has finished loading
10039 [refresh-active-timer] INFO  backtype.storm.daemon.worker - All connections are ready for worker 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 with id 39344513-9c18-41e7-bf9b-fd894180922f
10062 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Opening spout kafka-spout:(2)
10087 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Preparing bolt word-counter:(3)
10095 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Prepared bolt word-counter:(3)
10107 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Preparing bolt word-spitter:(4)
10108 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Prepared bolt word-spitter:(4)
10113 [Thread-15-__system] INFO  backtype.storm.daemon.executor - Preparing bolt __system:(-1)
10116 [Thread-15-__system] INFO  backtype.storm.daemon.executor - Prepared bolt __system:(-1)
10132 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Preparing bolt __acker:(1)
10135 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Prepared bolt __acker:(1)
10197 [Thread-9-kafka-spout] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
10220 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50502
10222 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50502
10223 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e770000c with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50502
10230 [Thread-9-kafka-spout-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
10232 [Thread-9-kafka-spout] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
10245 [Thread-9-kafka-spout-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
10283 [Thread-9-kafka-spout] INFO  storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=0.0.0.0:9092}}
10284 [Thread-9-kafka-spout] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
10288 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Opened spout kafka-spout:(2)
10293 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Activating spout kafka-spout:(2)
10293 [Thread-9-kafka-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
10297 [Thread-9-kafka-spout-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
10301 [Thread-9-kafka-spout] INFO  storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=0.0.0.0:9092}}
10303 [Thread-9-kafka-spout] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=0.0.0.0:9092, partition=0}]
10303 [Thread-9-kafka-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
10303 [Thread-9-kafka-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=0.0.0.0:9092, partition=0}]
10526 [Thread-9-kafka-spout] INFO  storm.kafka.PartitionManager - Read partition information from: /kafka_topic_phrases/8b2a044f-29f9-483a-8f2d-3c868d6434db/partition_0  --> null
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
10836 [Thread-9-kafka-spout] INFO  storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
10836 [Thread-9-kafka-spout] INFO  storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
10838 [Thread-9-kafka-spout] INFO  storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
10839 [Thread-9-kafka-spout] INFO  storm.kafka.PartitionManager - Starting Kafka 0.0.0.0:0 from offset 0
10843 [Thread-9-kafka-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
10943 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [hi]
10945 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {-9145905182196884541=-200665344721218530}, [hi]
10946 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [hi]
10946 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [hi]
10946 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [-9145905182196884541 -200665344721218530]
10946 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [-9145905182196884541 -200665344721218530]
10948 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [-9145905182196884541 -200665344721218530 2]
10950 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [-9145905182196884541 -200665344721218530 2]
10951 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [-9145905182196884541]
10954 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-9145905182196884541]
10955 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@749098b6
10955 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [kafka test]
10956 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [3361071049515033607 -3553068181479967910 2]
10956 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {3361071049515033607=-3553068181479967910}, [kafka test]
10956 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [kafka]
10956 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [3361071049515033607 -3553068181479967910 2]
10956 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [test]
10956 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [3361071049515033607 -3553068181479967910]
10956 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [kafka]
10956 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [test]
10958 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [3361071049515033607 -3553068181479967910]
10958 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [3361071049515033607]
10958 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [3361071049515033607]
10959 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@2c88dd7e
10959 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [storm check]
10959 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [5656882679628994049 -2821302035063205381 2]
10959 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {5656882679628994049=-2821302035063205381}, [storm check]
10960 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [storm]
10960 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [check]
10960 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [5656882679628994049 -2821302035063205381]
10960 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [storm]
10960 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [check]
10960 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [5656882679628994049 -2821302035063205381]
10961 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [5656882679628994049 -2821302035063205381 2]
10961 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [5656882679628994049]
10962 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [5656882679628994049]
10963 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@55ffcecb
10963 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [spark job]
10963 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {7133074069827401185=7517764075602487290}, [spark job]
10963 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [spark]
10963 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [job]
10964 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [spark]
10964 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [7133074069827401185 7517764075602487290]
10964 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [job]
10964 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [7133074069827401185 7517764075602487290]
10964 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [7133074069827401185 7517764075602487290 2]
10964 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [7133074069827401185 7517764075602487290 2]
10965 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [7133074069827401185]
10966 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [7133074069827401185]
10967 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@79baf4fe
10967 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [message]
10969 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {6343398862224861484=-4414870605500453828}, [message]
10969 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [6343398862224861484 -4414870605500453828 2]
10970 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [message]
10970 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [6343398862224861484 -4414870605500453828]
10970 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [6343398862224861484 -4414870605500453828 2]
10970 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [message]
10972 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [6343398862224861484 -4414870605500453828]
10972 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [6343398862224861484]
10974 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [6343398862224861484]
10974 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@135f554d
10974 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [operator]
10974 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {-3129322591382918852=-740436277297000921}, [operator]
10974 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [operator]
10974 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [-3129322591382918852 -740436277297000921 2]
10975 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [-3129322591382918852 -740436277297000921]
10975 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [operator]
10975 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [-3129322591382918852 -740436277297000921]
10975 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [-3129322591382918852 -740436277297000921 2]
10975 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [-3129322591382918852]
10977 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-3129322591382918852]
10977 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@46556960
10977 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [modulo]
10978 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {3640793944618690795=8106108915378369655}, [modulo]
10978 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [modulo]
10978 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [3640793944618690795 8106108915378369655]
10978 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [modulo]
10978 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [3640793944618690795 8106108915378369655]
10979 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [3640793944618690795 8106108915378369655 2]
10979 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [3640793944618690795 8106108915378369655 2]
10979 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [3640793944618690795]
10979 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [3640793944618690795]
10979 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@28df1f28
10980 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [remainder]
10980 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {2596062899289317771=2634515203207894354}, [remainder]
10980 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [remainder]
10980 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [2596062899289317771 2634515203207894354]
10980 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [remainder]
10981 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [2596062899289317771 2634515203207894354]
10981 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [2596062899289317771 2634515203207894354 2]
10981 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [2596062899289317771 2634515203207894354 2]
10981 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [2596062899289317771]
10983 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2596062899289317771]
10983 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@70d7b03d
10983 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [backtype]
10984 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [2514155816374762264 -1773960649746285671 2]
10984 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {2514155816374762264=-1773960649746285671}, [backtype]
10984 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [backtype]
10984 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [2514155816374762264 -1773960649746285671]
10984 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [2514155816374762264 -1773960649746285671 2]
10984 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [backtype]
10984 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [2514155816374762264 -1773960649746285671]
10984 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [2514155816374762264]
10986 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2514155816374762264]
10986 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@194b6657
10986 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout default [utility]
10987 [Thread-13-word-spitter] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {2895993183130256304=2076802810368494976}, [utility]
10987 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter default [utility]
10987 [Thread-13-word-spitter] INFO  backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [2895993183130256304 2076802810368494976]
10987 [Thread-11-word-counter] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [utility]
10987 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [2895993183130256304 2076802810368494976]
10989 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [2895993183130256304 2076802810368494976 2]
10990 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [2895993183130256304 2076802810368494976 2]
10990 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [2895993183130256304]
10991 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2895993183130256304]
10992 [Thread-9-kafka-spout] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@74380277
12988 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x179a924e770000c type:create cxid:0x3 zxid:0x22 txntype:-1 reqpath:n/a Error Path:/kafka_topic_phrases/8b2a044f-29f9-483a-8f2d-3c868d6434db Error:KeeperErrorCode = NoNode for /kafka_topic_phrases/8b2a044f-29f9-483a-8f2d-3c868d6434db
17697 [main] INFO  backtype.storm.daemon.nimbus - Shutting down master
17699 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700001
17700 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700001 closed
17700 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17700 [main] INFO  backtype.storm.daemon.nimbus - Shut down master
17700 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50491 which had sessionid 0x179a924e7700001
17703 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700003
17704 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50493 which had sessionid 0x179a924e7700003
17704 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700003 closed
17705 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17706 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700005
17707 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700005 closed
17707 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50495 which had sessionid 0x179a924e7700005
17707 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17708 [main] INFO  backtype.storm.daemon.supervisor - Shutting down supervisor bb3e0fa6-6811-4f10-ba4e-34f018909fa6
17709 [Thread-3] INFO  backtype.storm.event - Event manager interrupted
17710 [Thread-4] INFO  backtype.storm.event - Event manager interrupted
17712 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700007
17712 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700007 closed
17713 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17713 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50497 which had sessionid 0x179a924e7700007
17714 [main] INFO  backtype.storm.daemon.supervisor - Shutting down 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:39344513-9c18-41e7-bf9b-fd894180922f
17714 [main] INFO  backtype.storm.process-simulator - Killing process 9e419fa3-7aeb-4b37-982a-7ff15ea211fb
17715 [main] INFO  backtype.storm.daemon.worker - Shutting down worker KafkaStormSample-1-1622040441 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 1027
17715 [main] INFO  backtype.storm.daemon.worker - Shutting down receive thread
17715 [main] INFO  backtype.storm.messaging.loader - Shutting down receiving-thread: [KafkaStormSample-1-1622040441, 1027]
17715 [main] INFO  backtype.storm.messaging.loader - Waiting for receiving-thread:[KafkaStormSample-1-1622040441, 1027] to die
17716 [Thread-7-worker-receiver-thread-0] INFO  backtype.storm.messaging.loader - Receiving-thread:[KafkaStormSample-1-1622040441, 1027] received shutdown notice
17717 [main] INFO  backtype.storm.messaging.loader - Shutdown receiving-thread: [KafkaStormSample-1-1622040441, 1027]
17717 [main] INFO  backtype.storm.daemon.worker - Shut down receive thread
17717 [main] INFO  backtype.storm.daemon.worker - Terminating messaging context
17717 [main] INFO  backtype.storm.daemon.worker - Shutting down executors
17718 [main] INFO  backtype.storm.daemon.executor - Shutting down executor kafka-spout:[2 2]
17719 [Thread-9-kafka-spout] INFO  backtype.storm.util - Async loop interrupted!
17719 [Thread-8-disruptor-executor[2 2]-send-queue] INFO  backtype.storm.util - Async loop interrupted!
17725 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e770000c
17726 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50502 which had sessionid 0x179a924e770000c
17727 [main] INFO  backtype.storm.daemon.executor - Shut down executor kafka-spout:[2 2]
17727 [main] INFO  backtype.storm.daemon.executor - Shutting down executor word-counter:[3 3]
17728 [Thread-11-word-counter] INFO  backtype.storm.util - Async loop interrupted!
17728 [Thread-10-disruptor-executor[3 3]-send-queue] INFO  backtype.storm.util - Async loop interrupted!
hi : 1
storm : 1
test : 1
utility : 1
check : 1
message : 1
operator : 1
spark : 1
kafka : 1
backtype : 1
job : 1
modulo : 1
remainder : 1
17729 [main] INFO  backtype.storm.daemon.executor - Shut down executor word-counter:[3 3]
17729 [main] INFO  backtype.storm.daemon.executor - Shutting down executor word-spitter:[4 4]
17730 [Thread-13-word-spitter] INFO  backtype.storm.util - Async loop interrupted!
17730 [Thread-12-disruptor-executor[4 4]-send-queue] INFO  backtype.storm.util - Async loop interrupted!
17731 [main] INFO  backtype.storm.daemon.executor - Shut down executor word-spitter:[4 4]
17732 [main] INFO  backtype.storm.daemon.executor - Shutting down executor __system:[-1 -1]
17733 [Thread-15-__system] INFO  backtype.storm.util - Async loop interrupted!
17734 [Thread-14-disruptor-executor[-1 -1]-send-queue] INFO  backtype.storm.util - Async loop interrupted!
17736 [main] INFO  backtype.storm.daemon.executor - Shut down executor __system:[-1 -1]
17736 [main] INFO  backtype.storm.daemon.executor - Shutting down executor __acker:[1 1]
17736 [Thread-17-__acker] INFO  backtype.storm.util - Async loop interrupted!
17737 [Thread-16-disruptor-executor[1 1]-send-queue] INFO  backtype.storm.util - Async loop interrupted!
17737 [main] INFO  backtype.storm.daemon.executor - Shut down executor __acker:[1 1]
17737 [main] INFO  backtype.storm.daemon.worker - Shut down executors
17738 [main] INFO  backtype.storm.daemon.worker - Shutting down transfer thread
17738 [Thread-18-disruptor-worker-transfer-queue] INFO  backtype.storm.util - Async loop interrupted!
17740 [main] INFO  backtype.storm.daemon.worker - Shut down transfer thread
17748 [main] INFO  backtype.storm.daemon.worker - Shutting down default resources
17748 [main] INFO  backtype.storm.daemon.worker - Shut down default resources
17762 [main] INFO  backtype.storm.daemon.worker - Disconnecting from storm cluster state context
17763 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e770000b
17763 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50501 which had sessionid 0x179a924e770000b
17764 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e770000b closed
17764 [Thread-6-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17764 [main] INFO  backtype.storm.daemon.worker - Shut down worker KafkaStormSample-1-1622040441 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 1027
17772 [main] INFO  backtype.storm.daemon.supervisor - Shut down 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:39344513-9c18-41e7-bf9b-fd894180922f
17773 [main] INFO  backtype.storm.daemon.supervisor - Shutting down supervisor 563a836c-55ba-4b7e-9f4c-beaf44aea5a3
17778 [Thread-5] INFO  backtype.storm.event - Event manager interrupted
17779 [Thread-6] INFO  backtype.storm.event - Event manager interrupted
17780 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700009
17780 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700009 closed
17781 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50499 which had sessionid 0x179a924e7700009
17781 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17781 [main] INFO  backtype.storm.testing - Shutting down in process zookeeper
17782 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited run method
17782 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - shutting down
17782 [main] INFO  org.apache.storm.zookeeper.server.SessionTrackerImpl - Shutting down
17782 [main] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Shutting down
17782 [main] INFO  org.apache.storm.zookeeper.server.SyncRequestProcessor - Shutting down
17782 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited loop!
17782 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited!
17783 [main] INFO  org.apache.storm.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete
17783 [main] INFO  backtype.storm.testing - Done shutting down in process zookeeper
17783 [main] INFO  backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//d9c56f79-8690-4ca4-856f-38179dcd654d
17789 [main] INFO  backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//a38c5a4c-6ac0-423b-913e-097a53145a3f
17791 [main] INFO  backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//4a2ac468-1fa6-4abd-9863-0b9babd83b88
17797 [main] INFO  backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df
17916 [SessionTracker] INFO  org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!

You can see the snippet from the output to zoom into the results of the spout and the bolts of storm for the data sent to kafka topic.

Output from the Storm side code

hi : 1
storm : 1
test : 1
utility : 1
check : 1
message : 1
operator : 1
spark : 1
kafka : 1
backtype : 1
job : 1
modulo : 1
remainder : 1

3. Download the Source Code

Download
You can download the full source code of this example here: Apache Kafka Integration With Storm

Bhagvan Kommadi

Bhagvan Kommadi is the Founder of Architect Corner & has around 20 years’ experience in the industry, ranging from large scale enterprise development to helping incubate software product start-ups. He has done Masters in Industrial Systems Engineering at Georgia Institute of Technology (1997) and Bachelors in Aerospace Engineering from Indian Institute of Technology, Madras (1993). He is member of IFX forum,Oracle JCP and participant in Java Community Process. He founded Quantica Computacao, the first quantum computing startup in India. Markets and Markets have positioned Quantica Computacao in ‘Emerging Companies’ section of Quantum Computing quadrants. Bhagvan has engineered and developed simulators and tools in the area of quantum technology using IBM Q, Microsoft Q# and Google QScript. He has reviewed the Manning book titled : "Machine Learning with TensorFlow”. He is also the author of Packt Publishing book - "Hands-On Data Structures and Algorithms with Go".He is member of IFX forum,Oracle JCP and participant in Java Community Process. He is member of the MIT Technology Review Global Panel.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button