On anything that got to do with software development

Month: May 2018

Cannot cast exception while reading data from Hive in Spark


In this post I am going to talk about an exception I got when I was trying to read data from Hive using Spark and how I managed to debug the issue and resolved it. I will also explain how one can reproduce the issue, by doing so, one can also avoid reproducing it. The exception was:

java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.hive.serde2.io.DateWritable

Spark code

Here is the snippet of the code that I had in spark:

From the code snippet above, all I am doing is a simple select from a table called geonames_data which is in a database called tendai_test and I just want to read only 5 records. The exception is thrown when trying to retrieve the data using the show function.


Now, my first instinct was to check the schema of the table. Using the Dataframe’s printSchema function, I can see the fields and their datatypes as shown below:

From the schema above I can see that all the fields and their datatypes and all is what I expect. However, from the query I am using to retrieve the data, there is no place I am doing a cast. Therefore, there got to be another place I need to check to see if there is a difference.  The next place I checked was to read the schema from ORC as shown below:

From the schema above, I noticed that there is a difference in datatypes for a field called modificationdate, from ORC its string whereas from Hive its a date.


Quick solution is to do a backup of the table in Hive as follows:

create table tendai_test.geonames_data_backup as select * from tendai_test.geonames_data;

After backing up the data, drop the original table as follows:

drop table tendai_test.geonames_data purge;

Once the table is dropped, create the table using the original script and using the geonames_data table, the script is as follows:

create table tendai_test.geonames_data(
geonameid int,
name string,
asciiname string,
alternatenames string,
latitude int,
longitude int,
featureclass string,
featurecode string,
countrycode string,
cc2 string,
admin1code int,
admin2code int,
admin3code int,
admin4code int,
population int,
elevation string,
dem int,
timezone string,
modificationdate date)

Once that is done, populate the table using the backed up data from geonames_data_backup as follows:

insert into tendai_test.geonames_data select * from tendai_test.geonames_data_backup;

Back to the Spark code

If I run the Spark code again, I will no longer have the exception as shown below:

Just did a minor modification to the code by selecting a few fields.

Reproducing the issue

Using the backup table from the above, we can recreate the issue as follows:


The lesson here is that, when writing data in Spark, make sure that the datatypes are not changed. Or when doing a cast, make sure that the cast is of the right datatype. Check first before doing a cast to make sure that the right thing is being done.

Doing this without checking might slow down production especially if working in a big team. Say someone is reading data from geonames_data, and the expected datatype of modificationdate is date, then all of a sudden there is now a cast exception. Trying to get to the bottom of the issue and resolving it, will slow down progress.


This is what I experienced and I just wanted to share. In future posts, I will talk about Hive since there are some concepts in this post that I didn’t talk about, for instance ORC and Dataframes.

Apache Spark 2


In the article, Apache Spark 1, I looked at a high overview of what Apache Spark is and its architecture. I touched on the fact that at the core of Apache Spark there is a very advanced Direct Acyclic Graph (DAG) data processing engine. In this article I will look into DAG and how it works in Spark.

What is Direct Acyclic Graph (DAG) ?

Spark is efficient due to its advanced Directed Acyclic Graph (DAG) data processing engine. To understand how DAG works, we have to break it down and define each word.

Graph, what does it mean? Its a diagram representing a system of connections or interrelations among two or more things by a number of distinctive dots, lines, bars and so forth. In our case, which is DAG, the graph is a representation of connected nodes and they are connected by what are know as edges.

Acyclic, what does it mean?  Its an adjective that describes a graph in which there is no cycle or closed path.

Directed, what does it mean? In the context of DAG, its a direction an edge has to take and it is represented by an arrow.

From the diagram above, we have a graph with a collection of nodes denoted by the letters A to F and they are connected by edges which have specific directions, hence the arrows. If we are to follow the movement, say from A, we will never go back to A, hence they are acyclic. There is no circular movement and because of that, it enables dependency tracking, for instance, B depends on A and F on E and so forth. If we think of it as a data processing pipeline, A and G will be the source of the data and it will go through several transformation steps until it gets to F which will be the final result.

Now in the context of Spark, our nodes are RDDs and the arrows are Transformations. Below is an example of how DAG works in Spark:

This is a simple word count that will be done for a ReadMe file. Visually the process will look as follows:

The textFile is our first RDD whose element is a line and it will be transformed using flatMap to another RDD whose element is a word and then map  will transform the word to a key value pair RDD. The reduceByKey will then do the summation of the counts of each word. Spark comes with its own DAG visualisation tool. From that tool the DAG diagram will look as follows:

Now, if we look at the first DAG diagram, whereby we were using letters as nodes, we established that letter A and G will represent the source of our data. The final result will then be letter F, after it has gone through some transformation. This is the same with the above example, the ReadMe file is the source and as it moves from Stage 0 to Stage 1, the data will go through some transformation. But then one would ask, why the split between Stage 0 and 1? Why is reduceByKey in its own stage?

As I mentioned earlier, one of the advantages of DAG is the tracking of dependencies. The textFile has to be flatMapped -> mapped -> reducedByKey.  Each line from the ReadMe file will be transformed to words. Now each word does not have any dependency on the other words. However, when we now want to find the number of occurrences of each word, there is going to be need of movement of words. In Spark, this is known as shuffling and if there is need for shuffling, Spark sets that as a boundary between stages. If a task will result in shuffling, it will be placed in its own stage, hence, reduceByKey is in Stage 1. Once that is done, tasks in each stage are then bundled together and are sent to the executors.


In this article, I managed to explain what DAG is in Spark context and also gave an example on how it works in Spark. This is just scratching the surface since more is involved when running a Spark application. For instance, when a Spark application is submitted to Apache Spark, one needs to understand how to set Spark configurations for Spark to work efficiently. Therefore, questions like, how much driver memory should I use, how many executors or number of cores per each executor do I need, will need to be answered.

When developing a Spark application, one also need to understand what transformation is and what action is in the context of Spark. Understanding these terms will help in developing an application that will be executed efficiently.

Will try to cover these concepts in the next articles on Apache Spark.

Execution plan 



© 2024 Tendai

Theme by Tendai BepeteUp ↑