Saturday, May 28, 2022

Theory Of Bucketing

 Theory of Hive Bucketing 

Bucketing in Hive is a data organizing technique. Bucketing (also called clustering) splits the data into a more manageable number of files by specifying the number of buckets at the time of creating a Hive table. The value of the bucketing column is hashed into that user-defined number of buckets. It is similar to partitioning in Hive, with the added functionality that it divides large datasets into more manageable parts known as buckets. So, we can use bucketing in Hive when the implementation of partitioning becomes difficult, for example when a column has too many distinct values to partition on.

With the help of bucketing in Hive, you can decompose a table's data set into smaller parts, making them easier to handle. Bucketing groups rows whose bucketing column hashes to the same value and writes them to a single file, which enhances performance while joining tables or reading data. This is a big reason why bucketing is often used together with partitioning.
The concept of bucketing is based on the hashing technique: the hash of the bucketing column value is taken modulo the number of required buckets (say, hash(x) % 3).
Based on the resulting value, the row is stored in the corresponding bucket.
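As a rough illustration of the hash-and-modulo idea, here is a small Scala sketch (the ids and the choice of 3 buckets are made up for the example; for an integer column Hive's hash is simply the value itself):

// Scala (runs in the Scala REPL or spark-shell); illustrative ids, 3 buckets
val numBuckets = 3
val ids = Seq(101, 102, 103, 104)
ids.foreach { id =>
  val bucket = id % numBuckets   // hash(x) % numBuckets picks the bucket file
  println(s"id=$id -> bucket_$bucket")
}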
Enable the bucketing by using the following command: -
hive> set hive.enforce.bucketing = true;  
Create a bucketing table by using the following command: -
hive> create table emp_bucket(Id int, Name string , Salary float)    
clustered by (Id) into 3 buckets  
row format delimited    
fields terminated by ',' ;
Create a dummy table to store the data.
hive> create table emp_demo (Id int, Name string , Salary float)    
row format delimited    
fields terminated by ',' ; 

Now, load the data into the table.
hive> load data local inpath '/home/codegyani/hive/emp_details' into table emp_demo;  
Now, insert the data of dummy table into the bucketed table.
hive> insert overwrite table emp_bucket select * from emp_demo;    

Friday, May 27, 2022

Theory of Hive Partition

Partitions in Hive

Partitioning is an optimization technique in Hive that can improve query performance significantly. It is a way of dividing a table into related parts based on the values of the partition columns; using partitions, it is easy to query only a portion of the data. Tables or partitions can be further sub-divided into buckets to provide extra structure to the data for more efficient querying. Bucketing works on the value of a hash function of some column of the table.


hive> create table employee (id int, name string, age int)
partitioned by (department string)
row format delimited
fields terminated by ',';

The partitioning in Hive can be executed in two ways -

1) Static partitioning: In static partitioning, the values of the partition columns must be passed manually while loading the data into the table.

hive> load data local inpath '/home/codegyani/hive/employee_details' into table employee
partition(department = "hr");

2) Dynamic partitioning: In dynamic partitioning, the values of the partition columns come from the data itself, so it is not required to pass them manually.

We can't perform ALTER on a dynamic partition.

Enable the dynamic partition by using the following commands: 

hive> set hive.exec.dynamic.partition=true;    

hive> set hive.exec.dynamic.partition.mode=nonstrict;  


Create a dummy table to store the data


hive> create table employee_dummy (id int, name string, age int, department string)
row format delimited
fields terminated by ',';


Now, load the data into the table.

hive> load data local inpath '/home/codegyani/hive/employee_details' into table employee_dummy;


Now, insert the data of dummy table into the partition table  

hive> insert into employee
partition(department)
select id, name, age, department
from employee_dummy;

---------------- adding partition to  existing table ---------------------------

ALTER TABLE <table_name> ADD [IF NOT EXISTS]
PARTITION (<partition_column>='<value>') LOCATION 'hdfs_path_of_directory';

----------------Rename Partition ---------------------------------------

Renaming a Partition

ALTER TABLE employee PARTITION (department='software-engineer') RENAME TO PARTITION (department='developer');

Manually Renaming Partitions on HDFS

hdfs dfs -mv /user/hive/warehouse/employee/department=software-engineer   /user/hive/warehouse/employee/department=developer

----------------Dropping a Partition ---------------------------------------

Dropping a Partition

>ALTER TABLE employee DROP IF EXISTS PARTITION (department='developer');

Manually Dropping Partitions on HDFS

hdfs dfs -rm -R /user/hive/warehouse/employee/department=developer

Show All Partitions on Hive Table

SHOW PARTITIONS employee;

How to Filter Partitions

> SHOW PARTITIONS employee PARTITION(department='developer');

Wednesday, May 25, 2022

Spark Deploy Modes

A Spark application can be submitted in two different ways: client mode and cluster mode.

Client  Deploy Mode in Spark:

In client mode, the Spark driver component of the application runs on the machine from which the job is submitted. Client mode is mainly used for interactive and debugging purposes. Note that in client mode only the driver runs locally; all tasks run on the cluster's worker nodes.

The default deployment mode is client mode.

In client mode, if the user session running spark-submit terminates, the application also terminates with a failed status.

Client mode is not used for production jobs; it is used for testing purposes.

Driver logs are accessible from the local machine itself.






spark-submit --deploy-mode client --driver-memory xxxx  ......


Cluster Deploy Mode in Spark:


In cluster deploy mode, the driver program is launched on one of the available nodes in the Spark cluster. Cluster deployment is mostly used for large data sets where the job takes minutes or hours to complete. Because the driver starts on one of the worker machines, the user who submits the application can go away after initiating it, or continue with some other work; it works on a fire-and-forget basis.

In any case, if the job is going to run for a long period of time and we don't want to wait for the result, we can submit the job using cluster mode; once the job is submitted, the client doesn't need to stay online.

The Spark driver runs on one of the worker nodes within the cluster, which reduces the data-movement overhead between the submitting machine and the cluster.

For a Cloudera (YARN) cluster, you should use yarn commands to access the driver logs.

It also greatly reduces the chance of job failure.



spark-submit --deploy-mode cluster --driver-memory xxxx  ........




Spark Terminology

Some terminology used in Spark


 Resilient Distributed Dataset – RDD: 

RDD is an acronym for Resilient Distributed Dataset. It is the fundamental unit of data in Spark: a distributed collection of elements spread across cluster nodes that can be operated on in parallel. Spark RDDs are immutable, but a new RDD can be generated by transforming an existing one.

Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. RDDs are immutable distributed collections of objects of any type; as the name suggests, they are resilient (fault-tolerant) records of data that reside on multiple nodes.


DataFrame: It works on structured and semi-structured data. Unlike an RDD, the data is organized into named columns, like a table in a relational database. It is an immutable distributed collection of data. A DataFrame allows developers to impose a structure onto a distributed collection of data, enabling a higher-level abstraction, and it lets Spark manage the schema.
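As a quick illustration, here is a minimal Scala sketch of creating a DataFrame in spark-shell (the column names and rows are made up; spark is the SparkSession provided by the shell):

// Scala, in spark-shell; illustrative columns and rows
import spark.implicits._
val df = Seq((1, "Alice", 3000.0f), (2, "Bob", 4000.0f)).toDF("id", "name", "salary")
df.printSchema()                      // Spark tracks the schema of the named columns
df.filter($"salary" > 3500).show()    // column-based operations on the DataFrame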


Dataset: It is an extension of the DataFrame API that provides the type-safe, object-oriented programming interface of the RDD API. It also efficiently processes structured and semi-structured data.
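A minimal Scala sketch of the type-safe interface, again in spark-shell (the Employee case class and rows are made up for the example):

// Scala, in spark-shell; illustrative case class and rows
case class Employee(id: Int, name: String, salary: Float)
import spark.implicits._
val ds = Seq(Employee(1, "Alice", 3000.0f), Employee(2, "Bob", 4000.0f)).toDS()
// The lambda below is checked at compile time, unlike a string column name
ds.filter(emp => emp.salary > 3500).show()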


There are 3 ways of creating an RDD (see the sketch after this list):


1) Parallelizing an existing collection of data

2) Referencing an external data file in storage

3) Creating an RDD from an already existing RDD
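A minimal Scala sketch of the three creation paths, runnable in spark-shell (sc is the SparkContext provided by the shell; the HDFS path is a placeholder):

// Scala, in spark-shell
// 1) Parallelizing an existing collection
val fromCollection = sc.parallelize(Seq(1, 2, 3, 4, 5))
// 2) Referencing an external data file (placeholder path)
val fromFile = sc.textFile("hdfs:///tmp/sample.txt")
// 3) Creating an RDD from an already existing RDD (a transformation)
val derived = fromCollection.map(_ * 2)
println(derived.collect().mkString(","))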



Spark Session:

From Spark 2.0.0 onwards, SparkSession provides a single point of entry to interact with the underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs. All the functionality available through SparkContext is also available through SparkSession.

To use the SQL, Hive, and Streaming APIs, there is no need to create separate contexts, as SparkSession includes all of those APIs.
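A minimal Scala sketch of building a SparkSession (the app name is made up; enableHiveSupport() is only needed when Hive tables are accessed):

// Scala: single entry point for SQL, DataFrame and Dataset APIs
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("session-example")
  .enableHiveSupport()                 // optional: read/write Hive tables
  .getOrCreate()
// Run-time config properties can be adjusted after the session is created
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.sql("SHOW DATABASES").show()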


SparkContext:

SparkContext is the entry gate of Apache Spark functionality. The most important step of any Spark driver application is to create the SparkContext. It allows your Spark application to access the Spark cluster with the help of a resource manager (YARN/Mesos). To create a SparkContext, a SparkConf should be made first. The SparkConf holds the configuration parameters that our Spark driver application will pass to the SparkContext.

SparkConf is required to create the SparkContext object; it stores configuration parameters like appName (to identify your Spark driver) and the number of cores and memory size of the executors running on the worker nodes.
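A minimal Scala sketch of this pre-2.0 entry point (the app name, master and resource values are illustrative):

// Scala: SparkConf -> SparkContext
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
  .setAppName("context-example")          // identifies the driver
  .setMaster("local[*]")                  // or "yarn" on a cluster
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
val sc = new SparkContext(conf)
println(sc.parallelize(1 to 10).sum())
sc.stop()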

Once the SparkSession is instantiated, we can configure Spark's run-time config properties using spark.conf.set (as in the SparkSession sketch above).



--num-executors: The number of executors is the number of distinct YARN containers (think processes/JVMs) that will execute your application.

--executor-cores: The number of executor cores is the number of threads you get inside each executor (container).

--executor-memory: An executor is a process launched for a Spark application on a worker node. Each executor's memory is the sum of the YARN overhead memory and the JVM heap memory; the JVM heap comprises the RDD cache memory and the shuffle memory.

--driver-memory: The --driver-memory flag controls the amount of memory to allocate for the driver, which is 1 GB by default and should be increased if you call a collect() or take(N) action on a large RDD inside your application.

Instead of passing these flags on the command line, they can also be set in the properties file (the default is $SPARK_HOME/conf/spark-defaults.conf).



---------------- job submit ----------------

spark-submit --class org.apache.spark.examples.SparkPi --master yarn \
--deploy-mode client --driver-memory 4g --num-executors 2 --executor-memory 2g \
--executor-cores 2 /opt/apps/spark-1.6.0-bin-hadoop2.6/lib/spark-examples*.jar 10

Tuesday, May 17, 2022

Zookeeper

                                                        Introduction to Zookeeper

 Apache Zookeeper is a centralized service and a Hadoop Ecosystem component for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

ZooKeeper is a distributed coordination service to manage a large set of hosts. Coordinating and managing a service in a distributed environment is a complicated process; ZooKeeper solves this issue with its simple architecture and API.

ZooKeeper is where the configuration, naming, and group-service information responsible for synchronizing Hadoop tasks is stored.


The ZooKeeper framework was originally built at “Yahoo!” for accessing their applications in an easy and robust manner. Later, Apache ZooKeeper became a standard for organized service used by Hadoop, HBase, and other distributed frameworks.

Three ZooKeeper-based services are widely used in Hadoop:


ZooKeeper Failover Controller (ZKFC): used in NameNode HA.

ZooKeeper state store: used in ResourceManager HA.

ZooKeeper dynamic service discovery: used in HiveServer2 HA.

Sunday, May 15, 2022

impala

Impala is an MPP (Massively Parallel Processing) SQL query engine for processing huge volumes of data stored in a Hadoop cluster.

Impala is a tool used to overcome the slowness of Hive queries; this separate tool was provided by the Cloudera distribution.

It is open-source software written in C++ and Java. It provides higher performance and lower latency compared to other SQL engines for Hadoop.

 Impala uses MPP (massively parallel processing) to run lightning fast queries against HDFS, HBase, etc.


It offers high-performance, low-latency SQL queries. Impala is the best option when we are dealing with medium-sized datasets and expect a near-real-time response from our queries. Note, however, that Impala is available only as part of a Hadoop distribution.

It can read almost all the file formats used by Hadoop, such as Parquet, Avro, and RCFile.

Unlike Apache Hive, Impala is not based on MapReduce. Hence, Impala is faster than Apache Hive, since it avoids the latency of launching MapReduce jobs.


There are 3 daemons in Impala. They are as follows:

Impala StateStore: the Impala StateStore is installed on one host of the cluster. The StateStore checks on the health of the Impala daemons on all the DataNodes.

We can say the StateStore daemon is a name service that monitors the availability of Impala services across the cluster.

It also handles situations such as nodes becoming unavailable or becoming available again. The Impala StateStore keeps track of which impalads are up and running and relays this information to all the impalads in the cluster, so they are aware of it when distributing tasks to other impalads.


   


Impala Catalog Server: the Impala Catalog Server is installed on one host of the cluster. Via the StateStore, it distributes metadata to the Impala daemons.

It is physically represented by a daemon process named catalogd; you only need such a process on one host in a cluster.



Impala Daemon:

There is one such daemon per node; it is installed on every DataNode. These daemons form the core of the Impala execution engine and are the ones reading data from HDFS/HBase and aggregating/processing it.

To look up the mapping between tables and files, this daemon uses the Hive metastore.

It also uses the HDFS NameNode to get the mapping between files and blocks. Therefore, to get and process the data, Impala uses the Hive metastore and the NameNode.

We can say all impalads are equivalent. This daemon accepts queries from several tools such as the impala-shell command, Hue, JDBC, or ODBC.


There are 3 major components of an impalad:



Query Planner: the Query Planner is responsible for parsing the query; this planning occurs in 2 parts.

1) First, a single-node plan is made, as if all the data in the cluster resided on just one node.

2) Afterwards, on the basis of the location of the various data sources in the cluster, this single-node plan is converted to a distributed plan (thereby leveraging data locality).



Coordinator: the Query Coordinator is responsible for coordinating the execution of the entire query. To read and process data, it sends requests to the various executors; afterward, it receives the data back from these executors and streams it back to the client via JDBC/ODBC.

 

Executor: the Executor is responsible for aggregating data. The data is read locally or, if not available locally, is streamed from the executors of other Impala daemons.

Introduction To Spark

Spark is a fast, general-purpose distributed data processing engine compatible with Hadoop data. On top of the Spark core data processing engine, there are libraries for SQL, machine learning, graph computation, and stream processing, which can be used together in an application.

It utilizes in-memory caching and optimized query execution for fast analytic queries against data of any size, which is why it is widely used for big data workloads.


Spark is written in Scala but provides rich APIs for Scala, Java, Python, and R. Spark can be up to 100 times faster than Hadoop MapReduce when data fits in memory, and up to 10 times faster when accessing data from disk. It can be integrated with Hadoop and can process existing Hadoop HDFS data.

 Resilient Distributed Dataset – RDD: 

RDD is an acronym for Resilient Distributed Dataset. It is the fundamental unit of data in Spark: a distributed collection of elements spread across cluster nodes that can be operated on in parallel. Spark RDDs are immutable, but a new RDD can be generated by transforming an existing one.


 Lazy Evaluation: 

Spark lazy evaluation means the data inside RDDs is not evaluated on the go. The computation is performed only after an action triggers it; therefore, Spark limits how much work it has to do.
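A minimal Scala sketch of this behaviour in spark-shell (sc is the SparkContext provided by the shell; the sizes are illustrative):

// Scala, in spark-shell
val numbers = sc.parallelize(1 to 1000000)
// Nothing is computed yet: filter and map only record the lineage
val evens   = numbers.filter(_ % 2 == 0)
val squared = evens.map(n => n.toLong * n)
// The action below triggers the whole pipeline in one pass
println(squared.count())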