Saturday, May 28, 2022

Theory of Hive Bucketing

Bucketing in Hive is a data organizing technique that splits data into more manageable files by specifying the number of buckets at the time of creating a Hive table. The value of the bucketing column is hashed into a user-defined number of buckets. It is similar to partitioning in Hive, with the added functionality that it divides large datasets into a fixed number of manageable parts known as buckets. So, we can use bucketing in Hive when the implementation of partitioning becomes difficult.

With the help of bucketing in Hive, you can decompose a table's data set into smaller parts, making them easier to handle. Bucketing groups rows whose bucketing column hashes to the same value and writes them to a single file, which enhances performance when joining tables or reading data. This is a big reason why bucketing is often used together with partitioning.

The concept of bucketing is based on hashing: the hash of the current column value modulo the number of required buckets is calculated (say, F(x) % 3), and based on the resulting value the row is stored in the corresponding bucket.
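The bucket-assignment rule above can be sketched in plain Python. This is an illustration of hash-modulo bucketing, not Hive's actual hash function (Hive uses its own per-type hash; for integer columns it is effectively the value itself):

```python
# Simulate Hive-style bucketing: bucket = hash(column_value) % num_buckets.
# Illustration only -- Hive's real hash function differs per column type.

def bucket_for(value: int, num_buckets: int = 3) -> int:
    """Return the index of the bucket a row lands in."""
    return value % num_buckets

rows = [101, 102, 103, 104, 105, 106]  # hypothetical Id column values
buckets = {b: [v for v in rows if bucket_for(v) == b] for b in range(3)}
print(buckets)
# {0: [102, 105], 1: [103, 106], 2: [101, 104]}
```

Rows 101 and 104 land in the same bucket file; because matching keys are guaranteed to be in the same bucket on both sides, bucketed joins can skip a full shuffle.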
Enable bucketing by using the following command:
hive> set hive.enforce.bucketing = true;  
Create a bucketed table by using the following command:
hive> create table emp_bucket(Id int, Name string , Salary float)    
clustered by (Id) into 3 buckets  
row format delimited    
fields terminated by ',' ;
Create a dummy table to store the data.
hive> create table emp_demo (Id int, Name string , Salary float)    
row format delimited    
fields terminated by ',' ; 

Now, load the data into the table.
hive> load data local inpath '/home/codegyani/hive/emp_details' into table emp_demo;  
Now, insert the data of dummy table into the bucketed table.
hive> insert overwrite table emp_bucket select * from emp_demo;    

Friday, May 27, 2022

Theory of Hive Partition

Partitions in Hive

Partitioning is an optimization technique in Hive that improves query performance significantly. It is a way of dividing a table into related parts based on the values of the partitioned columns. Using partitions, it is easy to query just a portion of the data. Tables or partitions can be further sub-divided into buckets to give the data extra structure for more efficient querying; bucketing works on the hash value of some column of the table.


hive> create table employee (id int, name string, age int)
partitioned by (department string)
row format delimited
fields terminated by ',';

Note that the partition column (department) must not be repeated in the regular column list; Hive adds it as a virtual column.

The partitioning in Hive can be executed in two ways:

1) Static partitioning: In static partitioning, the values of the partitioned columns must be passed manually while loading the data into the table.

hive> load data local inpath '/home/codegyani/hive/employee_details' into table employee
partition (department = "hr");

2) Dynamic partitioning: In dynamic partitioning, the values of the partitioned columns exist within the table itself, so it is not required to pass them manually. Unlike static partitions, we can't add a dynamic partition with ALTER TABLE; dynamic partitions are created automatically as data is inserted.

Enable the dynamic partition by using the following commands: 

hive> set hive.exec.dynamic.partition=true;    

hive> set hive.exec.dynamic.partition.mode=nonstrict;  


Create a dummy table to store the data


create table employee_dummy(id int, name string, age int, department string)
row format delimited
fields terminated by ',';


Now, load the data into the table.

hive> load data local inpath '/home/codegyani/hive/employee_details' into table employee_dummy;


Now, insert the data of dummy table into the partition table  

insert into employee
partition(department)
select id, name, age, department
from employee_dummy;
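The routing that dynamic partitioning performs can be sketched in Python with hypothetical employee rows (an illustration of the directory layout only; Hive actually writes these as HDFS directories named after the partition value):

```python
# Sketch of dynamic partitioning: each row is routed to a directory named
# <table>/<partition_col>=<value>. Hypothetical data; Hive does this on
# HDFS, not with an in-memory dict.

from collections import defaultdict

rows = [
    (1, "Asha", 30, "hr"),
    (2, "Ravi", 28, "sales"),
    (3, "Meena", 35, "hr"),
]

partitions = defaultdict(list)
for emp_id, name, age, department in rows:
    # the partition column's value becomes part of the storage path
    partitions[f"employee/department={department}"].append((emp_id, name, age))

for path, data in sorted(partitions.items()):
    print(path, data)
# employee/department=hr [(1, 'Asha', 30), (3, 'Meena', 35)]
# employee/department=sales [(2, 'Ravi', 28)]
```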

---------------- adding partition to  existing table ---------------------------

ALTER TABLE <Table_Name> ADD
[IF NOT EXISTS] PARTITION (<partition_spec>)
LOCATION 'hdfs_path_of_directory';

----------------Rename Partition ---------------------------------------

Renaming a Partition

ALTER TABLE employee PARTITION (department='software-engineer') RENAME TO PARTITION (department='developer');

Manually Renaming Partitions on HDFS

hdfs dfs -mv /user/hive/warehouse/employee/department=software-engineer   /user/hive/warehouse/employee/department=developer

----------------Dropping a Partition ---------------------------------------

Dropping a Partition

>ALTER TABLE employee DROP IF EXISTS PARTITION (department='developer');

Manually Dropping Partitions on HDFS

hdfs dfs -rm -R /user/hive/warehouse/employee/department=developer

Show All Partitions on Hive Table

SHOW PARTITIONS employee;

How to Filter Partitions

> SHOW PARTITIONS employee PARTITION(department='developer');

Wednesday, May 25, 2022

Spark Deploy Modes

A Spark application can be submitted in two different ways: client mode and cluster mode.

Client  Deploy Mode in Spark:

In client mode, the Spark driver component of the application runs on the machine from which the job was submitted. Client mode is used mainly for interactive and debugging purposes. Note that in client mode only the driver runs locally; all tasks run on the cluster's worker nodes.

The default deployment mode is client mode.

In client mode, if the user session running spark-submit terminates, the application also terminates with a failed status.

Client mode is not used for production jobs; it is used for testing purposes.

Driver logs are accessible from the local machine itself.






spark-submit --deploy-mode client --driver-memory xxxx  ......


Cluster Deploy Mode in Spark:


In cluster deploy mode, the driver program is launched on one of the available nodes in the cluster. Cluster deployment is mostly used for large data sets where the job takes minutes or hours to complete. Because the driver starts on one of the worker machines, the user who submits the application can go away after initiating it, or continue with other work; it works on a fire-and-forget model.

If a job is going to run for a long time and we don't want to wait for the result, we can submit it in cluster mode; once the job is submitted, the client doesn't need to stay online.

The Spark driver runs on one of the worker nodes within the cluster, which reduces the data-movement overhead between the submitting machine and the cluster.

For a Cloudera cluster, you should use yarn commands to access driver logs.

It greatly reduces the chance of job failure caused by the submitting machine disconnecting.



spark-submit --deploy-mode cluster --driver-memory xxxx  ........




Spark Terminology

Some terminology used in Spark


Resilient Distributed Dataset – RDD:

RDD is an acronym for Resilient Distributed Dataset, the fundamental data structure of Spark. An RDD is an immutable, distributed collection of elements of any type, spread across cluster nodes and processed with parallel operations. As the name suggests, it is resilient (fault-tolerant). Although RDDs are immutable, new RDDs can be generated by transforming existing ones.


DataFrame: Works on structured and semi-structured data. Unlike an RDD, the data is organized into named columns, like a table in a relational database. A DataFrame is an immutable distributed collection of data that lets developers impose a structure on the collection, allowing higher-level abstraction, and lets Spark manage the schema.


Dataset: An extension of the DataFrame API that provides the type-safe, object-oriented programming interface of the RDD API. It also efficiently processes structured and semi-structured data.


There are 3 ways of creating an RDD:


1) Parallelizing an existing collection of data

2) Referencing an external dataset in storage

3) Creating an RDD from an already existing RDD



Spark Session:

From Spark 2.0.0 onwards, SparkSession provides a single point of entry to interact with underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs. All the functionality available with SparkContext is also available in SparkSession.

To use the SQL, Hive, and Streaming APIs, there is no need to create separate contexts, as SparkSession includes them all.


SparkContext:

SparkContext is the entry gate of Apache Spark functionality. The most important step of any Spark driver application is to generate a SparkContext. It allows your Spark application to access the Spark cluster with the help of a resource manager (YARN/Mesos). To create a SparkContext, a SparkConf should be made first; SparkConf holds the configuration parameters that our Spark driver application passes to SparkContext.

SparkConf is required to create the SparkContext object. It stores configuration parameters like appName (to identify your Spark driver), the number of cores, and the memory size of executors running on worker nodes.

Once the SparkSession is instantiated, we can configure Spark’s run-time config properties 



--num-executors: Number of executors is the number of distinct yarn containers (think processes/JVMs) that will execute your application.

--executor-cores: Number of executor-cores is the number of threads you get inside each executor (container)


--executor-memory: An executor is a process launched for a Spark application on a worker node. Each executor's memory is the sum of YARN overhead memory and JVM heap memory; the JVM heap comprises RDD cache memory and shuffle memory.

--driver-memory: The --driver-memory flag controls the amount of memory to allocate for the driver, which is 1 GB by default and should be increased in case you call a collect() or take(N) action on a large RDD inside your application.

These options can also be set in the properties file (default: $SPARK_HOME/conf/spark-defaults.conf).



---- job submit ----

spark-submit --class org.apache.spark.examples.SparkPi --master yarn \
--deploy-mode client --driver-memory 4g --num-executors 2 --executor-memory 2g \
--executor-cores 2 /opt/apps/spark-1.6.0-bin-hadoop2.6/lib/spark-examples*.jar 10

Tuesday, May 17, 2022

Zookeeper

Introduction to Zookeeper

 Apache Zookeeper is a centralized service and a Hadoop Ecosystem component for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

ZooKeeper is a distributed coordination service for managing a large set of hosts. Coordinating and managing a service in a distributed environment is a complicated process; ZooKeeper solves this issue with its simple architecture and API.

Zookeeper is a unit where the information regarding configuration, naming and group services are stored which are responsible for synchronization of Hadoop tasks.


The ZooKeeper framework was originally built at “Yahoo!” for accessing their applications in an easy and robust manner. Later, Apache ZooKeeper became a standard for organized service used by Hadoop, HBase, and other distributed frameworks.

Three ZooKeeper-based services are widely used in Hadoop:

ZooKeeper Failover Controller: used in NameNode HA.

ZooKeeper state store: used in ResourceManager HA.

ZooKeeper dynamic service discovery: used in HiveServer2 HA.

Sunday, May 15, 2022

impala

Impala is an MPP (Massively Parallel Processing) SQL query engine for processing huge volumes of data stored in a Hadoop cluster.

Impala is a separate tool, provided with the Cloudera distribution, that we use to overcome the slowness of Hive queries.

It is an open source software which is written in C++ and Java. It provides high performance and low latency compared to other SQL engines for Hadoop.

 Impala uses MPP (massively parallel processing) to run lightning fast queries against HDFS, HBase, etc.


It offers high-performance, low-latency SQL queries. Impala is the best option when we are dealing with medium-sized datasets and expect real-time responses from our queries. However, note that Impala is available only in certain Hadoop distributions.

It can read almost all the file formats used by Hadoop, such as Parquet, Avro, and RCFile.

Impala is not based on MapReduce algorithms, unlike Apache Hive. Hence, Impala is faster than Apache Hive, since it avoids the latency of MapReduce.


There are 3 daemons in Impala. They are as follows:

Impala StateStore: The Impala StateStore is installed on one host of the cluster. The StateStore checks the health of the Impala daemons on all the DataNodes.

We can say the StateStore daemon is a name service that monitors the availability of Impala services across the cluster.

It also handles situations such as nodes becoming unavailable or becoming available again. The StateStore keeps track of which ImpalaDs are up and running, and relays this information to all the ImpalaDs in the cluster, so they are aware of it when distributing tasks to other ImpalaDs.


   


Impala Catalog Server: The Impala Catalog Server is installed on one host of the cluster. Via the StateStore, it distributes metadata to the Impala daemons.

It is physically represented by a daemon process named catalogd; you only need such a process on one host in a cluster.



Impala Daemon (ImpalaD):

There is one of these daemons per node; it is installed on every DataNode. ImpalaDs form the core of the Impala execution engine and are the ones reading data from HDFS/HBase and aggregating/processing it.

To store the mapping between tables and files, this daemon uses the Hive metastore. It also uses the HDFS NameNode to get the mapping between files and blocks. Therefore, to get and process data, Impala uses the Hive metastore and the NameNode.

We can say all ImpalaDs are equivalent. This daemon accepts queries from several tools such as the impala-shell command, Hue, JDBC, or ODBC.


There are 3 major components of ImpalaD:



Query Planner: The Query Planner is responsible for parsing the query. Planning occurs in 2 parts:

1) First, a single-node plan is made, as if all the data in the cluster resided on just one node.

2) Afterwards, based on the location of the various data sources in the cluster, this single-node plan is converted to a distributed plan (thereby leveraging data locality).



Coordinator  :   Query Coordinator is responsible for coordinating the execution of the entire query. To read and process data, it sends requests to various executors. Afterward, it receives the data back from these executors and streams it back to the client via JDBC/ODBC

 

Executor: The Executor is responsible for aggregations of data. The data is read locally or, if not available locally, streamed from the executors of other Impala daemons.

Introduction To Spark

Spark is a fast, general-purpose distributed data processing engine compatible with Hadoop data. On top of the Spark Core data processing engine there are libraries for SQL, machine learning, graph computation, and stream processing, which can be used together in an application.

It utilizes in-memory caching and optimized query execution for fast analytic queries against data of any size, which is why it is widely used for big data workloads.

Spark is written in Scala but provides rich APIs for Scala, Java, Python, and R. It can be up to 100 times faster than Hadoop MapReduce when processing in memory, and roughly 10 times faster when accessing data from disk. It can be integrated with Hadoop and can process existing Hadoop HDFS data.

 Resilient Distributed Dataset – RDD

RDD is an acronym for Resilient Distributed Dataset, the fundamental unit of data in Spark: a distributed collection of elements across cluster nodes that supports parallel operations. Spark RDDs are immutable, but new RDDs can be generated by transforming existing ones.


 Lazy Evaluation

Spark lazy evaluation means the data inside RDDs is not evaluated on the go. The computation is performed only after an action triggers it, which limits how much work Spark has to do.
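Python generators give a rough analogy (an analogy only, not Spark code): the map and filter calls below build a pipeline without computing anything, and only the final sum, playing the role of an action, triggers evaluation.

```python
# Rough analogy for lazy evaluation using Python's lazy iterators.
# map/filter act like "transformations": they build a pipeline but
# compute nothing. sum() acts like the "action" that triggers work.

data = range(1, 11)

doubled = map(lambda x: x * 2, data)          # nothing computed yet
large   = filter(lambda x: x > 10, doubled)   # still nothing computed

result = sum(large)   # the "action": the whole pipeline runs here
print(result)         # 12 + 14 + 16 + 18 + 20 = 80
```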



Apache Spark is an open-source, distributed data processing framework designed for processing large-scale, fault-tolerant, and high-performance data workloads.

It is widely used in modern data platforms for:

  • Batch Processing – large volumes of historical data
    → Using Spark Core and Spark SQL
  • Stream Processing – real-time data streams
    → Using Structured Streaming (built on Spark SQL)
  • ETL (Extract, Transform, Load) – data preparation and transformation
    → Using Spark SQL, DataFrame API, and Datasets
  • Machine Learning – scalable ML pipelines
    → Using MLlib
  • Analytics – interactive SQL and reporting
    → Using Spark SQL
  • Graph Processing – relationship and network analysis
    → Using GraphX

What are the main components of Spark?

Answer:
The core components of Apache Spark are:

1.      Spark Core – basic functionality and task scheduling

2.      Spark SQL – structured data processing

3.      Spark Streaming – real-time data processing

4.      MLlib – machine learning library

5.      GraphX – graph processing

 

1. Batch Processing

Batch processing collects data over a period of time and processes it all at once as a single "batch." This usually happens on a schedule (e.g., every night at 2 AM).

·         How it works: Data is collected, stored, and then processed in bulk.

·         Latency: High (minutes to days).

·         Best for: Massive volumes of data where immediate results aren't needed.

·         Example: Calculating monthly payroll, generating daily sales reports, or clearing bank transactions at the end of the day.

·         Tools: Apache Spark (RDD/DataFrames), AWS Glue, Snowflake, dbt, Airflow.

2. Streaming (Near Real-Time)

Streaming processes data records one by one as they arrive. It’s designed for continuous flow.

·         How it works: Instead of waiting for a "bucket" to fill up, the system processes each "drop" of data as it falls.

·         Latency: Low (seconds to milliseconds).

·         Best for: Monitoring systems where a delay of a few seconds is acceptable.

·         Example: Tracking GPS locations of delivery drivers, updating a live "trending topics" list on social media, or monitoring website traffic logs.

·         Tools: Apache Kafka, Spark Streaming, Amazon Kinesis, Google Pub/Sub.


 

Data skew:

Data skew occurs when one partition has significantly more data than others, leading to performance bottlenecks during shuffle operations like joins and aggregations.

Where Data Skew Commonly Happens

  • groupByKey()
  • reduceByKey()
  • join()
  • repartition()
  • Aggregations
  • Window functions

Basically → Any shuffle operation.

Types of Data Skew

1) Key Skew: one key dominates the dataset.

2) Partition Skew: uneven partition sizes.

3) Join Skew: one key in a join causes a row explosion.


 

How to Fix Data Skew:

   AQE → Enable spark.sql.adaptive.skewJoin.enabled (auto splits skewed partitions in Spark 3+).

  Broadcast Join → broadcast(small_df) to avoid shuffle.

  Salting → Add random suffix to heavy join key to spread data.

  Isolate Skew → Process heavy/NULL keys separately, then union.

  Repartition → df.repartition(n, "col") to balance partitions.
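Salting can be sketched in plain Python with a hypothetical hot key (in real Spark code you would append the suffix with concat() on the join key and explode the matching salt range on the other side of the join):

```python
# Sketch of salting a skewed key: one "hot" key is split into N sub-keys
# so downstream partitions receive comparable row counts. Illustration
# only -- hypothetical data, not Spark code.

import random
from collections import Counter

random.seed(42)
SALT_BUCKETS = 4

hot_rows = [("user_1", i) for i in range(1000)]   # one key dominates
salted = [(f"{k}_{random.randrange(SALT_BUCKETS)}", v) for k, v in hot_rows]

counts = Counter(k for k, _ in salted)
print(counts)   # roughly 250 rows per salted key instead of 1000 on one
```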

 


 

Typical Architecture:

1.     Data Ingestion

o    Kafka / API / Files

2.     Processing

o    Spark SQL (batch)

o    Structured Streaming (real-time)

3.     Storage

o    Data Lake (Parquet, Delta)

o    Data Warehouse

4.     Orchestration

o    Airflow

 

Why Spark is Important for Data Engineers

✅ Distributed & Scalable
✅ In-memory processing (faster than Hadoop MapReduce)
✅ Unified engine (Batch + Streaming + ML)
✅ Strong ecosystem support
✅ Cloud-native compatibility

Partitions

A Partition is a logical chunk of your data.

DE Perspective: This is the most critical component for a DE. You use repartition() or coalesce() to control how data is spread across the cluster. Correct partitioning prevents Data Skew.


 

Speculative Execution (The Backup Plan)

In a large cluster, sometimes a single machine (Worker Node) becomes slow because of hardware issues, network congestion, or a noisy neighbor. This is called a Straggler.

How it works: If Spark detects that a Task is taking much longer than the average time of other tasks in the same stage, it launches a "speculative copy" of that same task on a different executor.

The Race: Spark now has two versions of the same task running. Whichever one finishes first, Spark accepts that result and kills the other one.

Settings: You turn this on using spark.speculation = true.

What is checkpointing?

Checkpointing stores intermediate results in reliable storage such as:

  • HDFS
  • Amazon S3

Used in:

  • Streaming
  • Fault tolerance
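A minimal sketch of the idea, using a hypothetical local JSON file as the checkpoint store (Spark itself writes offsets and state to a fault-tolerant location such as HDFS or S3):

```python
# Minimal sketch of checkpointing: persist the last processed offset so a
# restarted job resumes where it left off. Hypothetical file-based store;
# Spark writes checkpoint data to HDFS/S3, not a local JSON file.

import json
import os
import tempfile

checkpoint_path = os.path.join(tempfile.mkdtemp(), "offsets.json")

def save_checkpoint(offset: int) -> None:
    with open(checkpoint_path, "w") as f:
        json.dump({"offset": offset}, f)

def load_checkpoint() -> int:
    if not os.path.exists(checkpoint_path):
        return 0   # no checkpoint yet: start from the beginning
    with open(checkpoint_path) as f:
        return json.load(f)["offset"]

save_checkpoint(1500)      # job processed up to offset 1500, then died
print(load_checkpoint())   # a restarted job resumes from 1500, not 0
```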


 

Spark Core (Foundation / Execution Engine)

Spark Core is the base engine responsible for:

·         Distributed task scheduling

·         Memory management

·         Fault tolerance

·         I/O operations

·         Interaction with storage systems (HDFS, S3, ADLS)

Key Internal Components:

Driver

·         Entry point of the application

·         Builds the execution plan (DAG)

·         Sends tasks to executors

Executors

·         Run tasks in parallel

·         Store cached/shuffled data

·         Return results to driver

Cluster Manager

·         Allocates CPU & memory resources

·         Examples: YARN, Kubernetes, Standalone

Data Engineering Importance:

·         Enables parallel ETL processing

·         Handles large-scale batch workloads

·         Ensures fault tolerance using lineage



 

Spark SQL (Structured Data Processing Layer)

Spark SQL processes structured and semi-structured data using:

·         SQL queries

·         DataFrame API

·         Dataset API

Key Features:

·         Catalyst Optimizer (query optimization)

·         Tungsten Engine (memory & CPU optimization)

·         Hive integration

Data Engineering Use Cases:

·         Data cleaning and transformation

·         Schema enforcement

·         Working with JSON, Parquet, ORC, Avro

·         Building curated datasets for analytics

Example:
Raw logs → Transform → Store as Parquet in Data Lake


3  Structured Streaming (Real-Time Processing)

Spark’s real-time processing engine built on Spark SQL.

Features:

·         Micro-batch processing

·         Event-time processing

·         Watermarking (handles late data)

·         Exactly-once guarantees


 

Data Engineering Use Cases:

·         Kafka stream processing

·         Real-time dashboards

·         Fraud detection

·         IoT event processing

Example pipeline:
Kafka → Spark Structured Streaming → Delta Lake


4  MLlib (Machine Learning Library)

Distributed ML library inside Spark.

From Data Engineering View:

·         Build feature engineering pipelines

·         Prepare large-scale training datasets

·         Support ML teams with scalable preprocessing

Typically, data engineers focus more on data preparation than model building.


5  GraphX (Graph Processing Engine)

Used for graph-based computations.

Use Cases:

·         Social network analysis

·         Recommendation engines

·         Fraud network detection

Not common in standard ETL but useful in advanced analytics.


6  Deployment & Cluster Management

Spark can run on:

·         Hadoop YARN

·         Kubernetes

·         Standalone clusters

·         Cloud platforms (EMR, Databricks)

Why Important for Data Engineers:

·         Resource tuning (memory, partitions, cores)

·         Cost optimization

·         Scalability planning

·         Production deployment management

Q: What is the difference between a Transformation and an Action?

·         Transformations: Functions that create a new RDD/DataFrame from an existing one (e.g., map, filter, join). They are Lazy—Spark only records them in a DAG.

·         Actions: Operations that trigger computation and return results to the driver or write to storage (e.g., count, collect, write).

Q: What is a DAG (Directed Acyclic Graph)?

It is a logical representation of the sequence of transformations applied to the data. It allows Spark to optimize the execution plan (e.g., pipelining transformations) before actually running the tasks.


 

2. Data Abstractions: RDD vs. DataFrame vs. Dataset

Q: Why choose DataFrames over RDDs?

  • Optimization: DataFrames use the Catalyst Optimizer and Tungsten execution engine, which optimize query plans and memory usage. RDDs have no built-in optimization.
  • Structure: DataFrames provide a schema-based view (columns/rows), making them easier to use for SQL-like operations.

Q: When would you still use an RDD?

  • When you need low-level control over data distribution or physical placement.
  • When dealing with unstructured data that doesn't fit a schema.
  • When performing complex functional programming that isn't easily expressed in SQL/DSL.

3. Performance Tuning & Optimization (Critical for Data Engineers)

Q: How do you handle Data Skew? Data skew occurs when one partition has significantly more data than others, causing one task to run for hours while others finish in seconds.

  • Salting: Add a random "salt" key to the join key to redistribute the data.
  • Broadcast Join: If one table is small, broadcast it to all nodes to avoid shuffles.
  • Iterative Filtering: Filter out the skewed keys and process them separately.

 

Q: What is the difference between repartition() and coalesce()?

  • repartition(): Can increase or decrease partitions. It triggers a full shuffle, ensuring data is distributed uniformly.
  • coalesce(): Only used to decrease partitions. It avoids a full shuffle by merging existing partitions, making it much more efficient for reducing file counts after a filter.
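The difference can be illustrated with plain Python lists standing in for partitions (a toy model, not Spark's internals):

```python
# Toy illustration of coalesce() vs repartition() with lists as partitions.
# coalesce merges whole existing partitions (no full shuffle);
# repartition reassigns every individual row (full shuffle).

def coalesce(partitions, n):
    """Merge existing partitions into n groups; rows stay with their partition."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)
    return merged

def repartition(rows, n):
    """Full shuffle: every row is individually reassigned (round-robin here)."""
    out = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        out[i % n].append(row)
    return out

parts = [[1, 2], [3], [4, 5, 6], [7]]
print(coalesce(parts, 2))        # [[1, 2, 4, 5, 6], [3, 7]]

flat = [r for p in parts for r in p]
print(repartition(flat, 2))      # [[1, 3, 5, 7], [2, 4, 6]]
```

Note how coalesce can leave the result unbalanced (5 rows vs 2), which is the price of avoiding the shuffle; repartition distributes rows evenly.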

Q: Explain the Catalyst Optimizer. It is the engine behind Spark SQL/DataFrames. It works in four phases:

1.     Analysis: Resolving table/column names.

2.     Logical Optimization: Applying rules like Predicate Pushdown (filtering data at the source) and Projection Pruning (selecting only needed columns).

3.     Physical Planning: Generating multiple physical plans and picking the one with the lowest cost.

4.     Code Generation: Generating Java bytecode for execution.


4. Storage & Memory Management

Q: Cache vs. Persist?

  • cache(): Shortcut for persist(StorageLevel.MEMORY_ONLY).
  • persist(): Allows you to specify the storage level (e.g., MEMORY_AND_DISK, DISK_ONLY, or MEMORY_ONLY_SER to save space via serialization).


 

Q5 : What are Broadcast Variables and Accumulators?

  • Broadcast Variables: Read-only variables cached on every machine rather than being sent with every task. Use this for small lookup tables in joins.
  • Accumulators: Write-only variables used for counters or sums across the cluster (e.g., counting corrupted records). Only the Driver can read the final value.

 Structured Streaming

Q: What is Watermarking? Watermarking is a threshold used in streaming to handle late data. It tells Spark how long to wait for late-arriving events before dropping them from the state. For example, a 10-minute watermark means data arriving more than 10 minutes late will be ignored.
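The rule can be simulated in a few lines of Python with toy minute-valued timestamps (a simulation only; in Spark this is configured with withWatermark on an event-time column):

```python
# Toy simulation of a 10-minute watermark: events arriving more than
# 10 minutes behind the maximum event time seen so far are dropped.
# Hypothetical data; real Spark tracks this per streaming query state.

WATERMARK_MINUTES = 10

events = [100, 105, 112, 95, 103]   # event times in minutes, arrival order

max_event_time = 0
accepted, dropped = [], []
for t in events:
    max_event_time = max(max_event_time, t)
    if t >= max_event_time - WATERMARK_MINUTES:
        accepted.append(t)
    else:
        dropped.append(t)   # arrived too far behind the watermark

print(accepted)   # [100, 105, 112, 103]
print(dropped)    # [95] -- more than 10 minutes behind max time 112
```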

Q: Explain Checkpointing. Checkpointing saves the current state (offsets, metadata) to a fault-tolerant storage like S3/HDFS. If the stream fails, it can restart from the exact point it left off, ensuring exactly-once processing.


6. Real-world Scenario / Coding

Q: "My Spark job is slow. How do you debug it?"

1.     Open Spark UI: Look for the longest-running Stage.

2.     Check for Skew: If one task is taking much longer than others, it's a skew issue.

3.     Check for Spill: If you see "Disk Spill" in the UI, it means your executors don't have enough RAM for the operation, and data is being moved to disk (very slow).

4.     Check Shuffle Read/Write: High shuffle indicates a need for Broadcast Joins or better partitioning.

7. My job failed with an OutOfMemory (OOM) error. How do you fix it?

Case A: Driver OOM

·         Cause: You probably ran .collect() on a massive dataset. This sends all the data from the workers to the single Driver machine.

·         The Fix: Don't use .collect(). Write the data to a file (S3/HDFS) instead, or increase spark.driver.memory.

Case B: Executor OOM

·         Cause: Your partitions are too big (Data Skew), or you have too many concurrent tasks running on one executor.

·         The Fix:

1.     Increase Partitions: Use .repartition() to make the data chunks smaller.

2.     Increase Memory: Adjust --executor-memory.

3.     Reduce Cores: If you have 5 cores per executor, 5 tasks are fighting for the same RAM. Reducing cores to 2 or 3 gives each task more "breathing room."