Montag, 4. Juli 2016

Deal with corrupted messages in Apache Kafka

Under some strange circumstances it can happen that a message in a Kafka topic is corrupted. This happens often by using 3rd party frameworks together with Kafka. Additionally, Kafka < 0.9 has no lock at Log.read() at the consumer read level, but has a lock on Log.write(). This can cause a rare race condition, as described in KAKFA-2477 [1]. Probably a log entry looks like:

ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$) kafka.message.InvalidMessageException: Message is corrupt (stored crc = xxxxxxxxxx, computed crc = yyyyyyyyyy

Kafka-Tools

Kafka stores the offset of every consumer in Zookeeper. To read out the offsets, Kafka provides handy tools [2]. But also zkCli.sh can be used, at least to display the consumer and the stored offsets. First we need to find the consumer for a topic (> Kafka 0.9):

bin/kafka-consumer-groups.sh --zookeeper management01:2181 --describe --group test

Prior to Kafka 0.9 the only possibility to get this informations was to use zkCli.sh (or similar tools) to find the consumer group. Since the debug with zkCli is a bit frustrating, I personally use kafka-manager from Yahoo [3]. 
Let's assume the consumers are stored in Zookeeper under /consumer, the command to find the offset looks like:

ls /consumer/test/offsets
[1]
get /consumer/test/offsets/1
[15]

With Kafka that command would look like:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-1 --zookeeper zknode1:2181

Group     Topic   Pid   Offset   logSize   Lag   Owner
console-1 test    1     15       337       326   none


After the offset was found, this offset can be incremented to force the consumer to read the next available message. Before doing this, Kafka has to be shutdown. 

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest 16 test

After restart, Kafka should be able read the next message, in the case this message isn’t corrupted, too. And yes, the corrupted message is lost and can’t be restored, so it's always a good idea to implement a CRC check before any message gets to Kafka.

A code based approach is also available [4]. For that a subclass of the ConsumerIterator has to be created, which will catch the message exception, replace it with a dummy message and proceed with the next message. Of course the corrupted message is lost in that case, too.

Montag, 27. Juni 2016

Encryption in HDFS

Encryption of data was and is the hottest topic in terms of data protection and prevention against theft. Hadoop HDFS supports full transparent encryption in transit and at rest [1], based on Kerberos implementations [2], often used within multiple trusted Kerberos domains.

Technology

Hadoop KMS provides a REST-API, which has built-in SPNEGO and HTTPS support, comes mostly bundled with a pre-configured Apache Tomcat within your preferred Hadoop distribution. 
To have encryption transparent for the user and the system, each encrypted zone is associated with a SEZK (single encryption zone key), created when the zone is defined as an encryption zone by interaction between NN and KMS. Each file within that zone will have its own DEK (Data Encryption Key). This behavior is fully transparent, since the NN directly asks the KMS for a new EDEK (encrypted data encryption key) encrypted with the zones key and adds them to the file’s metadata when a new file is created.

When a client wants to read a file in an encrypted zone, the NN provides the EDEK together with a zone key version and the client asks the KMS to decrypt the EDEK. If the client has permissions to read that zone (POSIX), the client will use the provided DEK to read the file. Seen from a DFS node perspective, that datastream is encrypted and the nodes only see an encrypted data stream. 

Setup and Use

I use here Cloudera’s CDH as example, but the same would work with other distributions and for sure with the official Apache Hadoop distribution. Enabling KMS in CDH (5.3.x and up) it's pretty easy, and doesn’t need to be explained here since Cloudera has great articles online about that process [3]. Important to know is only that KMS doesn’t work without a working Kerberos implementation. Additionally, there are other configuration parameters which need to be known, especially in a multi-domain Kerberos environment.
First, KMS uses the same rule based mechanism as HDFS uses when a trusted kerberos environment is used. That means the same filtering rules as existent in hdfs-site.xml need to be added to kms-site.xml to get the encryption for all trusted domains working. This has to be done per:

<property>
 <name>hadoop.kms.authentication.kerberos.name.rules</name>
  <value>RULE:[1:$1@$0](.*@\QTRUSTED.DOMAIN\E$)s/@\QTRUSTED.DOMAIN\E$//
RULE:[2:$1@$0](.*@\QTRUSTED.DOMAIN\E$)s/@\QTRUSTED.DOMAIN\E$//
RULE:[1:$1@$0](.*@\QMAIN.DOMAIN\E$)s/@\QMAIN.DOMAIN\E$//
RULE:[2:$1@$0](.*@\QMAIN.DOMAIN\E$)s/@\QMAIN.DOMAIN\E$//
DEFAULT</value>
</property>


per kms-site.xml. The terms trusted.domain / main.domain are placeholders, describing the original and the trusted kerberos domain. The use from an administrative standpoint is straightforward:
hadoop key create KEYNAME #(one time key creation)
hadoop fs -mkdir /enc_zones/data
hdfs crypto -createZone -keyName KEYNAME -path /enc_zones/data
hdfs crypto -listZones


First I create a key, then I create the directory I want to encrypt in HDFS and encrypt this with the key I created first. 
This directory is now only accessible by me or users I give access per HDFS POSIX permissions. Others aren’t able to change or read files. To give superusers the possibility to create backups without de- and encrypt, a virtual path prefix for distCp (/.reserved/raw) [4] is available. This prefix allows the block-wise copy of encrypted files, for backup and DR reasons.

The use of distCp for encrypted zones can cause some mishaps. Highly recommended is to have identical encrypted zones on both sides to avoid problems later. A potential distCp command for encrypted zones could look like:

hadoop distcp -px hdfs://source-cluster-namenode:8020/.reserved/raw/enc_zones/data hdfs://target-cluster-namenode:8020/.reserved/raw/enc_zones/data

Samstag, 4. Juni 2016

Open Source based Hyper-Converged Infrastructures and Hadoop

According to a report from Simplivity [1] Hyper-Converged Infrastructures are used by more than 50% of the interviewed businesses, tendentious increasing. But what does this mean for BigData solutions, and Hadoop especially? What tools and technologies can be used, what are the limitations and the gains from such a solution?

To build a production ready and reliable private cloud to support Hadoop clusters as well as on-demand and static I have made great experience with OpenStack, Saltstack and the Sahara plugin for Openstack.
Openstack supports Hadoop-on-demand per Sahara, it's also convenient to use VM's and install a Hadoop Distribution within, especially for static clusters with special setups. The Openstack project provides ready to go images per [2], as example for Vanilla 2.7.1 based Hadoop installations. As an additional benefit, Openstack supports Docker [3], which adds an additional layer of flexibility for additional services, like Kafka [4] or SolR [5].

Costs and Investment
The costs of such an Infrastructure can vary, depending on the hardware and future strategy. Separate compute and storage nodes have been proven in the past, and should be used in future, too. The benefits outweigh the limitations, mostly end up in having move bare metal servers than in a high packed (compute and storage in one server) environment. Additionally, a more stretched environment
helps to balance peaks and high usage better than packed servers. A typical setup would have 2 controller nodes (for HA reasons), a decent count on compute nodes (high memory and CPU count) and several storage nodes (1 CPU, 8 or 16GB RAM and plenty JBOD (just a bunch of disks)). Those storage nodes should have 2 LVM’s (or raids, if that feels better) to avoid later conflicts with production and development / staging / QA buildouts.

Technology
Hadoop itself has some limitations, especially in Hyper-Converged Infrastructures, given by the demand on data locality for batch processes (MapReduce). In a typical cloud environment, like Sahara is providing in Openstack, the storage area is virtualized, and all data is transferred over the network stack. This can be avoided by using VM images for a persistent Hadoop cluster, as a production one mostly is. The data storage (HDFS) will then be provided within the VM and can be extended by mounting additional volumes to the VM (partitions for the data nodes, for example). In both implementations, Cloud based by Sahara and VM, the use of HDFS caching [6] is recommended. This will dramatically speed up the platform for analytical workloads by using columnar based storage formats like Parquet or Kudu [7], together with Hive on Spark [8]. To identify bottlenecks analyzer like Dr. Elephant [9] are very useful and recommended.

Hadoop on demand provides much more flexibility as a static cluster has, especially in terms of load peaks, dynamical resource allocation and cost efficiency. But there are some points to consider. The first and most important one is the separation of block storage and computing. Hadoop itself works with different other distributed filesystems, like ceph [10], but those often rely on Hadoop 1 (MRv1) and Yarn and MRv2 aren’t supported (yet).
The best solution here is to use the standard HDFS layer over cinder [11], which provides good performance with reliability and decent IOpS. The second, and also important one is the network layer. Every compute and storage node should have at least bonded 1GB uplinks, 10G are better (but more expensive). The network needs to be separated into front- and backend. The front-end link provides accessibility to the services the cluster provides to its users, and the back-end provides inter-cluster-communication only. As a third point the use of in-memory filesystems like Alluxio [12] (former Tachyon) may be considered, especially for research clusters, like Genome calculation or NRT applications with high ingestion rates of small data points, like IoT devices typically do.
With these points in mind, streaming based applications getting the most out of this approach, given by the high flexibility and the availability to deal with large load peaks by adding computing resources dynamically. 

Conclusion
Using Hyper-Converged Infrastructures in the world of BigData tools is trending now and proves the success of the private cloud idea. Large companies like LinkedIN, Google, Facebook are on this road since years, and the success outweighs the implementation and maintenance considerations.

List of tools used in this article
Openstack:
Sahara:

Saltstack - Openstack:

Links and References:

Dienstag, 10. Mai 2016

SolR, NiFi, Twitter and CDH 5.7

Since the most interesting Apache NiFi parts are coming from ASF [1] or Hortonworks [2], I thought to use CDH 5.7 and do the same, just to be curious. Here's now my 30 minutes playground, currently running in Googles Compute.

On one of my playground nodes I installed Apache NiFi per
mkdir /software && cd /software && wget http://mirror.23media.de/apache/nifi/0.6.1/nifi-0.6.1-bin.tar.gz && tar xvfz nifi-0.6.1-bin.tar.gz

Then I've set only nifi.sensitive.props.key property in conf/nifi.properties to an easy to remember secret. The next bash /software/nifi-0.6.1/bin/nifi.sh install installs Apache NiFi as an service. After log in into Apache NiFi's WebUI, download and add the template [3] to Apache NiFi, move the template icon to the drawer, open it and edit the twitter credentials to fit your developer account.

To use an  schema-less SolR index (or Cloudera Search in CDH) I copied some example files over into a local directory:
cp -r /opt/cloudera/parcels/CDH/share/doc/solr-doc-4.10.3+cdh5.7.0+389/example/example-schemaless/solr/collection1/conf/* $HOME/solr_configs/conf/

And added to solrconfig.xml into the <updateRequestProcessorChain name="add-unknown-fields-to-the-schema"> declaration below <updateRequestProcessorChain name="add-unknown-fields-to-the-schema">:
<str>EEE MMM d HH:mm:ss Z yyyy</str>

So it looks like:
<processor>
<arr name="format">
<str>EEE MMM d HH:mm:ss Z yyyy</str>


Since the new Twitter API HTML format the client source, I added a HTML strip processor into the same declaration:

</processor>
  <processor class="solr.HTMLStripFieldUpdateProcessorFactory">
  <str name="fieldName">source_s</str>
</processor>

All configs are available per Gist [4,5].

To get the configs running, initialize SolR:

solrctl --zk ZK_HOST:2181/solr instancedir --create twitter $HOME/solr_configs
solrctl --zk ZK_HOST:2181/solr collection --create twitter -s 2 -c twitter -r 2

Setup Banana for SolR is pretty easy:
cd /software && wget https://github.com/lucidworks/banana/archive/release.zip && unzip release.zip && mv banana-release banana && cp -r banana /opt/cloudera/parcels/CDH/lib/solr/webapps/ on one of the solr hosts and check if it's running per http://solr-node:8983/banana/src/index.html. To move fast forward, I have a dashboard available on gist [5], too.

Screenshot Dashboard:


Apache NiFi flow:


Conclusion
This demo shows that's pretty easy today by using available tools to setup more or less complex data flows within a few hours. Apache NiFi is pretty stable, has a lot of sinks available and runs now 2 weeks in Google Compute, captured over 200 mio tweets and stored them in SolR as well as in HDFS. It's interesting to play around with the data in realtime, interactive driven by Banana. 




Dienstag, 5. Januar 2016

Apache Tez on CDH 5.4.x

Since Cloudera doesn't support Tez in their Distribution right now (but it'll come, I'm pretty confident), we experimented with Apache Tez and CDH 5.4 a bit.
To use Tez with CDH isn't so hard - and it works quite well.  And our ETL and Hive jobs finished around 30 - 50% faster.

Anyway, here the blueprint. We use CentOS 6.7 with Epel Repo.

1. Install maven 3.2.5 
wget http://archive.apache.org/dist/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.tar.gz
tar xvfz apache-maven-3.2.5-bin.tar.gz -C /usr/local/
cd /usr/local/
ln -s apache-maven-3.2.5 maven

=> Compiling Tez with protobuf worked only with 3.2.5 in my case

1.1 Install 8_u40 JDK
mkdir development && cd development (thats my dev-root)

wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/8u40-b26/jdk-8u40-linux-x64.tar.gz"
tar xvfz jdk-8u40-linux-x64.tar.gz
export JAVA_HOME=/home/alo.alt/development/jdk1.8.0_40
export JRE_HOME=/home/alo.alt/development/jdk1.8.0_40/jre
export PATH=$PATH:/home/alo.alt/development/jdk1.8.0_40:/home/alo.alt/development/jdk1.8.0_40/jre

2. Create a maven profile.d file
vi /etc/profile.d/maven.sh
export M2_HOME=/usr/local/maven
export PATH=${M2_HOME}/bin:${PATH}

3. Get Tez
git clone https://github.com/apache/tez.git
git checkout tags/release-0.7.0
git checkout -b tristan

modify pom.xml to use hadoop-2.6.0-cdh.5.4.2

<profile>
   <id>cdh5.4.0</id>
   <activation>
   <activeByDefault>false</activeByDefault>
   </activation>
   <properties>
     <hadoop.version>2.6.0-cdh5.4.0</hadoop.version>
   </properties>
   <pluginRepositories>
     <pluginRepository>
     <id>cloudera</id>
     <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
     </pluginRepository>
   </pluginRepositories>
   <repositories>
     <repository>
       <id>cloudera</id>
       <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
     </repository>
   </repositories>
</profile>

And apply the patch from https://gist.github.com/killerwhile/23225004a78949d4c849#file-gistfile1-diff

4. Install protobuf
sudo yum -y install gcc-c++ openssl-devel glibc
wget https://protobuf.googlecode.com/files/protobuf-2.5.0.tar.bz2
tar xfvj protobuf-2.5.0.tar.bz2
cd protobuf-2.5.0/
./configure && make && make check
make install && ldconfig && protoc --version

or use the precompiled RPMS:
ftp://ftp.pbone.net/mirror/ftp5.gwdg.de/pub/opensuse/repositories/home:/kalyaka/CentOS_CentOS-6/x86_64/protobuf-2.5.0-16.1.x86_64.rpm 
ftp://ftp.pbone.net/mirror/ftp5.gwdg.de/pub/opensuse/repositories/home:/kalyaka/CentOS_CentOS-6/x86_64/protobuf-compiler-2.5.0-16.1.x86_64.rpm

5. Build Tez against CDH 5.4.2
mvn -Pcdh5.4.2 clean package -Dtar -DskipTests=true -Dmaven.javadoc.skip=true

6. Install Tez
hadoop dfs -mkdir /apps/tez && hadoop dfs -copyFromLocal tez/tez-dist/target/tez-0.7.0.tar.gz /apps/tez/tez-0.7.0.tar.gz

sudo mkdir -P /apps/tez && tar xvfz tez/tez-dist/target/tez-0.7.0.tar.gz -C /apps/tez/

6.1 create a tez-site.xml in /apps/tez/conf/
<configuration>
  <property>
    <name>tez.lib.uris</name>
    <value>${fs.default.name}/apps/tez/tez-0.7.0.tar.gz</value>
  </property>
</configuration>

7. Run Tez with Yarn
export TEZ_HOME=/apps/tez
export TEZ_CONF_DIR=${TEZ_HOME}/conf
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${TEZ_CONF_DIR}:$(find ${TEZ_HOME} -name "*.jar" | paste -sd ":")"

hive> set hive.execution.engine=tez;
hive> SELECT s07.description, s07.salary, s08.salary, s08.salary - s07.salary FROM sample_07 s07 JOIN sample_08 s08 ON ( s07.code = s08.code) WHERE s07.salary < s08.salary ORDER BY s08.salary-s07.salary DESC LIMIT 1000;

beeline --hiveconf tez.task.launch.env="LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$YOUR_HADOOP_COMMON_HOME/lib/native" \ 
--hiveconf tez.am.launch.env="LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$YOUR_HADOOP_COMMON_HOME/lib/native" '
Check if you have the lib*.so available in the native folder (or point to the folder which contains the .so files).

Sources:
https://gist.github.com/killerwhile/23225004a78949d4c849#file-gistfile1-diff
http://tez.apache.org/install.html