Kafka DISTRIBUTED incremental Hadoop consumer

In my last post, I noted that the scripts I provided did not work for consuming data from real (multi-node) Kafka clusters. I had the solution to that problem for a while but I’ve been super busy and didn’t have time to post it.

Then, a post on the Kafka user list prompted me to do it.

So, here are the files you will need to do incremental distributed consumption of data in a Kafka cluster while publishing that data to a Hadoop cluster:

https://gist.github.com/1671887

Automating incremental imports with the Kafka hadoop-consumer

EDIT (October 26th 2011)

The script I provide in this article only works for a Kafka “cluster” that has just a single broker.

I have found why and I am currently writing a new script that allows Hadoop to pull from multiple brokers. I’ll post it when I can :)

EDIT 2 (January 25th 2012)

This is the solution.

I finally got the Kafka hadoop-consumer to work properly, and as I said at the end of that post, the hadoop-consumer has all the functionality it needs for doing incremental imports, but it requires a little bit of hand holding.

When the hadoop-consumer is done importing, it generates an offset file, which you can use to start the next incremental import from where it last left off, but this file gets generated in the same HDFS folder as the imported data (the folder set by the output property in the .properties configuration file).

The next time you start the hadoop-consumer, it does not look for the offset file in the output directory; it looks for it in the input directory. The post I linked above from the Apache mailing list archive suggests using the same directory for input and output, but that doesn’t seem to work because the hadoop-consumer deletes everything inside the output folder if it already exists.

It seems easier to just move the generated offset file from the output folder to the input folder. When doing so, it’s important to overwrite the old offset file, otherwise both get read and the previously imported data will be re-imported.

Besides moving the generated offset file, it is also necessary to change the output folder on every run, otherwise the previously imported data will get overwritten by the new data.

I wrote a simple script that takes care of everything. You just need to pass it the topic to consume and the HDFS directory to write to as parameters. You can call the same script with the same parameters repeatedly (with cron jobs for example, or any other scheduler), and it will manage the offsets for you.

Inside the specified HDFS directory, it will manage an offset directory, containing the most recent offset file and using it as the hadoop-consumer’s input directory, and it will generate a hierarchy of directories to bucket the data by date and time. The current hierarchy is /Year/Month/Day/Time but that can be changed easily to other schemes by altering the content of the bucket_name variable.
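To make the bucket_name idea concrete, here is a small sketch of a few date formats that could be used. Only the first one is what the script actually uses; hourly_bucket and flat_bucket are names I made up for illustration.

```shell
#!/bin/sh
# The scheme used by the script: Year/Month/Day/Time, e.g. 2012/01/25/14h30/
bucket_name=$(date +%Y/%m/%d/%Hh%M/)

# Hypothetical alternative: one bucket per hour, e.g. 2012/01/25/14h/
hourly_bucket=$(date +%Y/%m/%d/%Hh/)

# Hypothetical alternative: a single flat, sortable directory, e.g. 2012-01-25T1430/
flat_bucket=$(date +%Y-%m-%dT%H%M/)

echo "$bucket_name"
echo "$hourly_bucket"
echo "$flat_bucket"
```

Whatever scheme you pick, the bucket should be at least as fine-grained as your schedule: since the hadoop-consumer deletes an output folder that already exists, two runs that land in the same bucket would leave only the data from the second one.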

If the offset file (or directory) does not exist, it will generate one that starts from offset 0, and thus consumes everything available. After each import, it always overwrites the old offset file with the new one.
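Since the script keeps track of the offsets by itself, scheduling it is mostly a matter of calling it again with the same parameters. As a rough sketch, a wrapper for importing several topics in one scheduled run could look like the following. The import_topics function, the CONSUMER and HDFS_ROOT variables and the /data/kafka path are all assumptions of mine, not something the hadoop-consumer provides.

```shell
#!/bin/sh
# Hypothetical wrapper around the script described in this post.
# CONSUMER: path to the spawn-incremental-hadoop-consumer script (assumed location).
# HDFS_ROOT: assumed HDFS base directory; each topic gets its own sub-directory,
# and therefore its own offset directory.
CONSUMER=${CONSUMER:-./spawn-incremental-hadoop-consumer}
HDFS_ROOT=${HDFS_ROOT:-/data/kafka}

import_topics() {
  for topic in "$@"; do
    # Keep going with the other topics if one import fails.
    "$CONSUMER" "$topic" "$HDFS_ROOT/$topic" || echo "import of $topic failed" >&2
  done
}
```

A scheduled script would then end with a call such as import_topics clicks impressions (made-up topic names), and cron or any other scheduler only has to run that one script from the kafka/contrib/hadoop-consumer directory.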

You only need two files to make this work:

  • The template.properties file, from which a new file called current.properties is generated each time the script is invoked. The current.properties file is then passed to the hadoop-consumer. If you want to change some of the other properties (such as the Hadoop username and group to use and so on), you can change them in that file. If you want to make more of these parameters dynamic, you will need to do the same thing that I’ve done for the topic property (in both files). Currently, the kafka.server.uri is hard coded, so you need to change it to match your environment. This is the content of the file:
# name of test topic
kafka.etl.topic=${topic}

# hdfs location of jars
hdfs.default.classpath.dir=/tmp/kafka/lib

# number of test events to be generated
event.count=0

# hadoop id and group
hadoop.job.ugi=kafka,hadoop

# kafka server uri
kafka.server.uri=tcp://my-kafka-broker:9092

# hdfs location of input directory
input=${hdfs_dir}/offset/

# hdfs location of output directory
output=${hdfs_dir}/${bucket_name}

# limit the number of events to be fetched;
# value -1 means no limitation
kafka.request.limit=-1

# kafka parameters
client.buffer.size=1048576
client.so.timeout=60000
  • The spawn-incremental-hadoop-consumer bash script, which does everything I described earlier. This is the content of the script:
#!/bin/bash
# bash is required: the script relies on [[ ... ]] and $(< file)

if [[ ! ("$#" == 2) ]]; then
   echo "Usage: $0 topic hdfs_dir"
   echo ''
   echo 'topic: The Kafka topic to subscribe to.'
   echo 'hdfs_dir: The HDFS directory to write to. It will contain an offset directory containing the most recent offset file and a hierarchy of nested directories with a new bucket of imported data each time this script is called.'
   exit 1
fi

topic=$1
hdfs_dir=$2
bucket_name=`date +%Y/%m/%d/%Hh%M/`
generated_property_file='current.properties'
current_offset_file_exists=`hadoop fs -ls ${hdfs_dir}/offset`

eval "echo \"$(< template.properties)\"" > ${generated_property_file}

if [ -z "$current_offset_file_exists" ]; then
   echo "***************************************************************************************************************************"
   echo "The current offset file was not found, so a new one will be generated and the topic '${topic}' will be pulled from offset 0"
   echo "***************************************************************************************************************************"
   ./run-class.sh kafka.etl.impl.DataGenerator ${generated_property_file}
fi

echo "***************************************************************************************************************************"
echo "Importing topic '${topic}' to HDFS directory: ${hdfs_dir}/${bucket_name}"
echo "***************************************************************************************************************************"

./run-class.sh kafka.etl.impl.SimpleKafkaETLJob ${generated_property_file}

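# Copy the offset file generated by this import over the old one, so that the next run resumes from here.
# bucket_name already ends with a slash.
hadoop fs -cp ${hdfs_dir}/${bucket_name}offsets_000000m0-m-00000 ${hdfs_dir}/offset/1.dat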

I tested this with both files placed directly inside the kafka/contrib/hadoop-consumer directory in the source code checked out from the Kafka trunk. I’m running the script from there and it works for me, although there might be things that are specific to my environment that I haven’t taken into account. Please let me know if you run into any problems when running the script in your environment.

The template.properties file must remain named this way, since it’s referenced by the script, but the script could be named however you want and that wouldn’t change anything…
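If the eval line that turns template.properties into current.properties looks a bit magical, here is the same mechanism in isolation. This is only a sketch: it uses a throwaway two-line template and made-up values for topic, hdfs_dir and bucket_name, and it reads the file with cat rather than the bash-only $(< file) so that it also runs under a plain sh.

```shell
#!/bin/sh
# Made-up values standing in for the script's parameters.
topic=my-topic
hdfs_dir=/data/kafka
bucket_name=2012/01/25/10h30/

# A throwaway template with the same kind of ${...} placeholders as template.properties.
template=$(mktemp)
printf 'kafka.etl.topic=${topic}\noutput=${hdfs_dir}/${bucket_name}\n' > "$template"

# eval re-reads the template text as a double-quoted string,
# which makes the shell substitute the ${...} placeholders.
rendered=$(eval "echo \"$(cat "$template")\"")

echo "$rendered"
rm -f "$template"
```

Because eval treats the template as shell text, anything like double quotes, backticks or $(...) inside the template would be interpreted by the shell too, so it is safer to keep the template to plain key=value lines and ${...} placeholders.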

Enjoy :) !

My respect for trunks has increased

And I’m not talking about the Dragon Ball character.

I always had this preconception that trunks were full of unstable, bleeding-edge code. It turns out that (for the Apache Kafka project anyway) what goes into the trunk really is the good stuff :)

I finally got the Kafka hadoop-consumer to work, after running into the few problems I documented in my previous post. I edited that post and it should now contain everything you need to get it up and running.

Enjoy :) !

Running the sample Kafka Hadoop consumer

I’ve been trying out Kafka recently. I followed some of the quick start guide without any problems. Then I got to the Hadoop consumer section, which is pretty crucial for my use case, and I got forwarded to a simple GitHub page.

I will describe the problems I’ve had while following that GitHub README, along with the solutions I found. Hopefully, that can end up helping someone out there :)

Step 1

Step 1 of the GitHub README failed for me with the following error:

root@my-server~/kafka-hadoop-consumer/kafka# ./sbt package
[info] Building project Kafka 0.7 against Scala 2.8.0
[info]    using KafkaProject with sbt 0.7.5 and Scala 2.7.7
[info]
[info] == core-kafka / compile ==
[info]   Source analysis: 126 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling main sources...
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/Kafka.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/Kafka.scala:23: value log4j is not a member of package org.apache
[error] import org.apache.log4j.jmx.LoggerDynamicMBean
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala:22: not found: value joptsimple
[error] import joptsimple._
[error]        ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/ConsumerConnector.scala:21: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/ConsumerIterator.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala:25: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/Fetcher.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/FetcherRunnable.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/SimpleConsumer.scala:23: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/TopicCount.scala:21: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala:22: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/utils/KafkaScheduler.scala:22: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] error while loading ZooKeeper, Missing dependency 'class org.apache.log4j.Logger', required by /root/kafka-hadoop-consumer/kafka/core/lib/zookeeper-3.3.3.jar(org/apache/zookeeper/ZooKeeper.class)
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j._
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala:38: recursive value offset needs type
[error]       case Some(offset) => offset
[error]                            ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/javaapi/Implicits.scala:19: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/ProducerPool.scala:22: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/async/AsyncProducer.scala:22: value log4j is not a member of package org.apache
[error] import org.apache.log4j.{Level, Logger}
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala:21: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/log/Log.scala:24: value log4j is not a member of package org.apache
[error] import org.apache.log4j._
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/log/LogManager.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/message/CompressionUtils.scala:25: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/message/FileMessageSet.scala:23: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/network/SocketServer.scala:28: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/network/Transmission.scala:21: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.spi.LoggingEvent
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala:21: value log4j is not a member of package org.apache
[error] import org.apache.log4j.{Logger, AppenderSkeleton}
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala:80: value closed is not a member of kafka.producer.KafkaLog4jAppender
[error]     if(!this.closed) {
[error]              ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala:81: value closed is not a member of kafka.producer.KafkaLog4jAppender
[error]       this.closed = true
[error]            ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/Producer.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/SyncProducer.scala:26: value log4j is not a member of package org.apache
[error] import org.apache.log4j.{Level, Logger}
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala:21: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala:21: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/server/KafkaServerStartable.scala:19: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/server/KafkaZooKeeper.scala:20: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/ConsumerPerformance.scala:23: not found: value joptsimple
[error] import joptsimple._
[error]        ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/ConsumerPerformance.scala:28: too many arguments for constructor Object: ()java.lang.Object
[error] abstract class ShutdownableThread(name: String) extends Thread(name) {
[error]                                  ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/ConsumerShell.scala:19: not found: value joptsimple
[error] import joptsimple._
[error]        ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/GetOffsetShell.scala:20: not found: value joptsimple
[error] import joptsimple._
[error]        ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/ProducerPerformance.scala:25: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/ProducerPerformance.scala:26: not found: value joptsimple
[error] import joptsimple.{OptionSet, OptionParser}
[error]        ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/ProducerShell.scala:21: not found: value joptsimple
[error] import joptsimple._
[error]        ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/ReplayLogProducer.scala:4: not found: value joptsimple
[error] import joptsimple.OptionParser
[error]        ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/ReplayLogProducer.scala:5: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala:20: not found: value joptsimple
[error] import joptsimple._
[error]        ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala:20: not found: value joptsimple
[error] import joptsimple._
[error]        ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/utils/Throttler.scala:19: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/utils/Utils.scala:25: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] /root/kafka-hadoop-consumer/kafka/core/src/main/scala/kafka/utils/ZkUtils.scala:25: value log4j is not a member of package org.apache
[error] import org.apache.log4j.Logger
[error]                   ^
[error] 55 errors found
[info] == core-kafka / compile ==
[error] Error running compile: Compilation failed
[info]
[info] Total time: 19 s, completed Sep 19, 2011 10:17:24 AM
[info]
[info] Total session time: 20 s, completed Sep 19, 2011 10:17:24 AM
[error] Error during build.

I had never used SBT before, so at first I was puzzled. Why would they post a command that spits out errors in their README?

It turns out the solution was pretty simple. I figured it out by looking at a similar problem on the old (pre-Apache incubation) Kafka mailing list. You just need to run ./sbt update, which prints out the following output:

root@my-server~/kafka-hadoop-consumer/kafka# ./sbt update
[info] Building project Kafka 0.7 against Scala 2.8.0
[info]    using KafkaProject with sbt 0.7.5 and Scala 2.7.7
[info]
[info] == core-kafka / update ==
[info] downloading http://repo2.maven.org/maven2/net/sf/jopt-simple/jopt-simple/3.2/jopt-simple-3.2.jar ...
[info]     [SUCCESSFUL ] net.sf.jopt-simple#jopt-simple;3.2!jopt-simple.jar (426ms)
[info] downloading http://repo2.maven.org/maven2/log4j/log4j/1.2.15/log4j-1.2.15.jar ...
[info]     [SUCCESSFUL ] log4j#log4j;1.2.15!log4j.jar (429ms)
[info] downloading http://repo2.maven.org/maven2/junit/junit/4.1/junit-4.1.jar ...
[info]     [SUCCESSFUL ] junit#junit;4.1!junit.jar (246ms)
[info] downloading http://repo2.maven.org/maven2/org/easymock/easymock/3.0/easymock-3.0.jar ...
[info]     [SUCCESSFUL ] org.easymock#easymock;3.0!easymock.jar (160ms)
[info] downloading http://repo2.maven.org/maven2/org/scalatest/scalatest/1.2/scalatest-1.2.jar ...
[info]     [SUCCESSFUL ] org.scalatest#scalatest;1.2!scalatest.jar (537ms)
[info] downloading http://repo2.maven.org/maven2/cglib/cglib-nodep/2.2/cglib-nodep-2.2.jar ...
[info]     [SUCCESSFUL ] cglib#cglib-nodep;2.2!cglib-nodep.jar (57ms)
[info] downloading http://repo2.maven.org/maven2/org/objenesis/objenesis/1.2/objenesis-1.2.jar ...
[info]     [SUCCESSFUL ] org.objenesis#objenesis;1.2!objenesis.jar (235ms)
[info] :: retrieving :: kafka#core-kafka_2.8.0 [sync]
[info]     confs: [compile, runtime, test, provided, system, optional, sources, javadoc]
[info]     7 artifacts copied, 0 already retrieved (2748kB/39ms)
[info] == core-kafka / update ==
[info]
[info] == hadoop producer / update ==
[info] downloading http://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.5.5/jackson-core-asl-1.5.5.jar ...
[info]     [SUCCESSFUL ] org.codehaus.jackson#jackson-core-asl;1.5.5!jackson-core-asl.jar (555ms)
[info] downloading http://repo1.maven.org/maven2/org/apache/avro/avro/1.4.1/avro-1.4.1.jar ...
[info]     [SUCCESSFUL ] org.apache.avro#avro;1.4.1!avro.jar (452ms)
[info] downloading http://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.5.5/jackson-mapper-asl-1.5.5.jar ...
[info]     [SUCCESSFUL ] org.codehaus.jackson#jackson-mapper-asl;1.5.5!jackson-mapper-asl.jar (149ms)
[info] downloading http://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.5.11/slf4j-api-1.5.11.jar ...
[info]     [SUCCESSFUL ] org.slf4j#slf4j-api;1.5.11!slf4j-api.jar (291ms)
[info] downloading http://repo1.maven.org/maven2/com/thoughtworks/paranamer/paranamer/2.2/paranamer-2.2.jar ...
[info]     [SUCCESSFUL ] com.thoughtworks.paranamer#paranamer;2.2!paranamer.jar (336ms)
[info] downloading http://repo1.maven.org/maven2/com/thoughtworks/paranamer/paranamer-ant/2.2/paranamer-ant-2.2.jar ...
[info]     [SUCCESSFUL ] com.thoughtworks.paranamer#paranamer-ant;2.2!paranamer-ant.jar (139ms)
[info] downloading http://repo1.maven.org/maven2/org/mortbay/jetty/jetty/6.1.22/jetty-6.1.22.jar ...
[info]     [SUCCESSFUL ] org.mortbay.jetty#jetty;6.1.22!jetty.jar (535ms)
[info] downloading http://repo1.maven.org/maven2/org/apache/velocity/velocity/1.6.4/velocity-1.6.4.jar ...
[info]     [SUCCESSFUL ] org.apache.velocity#velocity;1.6.4!velocity.jar (133ms)
[info] downloading http://repo1.maven.org/maven2/commons-lang/commons-lang/2.5/commons-lang-2.5.jar ...
[info]     [SUCCESSFUL ] commons-lang#commons-lang;2.5!commons-lang.jar (216ms)
[info] downloading http://repo1.maven.org/maven2/com/thoughtworks/paranamer/paranamer-generator/2.2/paranamer-generator-2.2.jar ...
[info]     [SUCCESSFUL ] com.thoughtworks.paranamer#paranamer-generator;2.2!paranamer-generator.jar (212ms)
[info] downloading http://repo1.maven.org/maven2/org/apache/ant/ant/1.7.1/ant-1.7.1.jar ...
[info]     [SUCCESSFUL ] org.apache.ant#ant;1.7.1!ant.jar (343ms)
[info] downloading http://repo1.maven.org/maven2/com/thoughtworks/qdox/qdox/1.10.1/qdox-1.10.1.jar ...
[info]     [SUCCESSFUL ] com.thoughtworks.qdox#qdox;1.10.1!qdox.jar (447ms)
[info] downloading http://repo1.maven.org/maven2/asm/asm/3.2/asm-3.2.jar ...
[info]     [SUCCESSFUL ] asm#asm;3.2!asm.jar (112ms)
[info] downloading http://repo1.maven.org/maven2/org/apache/ant/ant-launcher/1.7.1/ant-launcher-1.7.1.jar ...
[info]     [SUCCESSFUL ] org.apache.ant#ant-launcher;1.7.1!ant-launcher.jar (248ms)
[info] downloading http://repo1.maven.org/maven2/org/mortbay/jetty/jetty-util/6.1.22/jetty-util-6.1.22.jar ...
[info]     [SUCCESSFUL ] org.mortbay.jetty#jetty-util;6.1.22!jetty-util.jar (321ms)
[info] downloading http://repo1.maven.org/maven2/org/mortbay/jetty/servlet-api/2.5-20081211/servlet-api-2.5-20081211.jar ...
[info]     [SUCCESSFUL ] org.mortbay.jetty#servlet-api;2.5-20081211!servlet-api.jar (213ms)
[info] downloading http://repo1.maven.org/maven2/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar ...
[info]     [SUCCESSFUL ] commons-collections#commons-collections;3.2.1!commons-collections.jar (394ms)
[info] downloading http://repo1.maven.org/maven2/oro/oro/2.0.8/oro-2.0.8.jar ...
[info]     [SUCCESSFUL ] oro#oro;2.0.8!oro.jar (68ms)
[info] :: retrieving :: kafka#hadoop-producer_2.8.0 [sync]
[info]     confs: [compile, runtime, test, provided, system, optional, sources, javadoc]
[info]     20 artifacts copied, 0 already retrieved (5377kB/39ms)
[info] == hadoop producer / update ==
[info]
[info] == hadoop consumer / update ==
[info] downloading http://repo1.maven.org/maven2/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar ...
[info]     [SUCCESSFUL ] commons-httpclient#commons-httpclient;3.1!commons-httpclient.jar (599ms)
[info] downloading http://repo1.maven.org/maven2/joda-time/joda-time/1.6/joda-time-1.6.jar ...
[info]     [SUCCESSFUL ] joda-time#joda-time;1.6!joda-time.jar (396ms)
[info] downloading http://repo1.maven.org/maven2/commons-logging/commons-logging/1.0.4/commons-logging-1.0.4.jar ...
[info]     [SUCCESSFUL ] commons-logging#commons-logging;1.0.4!commons-logging.jar (105ms)
[info] downloading http://repo1.maven.org/maven2/commons-codec/commons-codec/1.2/commons-codec-1.2.jar ...
[info]     [SUCCESSFUL ] commons-codec#commons-codec;1.2!commons-codec.jar (246ms)
[info] :: retrieving :: kafka#hadoop-consumer_2.8.0 [sync]
[info]     confs: [compile, runtime, test, provided, system, optional, sources, javadoc]
[info]     6 artifacts copied, 0 already retrieved (1321kB/18ms)
[info] == hadoop consumer / update ==
[info]
[info] == contrib / update ==
[warn] No dependency configuration found, using defaults.
[info] :: retrieving :: kafka#contrib_2.8.0 [sync]
[info]     confs: [default]
[info]     0 artifacts copied, 0 already retrieved (0kB/3ms)
[info] == contrib / update ==
[info]
[info] == java-examples / update ==
[info] :: retrieving :: kafka#java-examples_2.8.0 [sync]
[info]     confs: [compile, runtime, test, provided, system, optional, sources, javadoc]
[info]     2 artifacts copied, 0 already retrieved (434kB/12ms)
[info] == java-examples / update ==
[info]
[info] == perf / update ==
[info] :: retrieving :: kafka#perf_2.8.0 [sync]
[info]     confs: [compile, runtime, test, provided, system, optional, sources, javadoc]
[info]     2 artifacts copied, 0 already retrieved (434kB/12ms)
[info] == perf / update ==
[info]
[info] == Kafka / update ==
[warn] No dependency configuration found, using defaults.
[info] :: retrieving :: kafka#kafka_2.8.0 [sync]
[info]     confs: [default]
[info]     0 artifacts copied, 0 already retrieved (0kB/3ms)
[info] == Kafka / update ==
[success] Successful.
[info]
[info] Total time: 70 s, completed Sep 19, 2011 10:21:23 AM
[info]
[info] Total session time: 71 s, completed Sep 19, 2011 10:21:23 AM
[success] Build completed successfully.

And then you can run the ./sbt package command, which this time printed out the (overall) much nicer output below:

root@my-server~/kafka-hadoop-consumer/kafka# ./sbt package
[info] Building project Kafka 0.7 against Scala 2.8.0
[info]    using KafkaProject with sbt 0.7.5 and Scala 2.7.7
[info]
[info] == core-kafka / compile ==
[info]   Source analysis: 126 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling main sources...
[warn] there were unchecked warnings; re-run with -unchecked for details
[warn] one warning found
[info] Compilation successful.
[info]   Post-analysis: 566 classes.
[info] == core-kafka / compile ==
[info]
[info] == hadoop consumer / compile ==
[info]   Source analysis: 12 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling main sources...
[error] Note: Some input files use or override a deprecated API.
[error] Note: Recompile with -Xlint:deprecation for details.
[info] Compilation successful.
[info]   Post-analysis: 13 classes.
[info] == hadoop consumer / compile ==
[info]
[info] == java-examples / compile ==
[info]   Source analysis: 6 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling main sources...
[info] Compilation successful.
[info]   Post-analysis: 6 classes.
[info] == java-examples / compile ==
[info]
[info] == perf / compile ==
[info]   Source analysis: 6 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling main sources...
[info] Compilation successful.
[info]   Post-analysis: 6 classes.
[info] == perf / compile ==
[info]
[info] == core-kafka / package ==
[warn] No Main-Class attribute will be added automatically added:
[warn] Multiple classes with a main method were detected.  Specify main class explicitly with:
[warn]      override def mainClass = Some("className")
[info] Packaging ./core/target/scala_2.8.0/kafka-0.7.jar ...
[info] Packaging complete.
[info] == core-kafka / package ==
[info]
[info] == hadoop producer / compile ==
[info]   Source analysis: 4 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling main sources...
[info] Compilation successful.
[info]   Post-analysis: 5 classes.
[info] == hadoop producer / compile ==
[info]
[info] == hadoop producer / package ==
[info] Packaging ./contrib/hadoop-producer/target/scala_2.8.0/hadoop-producer_2.8.0-0.7.jar ...
[info] Packaging complete.
[info] == hadoop producer / package ==
[info]
[info] == hadoop consumer / package ==
[warn] No Main-Class attribute will be added automatically added:
[warn] Multiple classes with a main method were detected.  Specify main class explicitly with:
[warn]      override def mainClass = Some("className")
[info] Packaging ./contrib/hadoop-consumer/target/scala_2.8.0/hadoop-consumer_2.8.0-0.7.jar ...
[info] Packaging complete.
[info] == hadoop consumer / package ==
[info]
[info] == perf / package ==
[info] Packaging ./perf/target/scala_2.8.0/kafka-perf-0.7.jar ...
[info] Packaging complete.
[info] == perf / package ==
[info]
[info] == java-examples / package ==
[warn] No Main-Class attribute will be added automatically added:
[warn] Multiple classes with a main method were detected.  Specify main class explicitly with:
[warn]      override def mainClass = Some("className")
[info] Packaging ./examples/target/scala_2.8.0/kafka-java-examples-0.7.jar ...
[info] Packaging complete.
[info] == java-examples / package ==
[info]
[info] == Kafka / package ==
[info] == Kafka / package ==
[success] Successful.
[info]
[info] Total time: 55 s, completed Sep 19, 2011 10:22:38 AM
[info]
[info] Total session time: 56 s, completed Sep 19, 2011 10:22:38 AM
[success] Build completed successfully.

There are some [warn] and [error] lines but at least the build says it completed successfully, so that should be good enough for now. On to step 2.
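To sum up step 1, the order of the two commands is what matters. A tiny sketch, where build_kafka is just a name I picked for the helper:

```shell
#!/bin/sh
# Run from the root of the Kafka checkout, where the ./sbt launcher script lives.
# "update" downloads the dependencies (log4j, jopt-simple, ...);
# "package" only runs if the update succeeded.
build_kafka() {
  ./sbt update && ./sbt package
}
```

Calling build_kafka from the checkout root should then produce the same two outputs as above, one after the other.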

Step 2

I didn’t have much of a problem here besides finding the hadoop-setup.sh script. Just use the locate command if you’re lazy like me and don’t want to look through every directory manually until you find the file ;)

Step 3

Step 3 caused me some more trouble. Let’s review each sub-step:

3.1 was okay since I had already followed the quick start guide far enough to have a server running and a simple producer and consumer.

In 3.2, I did not change any of the settings in test.properties since they seemed fine by default. The only one that would seem worth changing might be kafka.server.uri but since for now I am running the Kafka server on the same box where I’m trying to run the hadoop-consumer, the local address should be fine. I did create the HDFS directory mentioned in the input property though (not sure if it would have been created anyway by the DataGenerator program).

EDIT (September 29th 2011)

It turned out, later on, while I was doing Step 4, that not changing the default settings was a mistake. In particular, the kafka.server.uri property must be set to an address that is reachable from the outside.

In my case, I was running the DataGenerator program from the same box where the Kafka broker was running, so the DataGenerator WAS able to reach the broker and that step seemed to be successful. The problem is that the offset file (named “1.dat”) that the DataGenerator program writes in HDFS (in a directory determined by the input property) contains the kafka.server.uri value, and this is what the (Map/Reduce) hadoop-consumer uses to connect from the Hadoop cluster to the broker in order to import the events. Since my Hadoop nodes were on different machines than the broker, localhost pointed them at themselves, so they were never able to reach the broker and were instead throwing:

java.io.IOException: java.net.ConnectException: Connection refused

Basically, I guess the only situation where it would work to have localhost in the kafka.server.uri property is if you are literally running EVERYTHING in the same box (the kafka broker(s), the DataGenerator program and a single-machine or pseudo-distributed Hadoop cluster).
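Here is a minimal sketch of a sanity check for that setting, written in Python rather than taken from Kafka itself. The function name is_loopback_uri is something I made up for illustration; it only looks at the host part of a URI such as tcp://localhost:9092 and tells you whether it can only work on the local machine. It does not check that the broker is actually reachable.

```python
import ipaddress
from urllib.parse import urlparse


def is_loopback_uri(uri: str) -> bool:
    """Return True if the URI's host only makes sense on the local machine."""
    host = urlparse(uri).hostname
    if host is None:
        raise ValueError(f"no host found in {uri!r}")
    if host == "localhost":
        return True
    try:
        # IP literals such as 127.0.0.1 are checked against the loopback range.
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        # Not an IP literal: a regular host name, assumed to resolve remotely.
        return False
```

If this returns True for the value of kafka.server.uri and your Hadoop nodes are on other machines, the Map/Reduce job will most likely hit the same Connection refused error.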

Step 3.3 is where I’ve had a problem:

root@my-server~/kafka-hadoop-consumer/kafka/contrib/hadoop-consumer# ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
:./../../core/target/scala_2.8.0/kafka-0.7.jar:./../../contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/commons-codec-1.2.jar:./../../contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/commons-httpclient-3.1.jar:./../../contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/commons-logging-1.0.4.jar:./../../contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/joda-time-1.6.jar:./../../contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:./../../contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:./../../contrib/hadoop-consumer/target/scala_2.8.0/hadoop-consumer_2.8.0-0.7.jar:./../../contrib/hadoop-consumer/lib/avro-1.4.0.jar:./../../contrib/hadoop-consumer/lib/commons-logging-1.0.4.jar:./../../contrib/hadoop-consumer/lib/hadoop-0.20.2-core.jar:./../../contrib/hadoop-consumer/lib/jackson-core-asl-1.5.5.jar:./../../contrib/hadoop-consumer/lib/jackson-mapper-asl-1.5.5.jar:./../../contrib/hadoop-consumer/lib/pig-0.8.0-core.jar:./../../contrib/hadoop-consumer/lib/piggybank.jar:./../../project/boot/scala-2.8.0/lib/scala-library.jar
topics=SimpleTestEvent
server uri:tcp://localhost:9092
 send 1000 SimpleTestEvent count events to tcp://localhost:9092
11/09/19 14:42:44 INFO producer.SyncProducer: Connected to localhost:9092 for producing
11/09/19 14:42:45 INFO producer.SyncProducer: Disconnecting from localhost:9092
Exception in thread "main" java.io.IOException: Call to my-namenode/10.10.9.1:8020 failed on local exception: java.io.EOFException
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
    at org.apache.hadoop.ipc.Client.call(Client.java:743)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
    at $Proxy4.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:207)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:170)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1378)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1390)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:196)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:95)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:180)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:175)
    at kafka.etl.impl.DataGenerator.generateOffsets(DataGenerator.java:115)
    at kafka.etl.impl.DataGenerator.run(DataGenerator.java:107)
    at kafka.etl.impl.DataGenerator.main(DataGenerator.java:138)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:375)
    at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:501)
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:446)

Again, I found the answer in a post on the old (pre-Apache incubation) Kafka mailing list. The DataGenerator class depended on the hadoop-core jar located in contrib/hadoop-consumer/lib. My Hadoop cluster is running cdh3-u1, and it seems the hadoop jars included in Kafka are a different version.

I moved the lib/hadoop-0.20.2-core.jar out of that directory and instead I created a symbolic link to the same hadoop jars that my cluster uses.

cd lib
ln -s /usr/lib/hadoop/hadoop-core.jar hadoop-core.jar

After that, I’m not sure if it was necessary, but I re-ran ./sbt package and I was then able to run the DataGenerator as described in step 3.3 :)
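For what it’s worth, the jar swap can be scripted. This is only a sketch of what I did by hand, in Python; the function name swap_hadoop_jar and the backup directory are my own inventions, and it assumes the backup directory sits on the same filesystem as lib (a plain rename does not work across filesystems).

```python
from pathlib import Path


def swap_hadoop_jar(lib_dir: Path, cluster_jar: Path, backup_dir: Path) -> Path:
    """Move bundled hadoop-*-core.jar files aside and link the cluster's jar instead."""
    backup_dir.mkdir(parents=True, exist_ok=True)
    # Materialize the glob first so renaming does not disturb the iteration.
    for bundled in list(lib_dir.glob("hadoop-*-core.jar")):
        # Keep the original jar around in case the swap has to be undone.
        bundled.rename(backup_dir / bundled.name)
    link = lib_dir / "hadoop-core.jar"
    if link.is_symlink() or link.exists():
        link.unlink()
    # Equivalent of: ln -s <cluster_jar> hadoop-core.jar
    link.symlink_to(cluster_jar)
    return link
```

On my setup the call would be swap_hadoop_jar(Path("lib"), Path("/usr/lib/hadoop/hadoop-core.jar"), Path("lib-backup")), where lib-backup is just an example name.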

Step 4

Step 4 is still in progress! Here’s what I did so far:

In step 4.1, at first, I didn’t change any of the default properties in test.properties.

In step 4.2, the copy-jars.sh script does not look up hdfs.default.classpath.dir in any property file, so you have to pass the actual directory in place of the ${hdfs.default.classpath.dir} placeholder:

./copy-jars.sh /tmp/kafka/lib
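If you would rather not copy the value by hand, a small wrapper could read it from the properties file. The sketch below is in Python and is deliberately naive: it only understands plain key=value lines and comments, not the full Java properties syntax (no line continuations or escapes). Both function names are hypothetical.

```python
from typing import List, Optional


def read_property(properties_text: str, key: str) -> Optional[str]:
    """Return the value of `key` from simple key=value properties text, or None."""
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blank lines and comments
        name, sep, value = line.partition("=")
        if sep and name.strip() == key:
            return value.strip()
    return None


def copy_jars_command(properties_text: str) -> List[str]:
    """Build the copy-jars.sh command line from the configured classpath directory."""
    classpath_dir = read_property(properties_text, "hdfs.default.classpath.dir")
    if classpath_dir is None:
        raise KeyError("hdfs.default.classpath.dir is not set")
    return ["./copy-jars.sh", classpath_dir]
```

The returned list can be handed to subprocess.run from the hadoop-consumer directory.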

Step 4.3 is giving me some headaches. I’ve been changing some of the test.properties from step 4.1 as well as some HDFS user permission settings in order to try to make it work, and I think I’ve made some progress, but it’s still not fully working.

Once I do get it working completely, I’ll post the details in a follow-up article. Sorry for the suspense :P !

EDIT (September 29th 2011)

I got back to this the other day and finally got it to work :) !

First, I had a problem in my properties, which I’ve documented in another EDIT in Step 3. Then, I ran into a problem where the hadoop-consumer M/R job WAS able to connect to the Kafka broker server and import some events, but it was looping infinitely and re-importing the same events over and over!

It turns out this is a known and fixed bug, KAFKA-131. The latest pre-packaged stable release they provide is 0.6, but it contains this bug.

I checked out the trunk, packaged it using all the other instructions I documented earlier (including changing the hadoop jars for those I’m using!), I then restarted all the servers (ZK, the broker, etc) and everything worked smoothly :) !

Also, this post is useful in understanding what the hadoop-consumer does and what it expects you to do for it. I’m now working on a simple script to automate incremental imports so that we don’t have to do these other operations manually and can just have it executed automatically every hour or at whatever interval :)

Hopefully, this was helpful for you :) ! Kafka looks great, I can’t wait to do more with it :) !

Hive looking for the wrong Hadoop namenode

I recently changed my Hadoop namenode port to give it the more standard value of 8020. Up to now, I was working with the 54310 port, for the reasons I described in an earlier post. Everything was fine on the new 8020 port, until I tried going into Hive and running a simple query:

Failed with exception java.io.IOException:java.net.ConnectException: Call to my_namenode/10.10.9.1:54310 failed on connection exception: java.net.ConnectException: Connection refused

It was still trying to connect to the old port! Apparently, you will get the same problem if you change your namenode’s host name as well.

I found this very helpful post, which pointed me in the right direction, so I thought I would give some more details on how to make it work.

First, I backed up my Hive metastore using mysqldump:

mysqldump -pYOUR_PASSWORD metastore > metastore_dump.sql

(‘metastore’ is the name of my metastore MySQL database, but I think it could be different, depending how you set it up)

Then I edited metastore_dump.sql with vim and replaced all occurrences of the old port with the new one:

vim metastore_dump.sql
:%s/:54310\//:8020\//g

Obviously, the stuff you will be searching for and replacing is probably going to differ.
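The same substitution can be done without opening an editor. Here is a rough Python equivalent of the vim command; the function name replace_namenode_port is mine, and like the vim version it only touches ports that appear as :54310/ (a colon before and a slash after), so other numbers in the dump are left alone.

```python
def replace_namenode_port(dump_sql: str, old_port: int, new_port: int) -> str:
    """Replace ':old_port/' with ':new_port/' everywhere, like the vim substitution."""
    return dump_sql.replace(f":{old_port}/", f":{new_port}/")
```

You would read metastore_dump.sql into a string, pass it through this function with 54310 and 8020, and write the result back out.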

After saving the modified dump, I went into MySQL and created a metastore2 database, because I didn’t want to delete my old broken one, just in case. But if you’re feeling a little more cowboy, then I guess you could overwrite your old metastore instead :)

mysql -pYOUR_PASSWORD
mysql> create database metastore2;
mysql> GRANT ALL PRIVILEGES ON metastore2.* TO 'hiveuser'@'MY_METASTORE_SERVER';
mysql> exit;

Then I edited the Hive configuration:

vim /etc/hive/conf/hive-site.xml

I changed the javax.jdo.option.ConnectionURL property so that it points to metastore2 instead of my original metastore database.
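To illustrate the change, here is a small Python sketch that swaps the database name in a MySQL JDBC URL. I’m assuming the URL has the usual jdbc:mysql://host[:port]/database[?params] shape; your actual value may look different, and the function name switch_database is made up.

```python
def switch_database(jdbc_url: str, new_db: str) -> str:
    """Swap the database name in a jdbc:mysql://host[:port]/db[?params] URL."""
    # Split off any ?key=value parameters so they can be kept unchanged.
    base, sep, params = jdbc_url.partition("?")
    prefix, slash, _old_db = base.rpartition("/")
    if not slash or not prefix.startswith("jdbc:mysql://"):
        raise ValueError(f"unexpected JDBC URL: {jdbc_url!r}")
    return f"{prefix}/{new_db}{sep}{params}"
```

The result is what goes into the value of javax.jdo.option.ConnectionURL in hive-site.xml.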

And that’s it :)

Hack/Reduce

Just a quick post to let you know that I’ll be in Ottawa this coming Saturday for Hack/Reduce and I think it’s going to be pretty sweet! If you’re not too far from Ottawa, interested in Hadoop and wanting to play around with it, then you should subscribe while there are still some developer spots left :)

About the Thrift code I talked about (to push events into Flume from Java), I’m sorry I haven’t gotten around to posting it yet. The code was working in CDH3u0 but then I updated my cluster to CDH3u1 and I’m having some problems with it. I’ve put that aside for now because some priorities shifted and there is other stuff I need to work on, but I hope I can come back to it later, fix its problems and post it here.

Stay tuned :) !

More on Flume

Here are a few more things I discovered about Flume. As you will see, I don’t have all the answers yet, so if you’re reading this and you know more than me, please leave a comment :) !

In my last post, I suggested using the following Flume config command:

remote_flume_node: helloWorldSource() | agentSink("flume_node_on_data_node",35853);
flume_node_on_data_node: collectorSource(35853) | dfs("hdfs://my_hadoop_namenode:namenode_port/path");

First of all, the dfs command writes its output as sequence (seq) files, so I find it more convenient to use:

customdfs("hdfs://my_hadoop_namenode:namenode_port/path", "raw");

That’s already documented in the Flume User Guide, so it’s kind of pointless, but anyway… moving on to more interesting stuff:

One thing I’ve noticed is that I am receiving a LOT of data with this config, which is a little weird because the helloWorldSource() should only send one message per three seconds.

I modified the Java code to add a timestamp to the otherwise identical hello world messages. Then I defined an external Hive table on the directory where Flume is writing:

CREATE EXTERNAL TABLE hello_world(the_stuff STRING COMMENT 'The content of the whole line') STORED AS TEXTFILE LOCATION '/path';

I changed the permissions in HDFS because Hive uses a different user than Flume (which is kind of a pain, I’m still trying to figure out what’s the best way to have both services share the same user).

hadoop fs -chmod 777 /path/file-name

And I ran a Hive query to see how many of each message I was getting:

SELECT the_stuff, COUNT(the_stuff) the_count FROM hello_world GROUP BY the_stuff SORT BY the_count;

And I could see that the same messages got sent over and over, with the oldest messages having a tendency to be sent most often, and the most recent ones being sent as few as one time each.
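If you don’t have Hive handy, the same count can be done on a raw text file pulled out of HDFS. This is a small Python sketch of the idea (the function name count_messages is mine): it groups identical lines and lists them from least to most frequent, much like the GROUP BY query above.

```python
from collections import Counter


def count_messages(lines):
    """Count identical lines and return (line, count) pairs, least frequent first."""
    counts = Counter(line.rstrip("\n") for line in lines)
    return sorted(counts.items(), key=lambda item: item[1])
```

Any line with a count above 1 is a message that got delivered more than once.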

Inspecting the Flume logs did reveal a lot of activity, almost all of it being related to resending messages.

2011-07-20 00:00:06,338 INFO com.cloudera.flume.agent.durability.NaiveFileWALManager: opening log file  log.00000057.20110719-231832929-0400.391819512361752.seq
2011-07-20 00:00:06,338 INFO com.cloudera.flume.agent.WALAckManager: Retransmitting log.00000057.20110719-195722078-0400.379748661448165.seq after being stale for 62985ms
2011-07-20 00:00:06,339 INFO com.cloudera.flume.agent.WALAckManager: Retransmitting log.00000057.20110719-154659996-0400.364726579089083.seq after being stale for 62962ms
2011-07-20 00:00:06,339 INFO com.cloudera.flume.agent.WALAckManager: Retransmitting log.00000057.20110719-221639545-0400.388106128765740.seq after being stale for 62959ms
2011-07-20 00:00:06,339 INFO com.cloudera.flume.agent.WALAckManager: Retransmitting log.00000057.20110719-223530520-0400.389237103186647.seq after being stale for 62956ms
2011-07-20 00:00:06,339 INFO com.cloudera.flume.agent.WALAckManager: Retransmitting log.00000057.20110719-235335193-0400.393921776847351.seq after being stale for 62953ms
2011-07-20 00:00:06,339 INFO com.cloudera.flume.agent.WALAckManager: Retransmitting log.00000057.20110719-171033920-0400.369740503833085.seq after being stale for 62950ms

It’s as if the Flume nodes were not able to ACK properly, even though the message got sent successfully.
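To get a feel for how much retransmission is going on, the log lines can be picked apart with a regular expression. The Python sketch below is based only on the line format shown in the excerpt above, so treat the pattern as an assumption; the function name retransmissions is mine.

```python
import re

# Matches lines such as:
# ... WALAckManager: Retransmitting log.X.seq after being stale for 62985ms
RETRANSMIT = re.compile(
    r"WALAckManager: Retransmitting (?P<file>\S+) after being stale for (?P<ms>\d+)ms"
)


def retransmissions(log_lines):
    """Return (wal_file, stale_ms) for every WAL retransmission found in the log."""
    found = []
    for line in log_lines:
        match = RETRANSMIT.search(line)
        if match:
            found.append((match.group("file"), int(match.group("ms"))))
    return found
```

Counting how often each file name shows up in the result tells you which WAL files keep being resent.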

According to the user guide, agentSink defaults to the most reliable (End to End) sink. When I tried the other two kinds of sink, both worked without any duplication of messages.

agentDFOSink("flume_node_on_data_node",35853)
agentBESink("flume_node_on_data_node",35853)

I haven’t figured this one out either. But I did test taking the Flume collector node down in DFO mode, and the Flume agent node was able to re-send the messages that were created while the collector was down.

I don’t know if any of this can be useful to other people… I am giving more questions than answers, but that’s the best I can do for now hehe…

One last thing. I haven’t studied the code of the helloWorldSource in too much depth, but it seems to get its data pulled from it. Somehow, there is something that invokes the next() method repeatedly. This is not too convenient in my use cases, as I would like to push data into Flume, not have Flume do the pulling.
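To make the push versus pull distinction concrete, here is a generic Python sketch of a buffer that lets application code push events while something else pulls them with next(). This is NOT the Flume API and not how helloWorldSource is written; the class name PushToPullBuffer and its methods are purely illustrative.

```python
import queue


class PushToPullBuffer:
    """Lets application code push events while a consumer pulls them with next()."""

    def __init__(self, max_events: int = 1000):
        self._events = queue.Queue(maxsize=max_events)

    def push(self, event: str) -> None:
        # Called by the application; blocks if the buffer is full.
        self._events.put(event)

    def next(self, timeout=None) -> str:
        # Called repeatedly by the consumer; blocks until an event is available
        # and raises queue.Empty if the timeout expires first.
        return self._events.get(timeout=timeout)
```

A pull-based source could wrap something like this, but going through Thrift avoids having to embed the producer inside the Flume node at all.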

I worked on setting up a Java client that can push Flume messages using Thrift. It hasn’t been exactly straightforward, as there are fewer Thrift tutorials out there than there are pages mentioning how poor the Thrift documentation is. But I finally got something working by mixing and matching some stuff. I’ll try to post this next week, but no promises ;) !

Problem getting Flume sink to write to HDFS on CDH3

I tried Flume’s Hello World tutorial to have a Java-based source and sink, and that worked all right.

node_name: helloWorldSource() | helloWorldSink();

I then tried to change the sink so that it wrote to HDFS instead, because we need an easy way to log events from Java to HDFS. To make matters as simple as possible at first, I ran the whole thing on one Flume node which itself was running on one of my Hadoop data nodes. Thus, I gave my Flume master the following command:

flume_node_on_data_node: helloWorldSource() | dfs("hdfs://my-hadoop-namenode/path");

The Flume master reported an error, so I went to see the logs on the Flume node, and it said:

2011-07-20 10:45:32,240 INFO com.cloudera.flume.agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@551470da
2011-07-20 10:45:32,243 INFO com.cloudera.flume.agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-07-20 10:45:35,244 INFO com.cloudera.flume.handlers.hdfs.DFSEventSink: Opening hdfs://my-hadoop-namenode/path
2011-07-20 10:45:36,247 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 0 time(s).
2011-07-20 10:45:37,248 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 1 time(s).
2011-07-20 10:45:38,249 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 2 time(s).
2011-07-20 10:45:39,250 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 3 time(s).
2011-07-20 10:45:40,251 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 4 time(s).
2011-07-20 10:45:41,252 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 5 time(s).
2011-07-20 10:45:42,253 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 6 time(s).
2011-07-20 10:45:43,254 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 7 time(s).
2011-07-20 10:45:44,255 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 8 time(s).
2011-07-20 10:45:45,256 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: my-hadoop-namenode/10.10.9.1:8020. Already tried 9 time(s).
2011-07-20 10:45:45,258 ERROR com.cloudera.flume.core.connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because Call to my-hadoop-namenode/10.10.9.1:8020 failed on connection exception: java.net.ConnectException: Connection refused
java.net.ConnectException: Call to my-hadoop-namenode/10.10.9.1:8020 failed on connection exception: java.net.ConnectException: Connection refused
        at org.apache.hadoop.ipc.Client.wrapException(Client.java:1131)
        at org.apache.hadoop.ipc.Client.call(Client.java:1107)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:226)
        at $Proxy50.getProtocolVersion(Unknown Source)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:398)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:384)
        at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:111)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:213)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:180)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1514)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:67)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:1548)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1530)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:228)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:183)
        at com.cloudera.flume.handlers.hdfs.DFSEventSink.openWriter(DFSEventSink.java:68)
        at com.cloudera.flume.handlers.hdfs.DFSEventSink.open(DFSEventSink.java:122)
        at com.cloudera.flume.core.EventSinkDecorator.open(EventSinkDecorator.java:75)
        at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:71)
        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:93)
Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:408)
        at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:425)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:532)
        at org.apache.hadoop.ipc.Client$Connection.access$2300(Client.java:210)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1244)
        at org.apache.hadoop.ipc.Client.call(Client.java:1075)
        ... 19 more
2011-07-20 10:45:45,258 INFO com.cloudera.flume.agent.LogicalNode: Connector flume_node_on_data_node exited with error Call to my-hadoop-namenode/10.10.9.1:8020 failed on connection exception: java.net.ConnectException: Connection refused
2011-07-20 10:45:45,258 ERROR com.cloudera.flume.agent.LogicalNode: Driver on flume_node_on_data_node closed LazyOpenSource | LazyOpenDecorator because of Call to my-hadoop-namenode/10.10.9.1:8020 failed on connection exception: java.net.ConnectException: Connection refused
java.net.ConnectException: Call to my-hadoop-namenode/10.10.9.1:8020 failed on connection exception: java.net.ConnectException: Connection refused
        at org.apache.hadoop.ipc.Client.wrapException(Client.java:1131)
        at org.apache.hadoop.ipc.Client.call(Client.java:1107)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:226)
        at $Proxy50.getProtocolVersion(Unknown Source)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:398)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:384)
        at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:111)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:213)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:180)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1514)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:67)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:1548)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1530)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:228)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:183)
        at com.cloudera.flume.handlers.hdfs.DFSEventSink.openWriter(DFSEventSink.java:68)
        at com.cloudera.flume.handlers.hdfs.DFSEventSink.open(DFSEventSink.java:122)
        at com.cloudera.flume.core.EventSinkDecorator.open(EventSinkDecorator.java:75)
        at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:71)
        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:93)
Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:408)
        at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:425)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:532)
        at org.apache.hadoop.ipc.Client$Connection.access$2300(Client.java:210)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1244)
        at org.apache.hadoop.ipc.Client.call(Client.java:1075)
        ... 19 more

We can see that the Flume node is trying to reach my name node at the following address: my-hadoop-namenode/10.10.9.1:8020 (the IP is just my name node’s IP on the network). What’s interesting is the port it defaults to (8020).

From what I’ve read on an old Cloudera blog post, 8020 is supposed to be the port for sending Filesystem metadata operations to the name node through IPC. It makes sense for the Flume sink to try to contact this port, since what it’s trying to do is to create a file in HDFS and write my events in it. What puzzles me is that I’ve searched for this port in all my Hadoop and Flume config files and I couldn’t find it anywhere. Instead, the Hadoop core-site.xml file contained port 54310 in the value for the fs.default.name attribute.

In the end, I found this IBM Karmasphere documentation page that seemed to imply that pretty much every Hadoop distribution was using port 8020 for the name node, except for the IBM distribution, which uses port 54310. (Although it only goes up to CDH3b3, and I have CDH3u0, so it could have changed at some point between those releases.)

Now, I’m certain that I installed CDH3 in the way that Cloudera documents, so I don’t know why I ended up with port 54310.

Furthermore, I don’t know why Flume, which is created by Cloudera, defaults to port 8020 when the name node we get with CDH3 defaults to 54310.

EDIT (July 21st 2011) : I think what happened is that I didn’t follow only Cloudera’s instructions, but also Michael Noll’s Multi-Node Cluster instructions. He says to use the 54310 port for the fs.default.name attribute, so that might be where I got that from. It was a few weeks ago and I already can’t remember for sure >.< !

In any case, hard coding the name node’s port in the Flume command allows us to solve the exceptions shown above in the log files.

flume_node_on_data_node: helloWorldSource() | dfs("hdfs://my_hadoop_namenode:54310/path");
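The behaviour I ran into boils down to this: when the HDFS URI carries no port, the client falls back to 8020. Here is a tiny Python sketch of that rule, based only on what the logs showed me and not on Hadoop’s code; the function name effective_namenode_port is mine.

```python
from urllib.parse import urlparse

# Port the HDFS client fell back to in my logs when the URI had none.
DEFAULT_NAMENODE_PORT = 8020


def effective_namenode_port(hdfs_uri: str) -> int:
    """Return the port an hdfs:// URI points at, using 8020 when none is given."""
    parsed = urlparse(hdfs_uri)
    if parsed.scheme != "hdfs":
        raise ValueError(f"not an hdfs:// URI: {hdfs_uri!r}")
    return parsed.port if parsed.port is not None else DEFAULT_NAMENODE_PORT
```

Comparing the result with the port in fs.default.name is a quick way to spot the mismatch before Flume starts retrying.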

I then wanted to run the Java helloWorldSource from a remote node (not one of my data nodes), but the above command didn’t work. The logs reported an End Of File exception:

2011-07-20 16:10:18,621 INFO com.cloudera.flume.agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@cc7f9e
2011-07-20 16:10:18,622 INFO com.cloudera.flume.agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-07-20 16:10:21,623 INFO com.cloudera.flume.handlers.hdfs.DFSEventSink: Opening hdfs://my_hadoop_namenode:54310/test/test6
2011-07-20 16:10:21,932 ERROR com.cloudera.flume.core.connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because Call to my_hadoop_namenode/10.10.9.1:54310 failed on local exception: java.io.EOFException
java.io.IOException: Call to my_hadoop_namenode/10.10.9.1:54310 failed on local exception: java.io.EOFException
        at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
        at org.apache.hadoop.ipc.Client.call(Client.java:743)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
        at $Proxy51.getProtocolVersion(Unknown Source)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
        at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:207)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:170)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1378)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1390)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:196)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:175)
        at com.cloudera.flume.handlers.hdfs.DFSEventSink.openWriter(DFSEventSink.java:68)
        at com.cloudera.flume.handlers.hdfs.DFSEventSink.open(DFSEventSink.java:122)
        at com.cloudera.flume.core.EventSinkDecorator.open(EventSinkDecorator.java:75)
        at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:71)
        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:93)
Caused by: java.io.EOFException
        at java.io.DataInputStream.readInt(DataInputStream.java:375)
        at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:501)
        at org.apache.hadoop.ipc.Client$Connection.run(Client.java:446)
2011-07-20 16:10:21,933 INFO com.cloudera.flume.agent.LogicalNode: Connector my_remote_flume_node exited with error Call to my_hadoop_namenode/10.10.9.1:54310 failed on local exception: java.io.EOFException
2011-07-20 16:10:21,933 ERROR com.cloudera.flume.agent.LogicalNode: Driver on my_remote_flume_node closed LazyOpenSource | LazyOpenDecorator because of Call to my_hadoop_namenode/10.10.9.1:54310 failed on local exception: java.io.EOFException
java.io.IOException: Call to my_hadoop_namenode/10.10.9.1:54310 failed on local exception: java.io.EOFException
        at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
        at org.apache.hadoop.ipc.Client.call(Client.java:743)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
        at $Proxy51.getProtocolVersion(Unknown Source)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
        at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:207)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:170)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1378)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1390)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:196)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:175)
        at com.cloudera.flume.handlers.hdfs.DFSEventSink.openWriter(DFSEventSink.java:68)
        at com.cloudera.flume.handlers.hdfs.DFSEventSink.open(DFSEventSink.java:122)
        at com.cloudera.flume.core.EventSinkDecorator.open(EventSinkDecorator.java:75)
        at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:71)
        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:93)
Caused by: java.io.EOFException
        at java.io.DataInputStream.readInt(DataInputStream.java:375)
        at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:501)
        at org.apache.hadoop.ipc.Client$Connection.run(Client.java:446)

As I understand it, if you want to run the Java code on a remote node, you will need to run two Flume nodes, one on the remote node (the agent), and one on the Hadoop data node (the collector). There are probably other (better) ways of setting up the Flume nodes, but this is one way that worked for me:

remote_flume_node: helloWorldSource() | agentSink("flume_node_on_data_node",35853);
flume_node_on_data_node: collectorSource(35853) | dfs("hdfs://my_hadoop_namenode:54310/path");

The 35853 port comes from the Flume User Guide but I don’t know how it was chosen… I tried setting other arbitrary ports and the collector was not able to open them while the agent just kept sending like there’s no tomorrow. If anyone knows how to pick valid ports for the agents and collectors, I’d love to know! (This seems crucial if you want to run many logical flume nodes on one physical node)
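One thing that can at least be ruled out is the port already being taken on the collector machine. The Python sketch below simply tries to bind a TCP socket to the port; the function name can_bind is mine, and it says nothing about Flume’s own rules for ports, so it does not explain the failures I saw.

```python
import socket


def can_bind(port: int, host: str = "0.0.0.0") -> bool:
    """Return True if a TCP socket can currently be bound to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError:
            # Typically "address already in use" or a permission problem.
            return False
    return True
```

Run it on the collector node before assigning a port to collectorSource; a False result means something else already holds that port or you are not allowed to use it.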

I hope this will be useful to other people! Please leave comments if this helped, didn’t help, if you have questions or if you have more info :) !

Hello world!

This is my new blog. It will be dedicated to the stuff I work on.

I have used the web for years to find the answers to my questions. More and more, however, I find myself in uncharted territory. This is not to say that I do crazy stuff no one has ever done before, but I do find solutions to problems that no one else seems to have documented anywhere.

Thus, this is my attempt to give a little bit back to the web that has provided me with so many answers so far.

I hope you can find something useful here :) !