Monday, June 10, 2024

Azure Load Balancer

 

Set up Azure Load Balancer

step1 : start

step2 : create a resource group (keep all resources in the same region)

step3 : create a virtual machine

step4 : under the Basics tab, set Availability options to "Availability set" and give the availability set a user-defined name, e.g. AS1

step5 : create the availability set with Fault domains = 3 and Update domains = 3

Select the region (US) Central US, then scroll down the page to Inbound port rules and check HTTP (80) and RDP (3389).

step6 : under the Networking tab, create a virtual network with IP CIDR range 10.0.0.0/16 and create a subnet <<Subnet1>> with address range 10.0.1.0/24.
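
Before typing address ranges into the portal, it can help to check that the subnet really sits inside the virtual network range. The snippet below is a small sketch using only Python's standard ipaddress module; the helper name fits_in_vnet is made up for this example, and the figure of 5 reserved addresses per subnet is Azure's documented behaviour, not something the portal steps above show.

```python
import ipaddress

# Address ranges taken from the Networking step above.
vnet = ipaddress.ip_network("10.0.0.0/16")
subnet1 = ipaddress.ip_network("10.0.1.0/24")

# Azure reserves 5 addresses in every subnet (assumption stated in the lead-in).
AZURE_RESERVED_PER_SUBNET = 5


def fits_in_vnet(subnet, network):
    """Return True when every address of `subnet` lies inside `network`."""
    return subnet.subnet_of(network)


print(fits_in_vnet(subnet1, vnet))                          # True
print(subnet1.num_addresses)                                # 256
print(subnet1.num_addresses - AZURE_RESERVED_PER_SUBNET)    # 251 usable
```

A subnet such as 10.1.0.0/24 would fail this check, because it falls outside 10.0.0.0/16.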


Saturday, March 2, 2024

aws-instance-using-terraform

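terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.16"
    }
  }

  required_version = ">= 1.2.0"
}

# Replace the placeholder values below with your own region and credentials.
# Prefer supplying credentials through the AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY environment variables instead of hard-coding them.
provider "aws" {
  region     = "region_name"
  access_key = "access_key"
  secret_key = "secret_key"
}

# AMI IDs are region-specific; use one that exists in the chosen region.
resource "aws_instance" "app_server" {
  ami           = "ami-0440d3b780d96b29d"
  instance_type = "t2.micro"

  tags = {
    Name = "ExampleAppServerInstance"
  }
}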


Sunday, July 2, 2023

Architecture of Zookeeper

 ZooKeeper

ZooKeeper is a centralized, open-source coordination service that provides reliable distributed coordination for applications in a distributed system. It follows a client-server architecture and is designed to be highly available and fault-tolerant. Let's explore the architecture of ZooKeeper:

 

Ensemble of Servers:

ZooKeeper operates in an ensemble mode where multiple ZooKeeper servers form a cluster. Each server in the ensemble contributes to the overall availability and fault-tolerance of the system. An ensemble typically consists of an odd number of servers (e.g., 3, 5, or 7) to achieve majority-based consensus.
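
The preference for an odd number of servers follows from simple majority arithmetic. The sketch below is not ZooKeeper code; the function names quorum_size and tolerated_failures are made up for illustration. It shows that adding a fourth server to a 3-node ensemble does not let the cluster survive any more failures.

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest number of servers that forms a strict majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size: int) -> int:
    """How many servers can fail while a majority is still reachable."""
    return ensemble_size - quorum_size(ensemble_size)


for n in (3, 4, 5, 6, 7):
    print(f"servers={n} quorum={quorum_size(n)} can_lose={tolerated_failures(n)}")
# servers=3 quorum=2 can_lose=1
# servers=4 quorum=3 can_lose=1
# servers=5 quorum=3 can_lose=2
# servers=6 quorum=4 can_lose=2
# servers=7 quorum=4 can_lose=3
```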

 

Leader-Follower Model:

Within the ZooKeeper ensemble, one server is elected as the leader, while the rest of the servers function as followers. The leader is responsible for processing and ordering all write requests, while the followers replicate the leader's state and serve read requests from clients.

 

Data Model:

ZooKeeper provides a hierarchical data model similar to a file system, known as a "ZooKeeper tree" or "namespace." The namespace is organized as a tree-like structure, with each node referred to as a "znode." Znodes can be used to store data and also serve as synchronization primitives.

 

Write Requests and Consensus:

When a client sends a write request to the ZooKeeper ensemble, it is forwarded to the leader. The leader processes the request, updates its own state, and propagates the changes to the followers. To ensure consistency, ZooKeeper uses a consensus algorithm called ZAB (ZooKeeper Atomic Broadcast). The ZAB protocol ensures that all changes are applied in the same order on each server, maintaining strong consistency across the ensemble.

 

Read Requests and Follower Synchronization:

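Read requests from clients can be served by any server in the ensemble, not just the leader. Followers maintain a copy of the leader's state through a process called "follower synchronization." When a follower receives updates from the leader, it applies the changes to its own state in the same order, so that all servers converge on a consistent view of the data. Because a follower can lag slightly behind the leader, a read may return slightly stale data; a client that needs the latest value can issue a sync before reading.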

 

Watches and Event Notifications:

ZooKeeper supports a watch mechanism where clients can register watches on znodes. A watch is triggered when the data of a watched znode changes or when a znode is created or deleted. ZooKeeper sends notifications to the interested clients, allowing them to react to changes in real-time.
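
To make the watch idea concrete, here is a toy in-memory model, not the real ZooKeeper client API. The class ToyZnodeStore and its methods are invented for this sketch. It assumes the classic ZooKeeper behaviour where a watch is a one-time trigger and the notification carries only the event type and path, so the client has to read the znode again and re-register the watch.

```python
class ToyZnodeStore:
    """In-memory stand-in for a znode store with one-time watches."""

    def __init__(self):
        self._data = {}      # path -> bytes
        self._watches = {}   # path -> list of callbacks waiting for a change

    def get(self, path, watch=None):
        """Read a znode and optionally register a one-time watch on it."""
        if watch is not None:
            self._watches.setdefault(path, []).append(watch)
        return self._data.get(path)

    def set(self, path, value):
        """Write a znode and fire (then discard) any registered watches."""
        self._data[path] = value
        for callback in self._watches.pop(path, []):
            callback("changed", path)


events = []
store = ToyZnodeStore()
store.set("/config", b"v1")
store.get("/config", watch=lambda kind, path: events.append((kind, path)))
store.set("/config", b"v2")   # fires the watch
store.set("/config", b"v3")   # no event: the watch was already consumed
print(events)                 # [('changed', '/config')]
```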

 

Client Libraries and Sessions:

ZooKeeper provides client libraries in various programming languages that enable applications to interact with the ensemble. Clients establish sessions with the ensemble and maintain a connection to one of the servers. If the connection is lost, the client library reconnects to another server and, as long as the session has not timed out, resumes its operations without losing its context. If the session expires, its ephemeral znodes are removed and the client must establish a new session.

 

By providing distributed coordination and synchronization primitives, ZooKeeper enables applications to implement various distributed systems, such as distributed locks, leader election, configuration management, and more. Its architecture ensures high availability, fault-tolerance, and strong consistency, making it a reliable foundation for building distributed applications.

Very important from an interview point of view for a Hadoop administrator profile.
Follow 👉https://lnkd.in/dDYk_vQs 👈 for awesome stuff
YouTube channel : https://lnkd.in/d4JjCKZ4


Saturday, July 1, 2023

Speculative

 Speculative execution in Hadoop is a feature that addresses the problem of slow-running tasks, known as stragglers, in a MapReduce job. When enabled, Hadoop identifies tasks that are taking longer to complete than their counterparts and launches additional copies of those tasks on different nodes. The goal is to complete the job faster by having multiple attempts running in parallel and using the first successful result.

 

The speculative task attempts run concurrently with the original tasks. Hadoop monitors their progress and compares their execution times. Once any task completes successfully, all other speculative task attempts for the same task are terminated. The output of the successful task attempt is then used as the final result.
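
The two ideas above, spotting a straggler and keeping the first attempt that finishes, can be sketched in a few lines. This is only an illustration: Hadoop's real heuristic is more involved, and the function names, the 0.2 slack threshold, and the sample numbers are all assumptions made for this example.

```python
from statistics import mean


def find_stragglers(progress, slack=0.2):
    """Return task IDs whose progress is more than `slack` below the average."""
    average = mean(progress.values())
    return sorted(task for task, done in progress.items() if done < average - slack)


def first_finished(finish_times):
    """Return the attempt that completed first; the other attempts are killed."""
    return min(finish_times, key=finish_times.get)


# Fraction of work completed by each map task (made-up numbers).
progress = {"map_0": 0.90, "map_1": 0.85, "map_2": 0.30, "map_3": 0.95}
print(find_stragglers(progress))          # ['map_2']

# Finish times in seconds for the original and the speculative attempt of map_2.
attempts = {"map_2_attempt_0": 340.0, "map_2_attempt_1": 95.0}
print(first_finished(attempts))           # map_2_attempt_1
```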


 

 

The purpose of speculative execution is to improve job completion time and resource utilization. By launching multiple attempts of slow-running tasks, Hadoop mitigates the impact of stragglers, which could be caused by various factors like hardware failures, network issues, or data skew. Speculative execution allows the job to make progress even if some tasks are running significantly slower than expected.

 

Overall, speculative execution is a technique employed by Hadoop to optimize job execution in a distributed computing environment by identifying and addressing slow-running tasks. It helps improve the efficiency and reliability of data processing in Hadoop clusters.





YouTube channel : https://youtu.be/RJdXBT7f2U8




Friday, October 7, 2022

Connecting Python with a phpMyAdmin (MySQL) database for performing SQL operations for data engineering

# install first: pip install pymysql

import pymysql

# replace hostname, username, password and db_name with your own values
connection = pymysql.connect(host="hostname", user="username", password="password", database="db_name")
cursor = connection.cursor()

# for insert
insertsql = "insert into emp(id,name) values(1,'ashraf')"
cursor.execute(insertsql)

# for update
updatesql = "update emp set name='new_name' where id=1"
cursor.execute(updatesql)

# for delete
deletesql = "delete from emp where id=1"
cursor.execute(deletesql)

# save the changes (pymysql does not autocommit by default)
connection.commit()

# for display
selectsql = "select * from emp"
cursor.execute(selectsql)
records = cursor.fetchall()

for x in records:
    # x[0] is id, x[1] is name
    print(x[1])

connection.close()
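
When the values come from user input or another system, it is safer to pass them as parameters than to build them into the SQL string. The sketch below shows the same insert/update/select flow with parameters. It uses Python's built-in sqlite3 with an in-memory database purely so that it runs without a MySQL server; that substitution is an assumption of this example. With pymysql the pattern is the same, but the placeholder is %s instead of ?.

```python
import sqlite3

# In-memory database used only as a stand-in for the MySQL server.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("create table emp (id integer primary key, name text)")

# Values are passed separately from the SQL text, so they are escaped safely.
cur.execute("insert into emp (id, name) values (?, ?)", (1, "ashraf"))
cur.execute("update emp set name = ? where id = ?", ("new_name", 1))
conn.commit()  # make the changes permanent

cur.execute("select id, name from emp where id = ?", (1,))
rows = cur.fetchall()
print(rows)  # [(1, 'new_name')]

conn.close()
```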




Saturday, October 1, 2022

Impala Architecture

Impala

Impala is an MPP (Massively Parallel Processing) SQL query engine for processing huge volumes of data stored in a Hadoop cluster. It is open source software written in C++ and Java. It provides high performance and low latency compared to other SQL engines for Hadoop, giving an RDBMS-like experience when accessing data stored in the Hadoop Distributed File System.

The only condition Impala needs is that the data be stored in a cluster of computers running Apache Hadoop, which, given Hadoop's dominance in data warehousing, isn't uncommon. Cloudera Impala was announced in October 2012 and, after a successful beta run, was made available to the general public in May 2013.

Impala Architecture



The three important daemons of Impala are as follows:

Impala statestore: The Impala statestore is installed on one host of the cluster. It checks on the health of the Impala daemons on all the DataNodes and is physically represented by a daemon process named statestored. In effect, the statestore daemon is a name service that monitors the availability of Impala services across the cluster and handles situations such as nodes becoming unavailable or becoming available again. It keeps track of which ImpalaDs are up and running and relays this information to all the ImpalaDs in the cluster, so they are aware of it when distributing tasks to other ImpalaDs.

If a node fails for any reason, the statestore informs all other nodes of the failure; once an impalad has received that notification, it no longer assigns queries to the affected node.

Impala Catalog server: The Impala Catalog server is installed on one host of the cluster. It distributes metadata to the Impala daemons via the statestore and is physically represented by a daemon process named catalogd. You only need such a process on one host in a cluster. Usually the statestored and catalogd processes run on the same host, because catalog updates are passed through statestored. The catalog service relays the metadata changes from Impala SQL statements to all the Impala daemons in a cluster, which avoids the need to issue REFRESH and INVALIDATE METADATA statements when the metadata changes are made by statements issued through Impala. Most considerations for load balancing and high availability apply to the impalad daemon. The statestored and catalogd daemons do not have special requirements for high availability, because problems with those daemons do not result in data loss. If those daemons become unavailable due to an outage on a particular host, you can stop the Impala service, delete the Impala StateStore and Impala Catalog Server roles, add the roles on a different host, and restart the Impala service.

 

Impala Daemon (impalad):

The core Impala component is the Impala daemon, physically represented by the impalad process. One daemon runs per node, installed on every DataNode. Together the daemons form the core of the Impala execution engine: they read and write data files in HDFS/HBase and aggregate and process the data. All ImpalaDs are equivalent. Each accepts queries from several interfaces, such as the impala-shell command, Hue, JDBC, or ODBC. Depending on the requirement, queries can be submitted to a dedicated impalad or spread across the impalads in your cluster in a load-balanced manner.

Whenever a query is submitted to an impalad on a particular node, that node serves as the "coordinator node" for that query; other queries can be coordinated by impalads running on other nodes at the same time. After receiving the query, the coordinator verifies that it is valid, using the table schema from the Hive metastore. It then collects the locations of the data needed to execute the query from the HDFS NameNode and sends this information to the other impalads. In other words, Impala uses the Hive metastore for the mapping between tables and files, and the HDFS NameNode for the mapping between files and blocks. The coordinator parallelizes the query by distributing the work to the other Impala nodes in the cluster, and each daemon reads its assigned data blocks and processes its part of the query. As soon as all the daemons complete their tasks, they return their results to the coordinator, which collects them and delivers the final result to the user.

The Impala daemons are in constant communication with the statestore to confirm which daemons are healthy and can accept new work. They also receive broadcast messages from the catalogd daemon whenever any Impala daemon in the cluster creates, alters, or drops any type of object, or when an INSERT or LOAD DATA statement is processed through Impala. This background communication minimizes the need for the REFRESH or INVALIDATE METADATA statements that were needed to coordinate metadata across Impala daemons prior to Impala 1.2.

 

 

 

 

 

 

ImpalaD has 3 major components:

Query Planner:  The Query Planner is responsible for parsing the query. Planning occurs in 2 parts.

 1) First, a single-node plan is made, as if all the data in the cluster resided on just one node.

 2) Afterwards, on the basis of the location of various data sources in the cluster, this single-node plan is converted to a distributed plan (thereby leveraging data locality).

Coordinator:   The Query Coordinator is responsible for coordinating the execution of the entire query. To read and process data, it sends requests to various executors. Afterward, it receives the data back from these executors and streams it back to the client via JDBC/ODBC.

Executor:   The Executor is responsible for reading and aggregating data. It reads data locally where possible; data that is not available locally can be streamed from the executors of other Impala daemons.
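
The coordinator/executor split can be pictured as a scatter-gather aggregation. The toy below is not Impala code; the function names, node names, and rows are invented for illustration. Each "executor" counts rows per department over its local data, and the "coordinator" merges the partial counts, roughly what happens for a query like select dept, count(*) from emp group by dept.

```python
from collections import Counter


def executor_partial_count(local_rows, key):
    """Executor step: aggregate only the rows stored on this node."""
    return Counter(row[key] for row in local_rows)


def coordinator_merge(partial_results):
    """Coordinator step: combine the partial aggregates from all executors."""
    total = Counter()
    for partial in partial_results:
        total.update(partial)
    return dict(total)


# Made-up rows, spread across three nodes as HDFS blocks would be.
node_data = {
    "node1": [{"dept": "hr"}, {"dept": "it"}],
    "node2": [{"dept": "it"}, {"dept": "it"}],
    "node3": [{"dept": "hr"}],
}

partials = [executor_partial_count(rows, "dept") for rows in node_data.values()]
result = coordinator_merge(partials)
print(result)  # {'hr': 2, 'it': 3}
```

Only the small partial counts travel over the network, not the raw rows, which is why reading data locally matters.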

Why Impala

Use Impala for the following reasons:

Do BI-style Queries on Hadoop

Impala provides low latency and high concurrency for BI/analytic queries on Hadoop (not delivered by batch frameworks such as Apache Hive). Impala also scales linearly, even in multitenant environments.

Unify Your Infrastructure

Utilize the same file and data formats and metadata, security, and resource management frameworks as your Hadoop deployment—no redundant infrastructure or data conversion/duplication.

Implement Quickly

For Apache Hive users, Impala utilizes the same metadata and ODBC driver. Like Hive, Impala supports SQL, so you don't have to worry about re-inventing the implementation wheel.

 

Count on Enterprise-class Security

Impala is integrated with native Hadoop security and Kerberos for authentication, and via the Sentry module, you can ensure that the right users and applications are authorized for the right data.

Retain Freedom from Lock-in

Impala is open source (Apache License).

Expand the Hadoop User-verse

With Impala, more users, whether using SQL queries or BI applications, can interact with more data through a single repository and metadata store from source through analysis.

Impala combines the SQL support and multi-user performance of a traditional analytic database with the scalability and flexibility of Apache Hadoop, by utilizing standard components such as HDFS, HBase, Metastore, YARN, and Sentry. With Impala, users can communicate with HDFS or HBase using SQL queries in a faster way compared to other SQL engines like Hive. Impala can read almost all the file formats used by Hadoop, such as Parquet, Avro, and RCFile. Impala uses the same metadata, SQL syntax (Hive SQL), ODBC driver, and user interface (Hue Beeswax) as Apache Hive, providing a familiar and unified platform for batch-oriented or real-time queries. Unlike Apache Hive, Impala is not based on MapReduce. It implements a distributed architecture based on daemon processes that run on the same machines as the data and are responsible for all aspects of query execution. This avoids the latency of launching MapReduce jobs, which makes Impala faster than Apache Hive.


How Impala Uses HDFS

 

Impala uses the distributed filesystem HDFS as its primary data storage medium. Impala relies on the redundancy provided by HDFS to guard against hardware or network outages on individual nodes. Impala table data is physically represented as data files in HDFS, using familiar HDFS file formats and compression codecs. When data files are present in the directory for a new table, Impala reads them all, regardless of file name. New data is added in files with names controlled by Impala.

How Impala Uses HBase

HBase is an alternative to HDFS as a storage medium for Impala data. It is a database storage system built on top of HDFS, without built-in SQL support. Many Hadoop users already have it configured and store large (often sparse) data sets in it. By defining tables in Impala and mapping them to equivalent tables in HBase, you can query the contents of the HBase tables through Impala, and even perform join queries including both Impala and HBase tables. You can use Impala to query HBase tables. This is useful for accessing any of your existing HBase tables via SQL and performing analytics over them. HDFS and Kudu tables are preferred over HBase for analytic workloads and offer superior performance. Kudu supports efficient inserts, updates and deletes of small numbers of rows and can replace HBase for most analytics-oriented use cases.

Using Impala to Query Kudu Tables

You can use Impala to query tables stored by Apache Kudu. This capability allows convenient access to a storage system that is tuned for different kinds of workloads than the default with Impala. By default, Impala tables are stored on HDFS using data files with various file formats. HDFS files are ideal for bulk loads (append operations) and queries using full-table scans, but do not support in-place updates or deletes. Kudu is an alternative storage engine used by Impala which can do both in-place updates (for mixed read/write workloads) and fast scans (for data-warehouse/analytic operations). Using Kudu tables with Impala can simplify the ETL pipeline by avoiding extra steps to segregate and reorganize newly arrived data. Certain Impala SQL statements and clauses, such as DELETE, UPDATE, UPSERT, and PRIMARY KEY work only with Kudu tables. Other statements and clauses, such as LOAD DATA, TRUNCATE TABLE, and INSERT OVERWRITE, are not applicable to Kudu tables

Advantages of Impala

Here is a list of some noted advantages of Cloudera Impala.

Ø Using impala, you can process data that is stored in HDFS at lightning-fast speed with traditional SQL knowledge.

Ø Since the data processing is carried where the data resides (on Hadoop cluster), data transformation and data movement is not required for data stored on Hadoop, while working with Impala.

Ø Using Impala, you can access the data that is stored in HDFS, HBase, and Amazon S3 without the knowledge of Java (MapReduce jobs). You can access them with a basic idea of SQL queries.

Ø To write queries in business tools, the data normally has to go through a complicated extract-transform-load (ETL) cycle. With Impala, this procedure is shortened. The time-consuming stages of loading and reorganizing are avoided by techniques such as exploratory data analysis and data discovery, making the process faster.

Ø Impala is pioneering the use of the Parquet file format, a columnar storage layout that is optimized for large-scale queries typical in data warehouse scenarios.

Ø  Fast Speed

Ø  No need to Move Data

Ø  Easy Access

Ø   Short Procedure

Ø  File Format

Ø  Big Data

Ø  Relational model

Ø  Languages

Ø  Familiar

Ø  Distributed

Ø  Faster Access

Ø  High Performance

Disadvantages of Impala

Ø No support for SerDe

Ø No custom binary files

Ø Need to refresh

Ø No support for triggers

Ø No updates (except on Kudu tables)

Ø No transactions

Ø No indexing

  

 Data Types Supported by Impala

 

 It supports numeric, character, and date data types:

Ø Tinyint

Ø Smallint

Ø Int

Ø Bigint

Ø Float

Ø Boolean

Ø String

Ø Varchar

Ø Char

Ø Double

Ø Real

Ø Decimal

 

 

Impala Features

 

There are several features of Impala, let’s discuss all the Impala features one by one

Ø  Open Source

Basically, under the Apache license, Impala is available freely as open source.

Ø   In-memory Processing

When it comes to processing, Cloudera Impala supports in-memory data processing. That means it accesses and analyzes data stored on Hadoop data nodes without any data movement. Keeping the data in memory makes query processing faster and easier.

Ø   Easy Data Access

Using SQL-like queries, we can easily access data with Impala. Moreover, Impala offers common data access interfaces, including:
      i. JDBC driver.
      ii. ODBC driver.

Ø  Faster Access

Compared to other SQL engines, Impala offers faster access to the data in HDFS.

Ø  Storage Systems

We can easily store data in storage systems such as HDFS, Apache HBase, and Amazon s3.
i. HDFS file formats: Delimited text files, Parquet, Avro, SequenceFile, and RCFile.
ii. Compression codecs: Snappy, GZIP, Deflate, BZIP.

Ø   Easy Integration

It is possible to integrate Impala with business intelligence tools such as Tableau, Pentaho, MicroStrategy, and Zoomdata.

Ø  File Formats

Impala supports several file formats, such as LZO, SequenceFile, Avro, RCFile, and Parquet.

Ø  Drivers from Hive

Impala reuses several things from Hive: its metadata, ODBC driver, and SQL syntax.

Ø  Joins and Functions

Impala offers the most common SQL-92 features of Hive Query Language (HiveQL), including SELECT, joins, and aggregate functions.

Ø  Developed

Cloudera Impala is written in C++ and Java.

Ø  Relational model

One of the major points is Impala follows the Relational model.

Ø  Data Model

Impala's data model is schema-based in nature.

Ø  APIs

When it comes to APIs, Impala offers JDBC and ODBC APIs.

Ø  Languages Support

Moreover, it supports all languages supporting JDBC/ODBC.

Ø  High Performance

Compared to other SQL engines, Impala offers high performance and low latency for Hadoop.

Ø  Query UI

It supports Hue Beeswax and the Cloudera Impala Query UI.

Ø  CLI

It supports the impala-shell command-line interface.

Ø  Authentication

It offers Kerberos authentication.

 

Ø Faster Access: 

It is comparatively much faster for SQL Queries and data processing.

Ø Storage System: 

Has the capability to access HDFS and HBASE as its storage system.

Ø Secured: 

It offers Kerberos Authentication making it secure.

Ø Multi API Supports: 

It supports multiple APIs that help it connect to data sources.

Ø Easily Connected: 

It connects easily to many data visualization tools, such as Tableau.

Ø Built-in Functions: 

Impala comes with several built-in functions for computing the results we need. Some of the built-in functions Impala supports are abs, concat, power, adddate, date_add, dayofweek, nvl, zeroifnull.

Hardware Requirements for Optimal Join Performance

During join operations, portions of data from each joined table are loaded into memory. Data sets can be very large, so ensure your hardware has sufficient memory to accommodate the joins you anticipate completing.

While requirements vary according to data set size, the following is generally recommended:

CPU

Impala version 2.2 and higher uses the SSSE3 instruction set, which is included in newer processors.

 

Note: This required level of processor is the same as in Impala version 1.x. The Impala 2.0 and 2.1 releases had a stricter requirement for the SSE4.1 instruction set, which has now been relaxed.

Memory

128 GB or more recommended, ideally 256 GB or more. If the intermediate results during query processing on a particular node exceed the amount of memory available to Impala on that node, the query writes temporary work data to disk, which can lead to long query times. Note that because the work is parallelized, and intermediate results for aggregate queries are typically smaller than the original data, Impala can query and join tables that are much larger than the memory available on an individual node.

JVM Heap Size for Catalog Server

4 GB or more recommended, ideally 8 GB or more, to accommodate the maximum numbers of tables, partitions, and data files you are planning to use with Impala.

Storage

DataNodes with 12 or more disks each. I/O speeds are often the limiting factor for disk performance with Impala. Ensure that you have sufficient disk space to store the data Impala will be querying.

User Account Requirements

Impala creates and uses a user and group named impala. Do not delete this account or group and do not modify the account's or group's permissions and rights. Ensure no existing systems obstruct the functioning of these accounts and groups. For example, if you have scripts that delete user accounts not in a white-list, add these accounts to the list of permitted accounts.

For correct file deletion during DROP TABLE operations, Impala must be able to move files to the HDFS trashcan. You might need to create an HDFS directory /user/impala, writeable by the impala user, so that the trashcan can be created. Otherwise, data files might remain behind after a DROP TABLE statement.

Impala should not run as root. Best Impala performance is achieved using direct reads, but root is not permitted to use direct reads. Therefore, running Impala as root negatively affects performance.

By default, any user can connect to Impala and access all the associated databases and tables. You can enable authorization and authentication based on the Linux OS user who connects to the Impala server, and the associated groups for that user. See Impala Security for details. These security features do not change the underlying file permission requirements; the impala user still needs to be able to access the data files.

SQL Operations that Spill to Disk

Certain memory-intensive operations write temporary data to disk (known as spilling to disk) when Impala is close to exceeding its memory limit on a particular host.

The result is a query that completes successfully, rather than failing with an out-of-memory error. The tradeoff is decreased performance due to the extra disk I/O to write the temporary data and read it back in. The slowdown could potentially be significant. Thus, while this feature improves reliability, you should optimize your queries, system parameters, and hardware configuration to make this spilling a rare occurrence.

 

What kinds of queries might spill to disk:

Several SQL clauses and constructs require memory allocations that could activate the spilling mechanism:

Ø  When a query uses a GROUP BY clause for columns with millions or billions of distinct values, Impala keeps a similar number of temporary results in memory, to accumulate the aggregate results for each value in the group.

Ø  When large tables are joined together, Impala keeps the values of the join columns from one table in memory, to compare them to incoming values from the other table.

Ø  When a large result set is sorted by the ORDER BY clause, each node sorts its portion of the result set in memory.

Ø  The DISTINCT and UNION operators build in-memory data structures to represent all values found so far, to eliminate duplicates as the query progresses.

When the spill-to-disk feature is activated for a join node within a query, Impala does not produce any runtime filters for that join operation on that host. Other join nodes within the query are not affected.

 

How Impala handles scratch disk space for spilling:

By default, intermediate files used during large sort, join, aggregation, or analytic function operations are stored in the directory /tmp/impala-scratch, and these intermediate files are removed when the operation finishes. You can specify a different location by starting the impalad daemon with the ‑‑scratch_dirs="path_to_directory" configuration option.

Memory usage for SQL operators:

In Impala 2.10 and higher, the way SQL operators such as GROUP BY, DISTINCT, and joins, transition between using additional memory or activating the spill-to-disk feature is changed. The memory required to spill to disk is reserved up front, and you can examine it in the EXPLAIN plan when the EXPLAIN_LEVEL query option is set to 2 or higher.

The infrastructure of the spilling feature affects the way the affected SQL operators, such as GROUP BY, DISTINCT, and joins, use memory. On each host that participates in the query, each such operator in a query requires memory to store rows of data and other data structures. Impala reserves a certain amount of memory up front for each operator that supports spill-to-disk that is sufficient to execute the operator. If an operator accumulates more data than can fit in the reserved memory, it can either reserve more memory to continue processing data in memory or start spilling data to temporary scratch files on disk. Thus, operators with spill-to-disk support can adapt to different memory constraints by using however much memory is available to speed up execution, yet tolerate low memory conditions by spilling data to disk.
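
The general idea of an operator that spills can be sketched with a toy GROUP BY count. This is not how Impala implements it: the function name, the max_groups_in_memory budget, and the use of pickle files are assumptions made for illustration. When the number of groups held in memory exceeds the budget, the partial result is written to a temporary file and memory is freed. For brevity the final merge loads every partial result back into memory, which a real engine would avoid by merging partition by partition.

```python
import pickle
import tempfile
from collections import Counter


def group_count_with_spill(values, max_groups_in_memory):
    """Count occurrences per value, spilling partial counts to temp files."""
    in_memory = Counter()
    spill_files = []

    for value in values:
        in_memory[value] += 1
        if len(in_memory) > max_groups_in_memory:
            # Over budget: write the partial aggregate to scratch space.
            scratch = tempfile.TemporaryFile()
            pickle.dump(in_memory, scratch)
            spill_files.append(scratch)
            in_memory = Counter()

    # Merge what is left in memory with everything that was spilled.
    total = Counter(in_memory)
    for scratch in spill_files:
        scratch.seek(0)
        total.update(pickle.load(scratch))
        scratch.close()

    return dict(total), len(spill_files)


data = ["a", "b", "a", "c", "b", "a"]
print(group_count_with_spill(data, max_groups_in_memory=2))
# ({'b': 2, 'a': 3, 'c': 1}, 1)  -> same counts, one spill
print(group_count_with_spill(data, max_groups_in_memory=10))
# ({'a': 3, 'b': 2, 'c': 1}, 0)  -> fits in memory, no spill
```

The answer is the same either way; only the amount of disk I/O changes, which matches the tradeoff described above.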

The amount of data depends on the portion of the data being handled by that host, and thus the operator may end up consuming different amounts of memory on different hosts.

Added in: This feature was added to the ORDER BY clause in Impala 1.4. This feature was extended to cover join queries, aggregation functions, and analytic functions in Impala 2.0. The size of the memory work area required by each operator that spills was reduced from 512 megabytes to 256 megabytes in Impala 2.2. The spilling mechanism was reworked to take advantage of the Impala buffer pool feature and be more predictable and stable in Impala 2.10.

Testing performance implications of spilling to disk:

To artificially provoke spilling, to test this feature and understand the performance implications, use a test environment with a memory limit of at least 2 GB. Issue the SET command with no arguments to check the current setting for the MEM_LIMIT query option. Set the query option DISABLE_UNSAFE_SPILLS=true. This option limits the spill-to-disk feature to prevent runaway disk usage from queries that are known in advance to be suboptimal. Within impala-shell, run a query that you expect to be memory-intensive, based on the criteria explained earlier. A self-join of a large table is a good candidate:

select count(*) from big_table a join big_table b using (column_with_many_values);

Issue the PROFILE command to get a detailed breakdown of the memory usage on each node during the query.

Set the MEM_LIMIT query option to a value that is smaller than the peak memory usage reported in the profile output. Now try the memory-intensive query again.

Check if the query fails with a message like the following:

WARNINGS: Spilling has been disabled for plans that do not have stats and are not hinted to prevent potentially bad plans from using too many cluster resources. Compute stats on these tables, hint the plan or disable this behavior via query options to enable spilling.

If so, the query could have consumed substantial temporary disk space, slowing down so much that it would not complete in any reasonable time. Rather than rely on the spill-to-disk feature in this case, issue the COMPUTE STATS statement for the table or tables in your sample query. Then run the query again, check the peak memory usage again in the PROFILE output, and adjust the memory limit again if necessary to be lower than the peak memory usage.

At this point, you have a query that is memory-intensive, but Impala can optimize it efficiently so that the memory usage is not exorbitant. You have set an artificial constraint through the MEM_LIMIT option so that the query would normally fail with an out-of-memory error. But the automatic spill-to-disk feature means that the query should actually succeed, at the expense of some extra disk I/O to read and write temporary work data.

Try the query again, and confirm that it succeeds. Examine the PROFILE output again. This time, look for lines of this form:

- SpilledPartitions: N

If you see any such lines with N greater than 0, that indicates the query would have failed in Impala releases prior to 2.0, but now it succeeded because of the spill-to-disk feature. Examine the total time taken by the AGGREGATION_NODE or other query fragments containing non-zero SpilledPartitions values. Compare the times to similar fragments that did not spill, for example in the PROFILE output when the same query is run with a higher memory limit. This gives you an idea of the performance penalty of the spill operation for a particular query with a particular memory limit. If you make the memory limit just a little lower than the peak memory usage, the query only needs to write a small amount of temporary data to disk. The lower you set the memory limit, the more temporary data is written and the slower the query becomes.
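If you save the PROFILE output to a file, a small script can do the scan for you. The Python sketch below looks for lines of the "- SpilledPartitions: N" form described above. The function names and the sample text are hypothetical, and real profile output contains many other counters around these lines.

```python
import re

# Matches lines such as "- SpilledPartitions: 4" and captures the count.
SPILL_RE = re.compile(r"-\s*SpilledPartitions:\s*(\d+)")


def spilled_partition_counts(profile_text: str) -> list[int]:
    """Return every SpilledPartitions value found in the profile text."""
    return [int(n) for n in SPILL_RE.findall(profile_text)]


def query_spilled(profile_text: str) -> bool:
    """True if at least one operator reports a non-zero spill count."""
    return any(n > 0 for n in spilled_partition_counts(profile_text))


if __name__ == "__main__":
    # Hypothetical excerpt, shortened for illustration.
    sample = """
    AGGREGATION_NODE (id=3):
       - SpilledPartitions: 4
    HASH_JOIN_NODE (id=2):
       - SpilledPartitions: 0
    """
    print(spilled_partition_counts(sample))
    print(query_spilled(sample))
```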

Now repeat this procedure for actual queries used in your environment. Use the DISABLE_UNSAFE_SPILLS setting to identify cases where queries used more memory than necessary due to lack of statistics on the relevant tables and columns, and issue COMPUTE STATS where necessary.

When to use DISABLE_UNSAFE_SPILLS:

You might wonder why you should not leave DISABLE_UNSAFE_SPILLS turned on all the time. Whether and how frequently to use this option depends on your system environment and workload.

 

DISABLE_UNSAFE_SPILLS is suitable for an environment with ad hoc queries whose performance characteristics and memory usage are not known in advance. It prevents "worst-case scenario" queries that use large amounts of memory unnecessarily. Thus, you might turn this option on within a session while developing new SQL code, even though it is turned off for existing applications.

Organizations where table and column statistics are generally up-to-date might leave this option turned on all the time, again to avoid worst-case scenarios for untested queries or if a problem in the ETL pipeline results in a table with no statistics. Turning on DISABLE_UNSAFE_SPILLS lets you "fail fast" in this case and immediately gather statistics or tune the problematic queries.

Some organizations might leave this option turned off. For example, you might have tables large enough that the COMPUTE STATS statement takes substantial time to run, making it impractical to re-run after loading new data. If you have examined the EXPLAIN plans of your queries and know that they are operating efficiently, you might leave DISABLE_UNSAFE_SPILLS turned off. In that case, you know that any queries that spill will not go overboard with their memory consumption.

 

Limits on Query Size and Complexity

There are hardcoded limits on the maximum size and complexity of queries. Currently, the maximum number of expressions in a query is 2000. You might exceed the limits with large or deeply nested queries produced by business intelligence tools or other query generators.

If you have the ability to customize such queries or the query generation logic that produces them, replace sequences of repetitive expressions with single operators such as IN or BETWEEN that can represent multiple values or ranges. For example, instead of a large number of OR clauses:

WHERE val = 1 OR val = 2 OR val = 6 OR val = 100 ...

use a single IN clause:

WHERE val IN (1,2,6,100,...)
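If you control the query generation logic, the rewrite can be automated. The Python sketch below collapses a list of numeric values into one IN clause. The function name or_chain_to_in is hypothetical, and the sketch assumes numeric literals only; string values would need proper quoting and escaping.

```python
def or_chain_to_in(column: str, values: list[int]) -> str:
    """Build a single IN clause instead of a chain of OR comparisons."""
    # Drop duplicates while keeping the first-seen order.
    unique = list(dict.fromkeys(values))
    if not unique:
        raise ValueError("at least one value is required")
    joined = ",".join(str(v) for v in unique)
    return f"WHERE {column} IN ({joined})"


if __name__ == "__main__":
    print(or_chain_to_in("val", [1, 2, 6, 100]))
```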

 

Scalability Considerations for File Handle Caching

 

One scalability aspect that affects heavily loaded clusters is the load on the metadata layer from looking up the details as each file is opened. On HDFS, that can lead to increased load on the NameNode, and on S3, this can lead to an excessive number of S3 metadata requests. For example, a query that does a full table scan on a partitioned table may need to read thousands of partitions, each partition containing multiple data files. Accessing each column of a Parquet file also involves a separate "open" call, further increasing the load on the NameNode. High NameNode overhead can add startup time (that is, increase latency) to Impala queries, and reduce overall throughput for non-Impala workloads that also require accessing HDFS files.

You can reduce the number of calls made to your file system's metadata layer by enabling the file handle caching feature. Data files that are accessed by different queries, or even multiple times within the same query, can be accessed without a new "open" call and without fetching the file details multiple times.

 

Impala supports file handle caching for the following file systems:

HDFS in Impala 2.10 and higher

In Impala 3.2 and higher, file handle caching also applies to remote HDFS file handles. This is controlled by the cache_remote_file_handles flag for an impalad. Keep the default value of true, because this caching helps prevent the NameNode from being overloaded when your cluster performs many remote HDFS reads.

S3 in Impala 3.3 and higher

The cache_s3_file_handles impalad flag controls the S3 file handle caching. The feature is enabled by default with the flag set to true.

The feature is enabled by default with 20,000 file handles to be cached. To change the value, set the configuration option max_cached_file_handles to a non-zero value for each impalad. From the initial default value of 20,000, adjust upward if NameNode request load is still significant, or downward if it is more important to reduce the extra memory usage on each host. Each cache entry consumes 6 KB, meaning that caching 20,000 file handles requires up to 120 MB on each Impala executor. The exact memory usage varies depending on how many file handles have actually been cached; memory is freed as file handles are evicted from the cache.
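The sizing arithmetic is easy to redo for other values of max_cached_file_handles. The Python sketch below uses the 6 KB per entry figure from the text and decimal units (1 MB = 1000 KB), which is what makes the default come out at 120 MB. The function name is hypothetical, and the result is an upper bound for a full cache.

```python
def cache_memory_mb(max_cached_file_handles: int, kb_per_entry: int = 6) -> float:
    """Upper bound on file handle cache memory per executor, in MB."""
    if max_cached_file_handles < 0:
        raise ValueError("max_cached_file_handles must not be negative")
    # Decimal units: 1 MB = 1000 KB.
    return max_cached_file_handles * kb_per_entry / 1000


if __name__ == "__main__":
    for handles in (10_000, 20_000, 50_000):
        print(handles, "handles ->", cache_memory_mb(handles), "MB")
```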

If a manual operation moves a file to the trashcan while the file handle is cached, Impala still accesses the contents of that file. This is a change from prior behavior. Previously, accessing a file that was in the trashcan would cause an error. This behavior only applies to non-Impala methods of removing files, not the Impala mechanisms such as TRUNCATE TABLE or DROP TABLE.

If files are removed, replaced, or appended by operations outside of Impala, the way to bring the file information up to date is to run the REFRESH statement on the table.

File handle cache entries are evicted as the cache fills up, or based on a timeout period when they have not been accessed for some time.

To evaluate the effectiveness of file handle caching for a particular workload, issue the PROFILE statement in impala-shell or examine query profiles in the Impala Web UI. Look for the ratio of CachedFileHandlesHitCount (ideally, should be high) to CachedFileHandlesMissCount (ideally, should be low). Before starting any evaluation, run several representative queries to "warm up" the cache because the first time each data file is accessed is always recorded as a cache miss.
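To turn the two counters into a single number, you can compute the fraction of file opens served from the cache. This Python sketch assumes you have already copied the CachedFileHandlesHitCount and CachedFileHandlesMissCount values out of a query profile; the function name is hypothetical.

```python
from typing import Optional


def file_handle_hit_ratio(hit_count: int, miss_count: int) -> Optional[float]:
    """Fraction of file opens served from the cache, or None if no opens."""
    total = hit_count + miss_count
    if total == 0:
        return None
    return hit_count / total


if __name__ == "__main__":
    # Hypothetical counter values taken after the cache is warmed up.
    ratio = file_handle_hit_ratio(900, 100)
    print(f"hit ratio: {ratio:.0%}")
```

A ratio close to 1 after the warm-up queries suggests the cache is large enough for the workload. A low ratio may point to a cache that is too small or to entries that are evicted before they are reused.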

To see metrics about file handle caching for each impalad instance, examine the following fields on the /metrics page in the Impala Web UI:

impala-server.io.mgr.cached-file-handles-miss-count

impala-server.io.mgr.num-cached-file-handles

 

Understanding Impala Query Performance - EXPLAIN Plans and Query Profiles

 

To understand the high-level performance considerations for Impala queries, read the output of the EXPLAIN statement for the query. You can get the EXPLAIN plan without actually running the query itself.

For an overview of the physical performance characteristics for a query, issue the SUMMARY statement in impala-shell immediately after executing a query. This condensed information shows which phases of execution took the most time, and how the estimates for memory usage and number of rows at each phase compare to the actual values. To understand the detailed performance characteristics for a query, issue the PROFILE statement in impala-shell immediately after executing a query. This low-level information includes physical details about memory, CPU, I/O, and network usage, and thus is only available after the query is actually run.

 

 

Using the EXPLAIN Plan for Performance Tuning

The EXPLAIN statement gives you an outline of the logical steps that a query will perform, such as how the work will be distributed among the nodes and how intermediate results will be combined to produce the final result set. You can see these details before actually running the query. You can use this information to check that the query will not operate in some very unexpected or inefficient way.

[impalad-host:21000] > explain select count(*) from customer_address;
+----------------------------------------------------------+
| Explain String                                           |
+----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=42.00MB VCores=1 |
|                                                          |
| 03:AGGREGATE [MERGE FINALIZE]                            |
| |  output: sum(count(*))                                 |
| |                                                        |
| 02:EXCHANGE [PARTITION=UNPARTITIONED]                    |
| |                                                        |
| 01:AGGREGATE                                             |
| |  output: count(*)                                      |
| |                                                        |
| 00:SCAN HDFS [default.customer_address]                  |
|    partitions=1/1 size=5.25MB                            |
+----------------------------------------------------------+

 

Read the EXPLAIN plan from bottom to top:

The last part of the plan shows the low-level details such as the expected amount of data that will be read. From these details you can judge the effectiveness of your partitioning strategy and estimate how long it will take to scan a table, based on total data size and the size of the cluster.

As you work your way up, next you see the operations that will be parallelized and performed on each Impala node.

At the higher levels, you see how data flows when intermediate result sets are combined and transmitted from one node to another.
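The bottom-to-top reading order can be illustrated with a short Python sketch that extracts the numbered plan nodes from EXPLAIN output and lists them starting with the scan. It is a simplification that assumes a linear plan like the one shown above; plans with joins have indented branches that this sketch does not handle. The function name is hypothetical.

```python
import re

# A plan node line looks like "03:AGGREGATE [MERGE FINALIZE]".
NODE_RE = re.compile(r"(\d+):([A-Z]+(?: [A-Z]+)*)")


def plan_nodes_bottom_up(explain_text: str) -> list[str]:
    """Return the plan nodes in reading order, lowest level first."""
    nodes = []
    for line in explain_text.splitlines():
        # Remove the table borders and tree-drawing bars around each line.
        match = NODE_RE.match(line.strip(" |"))
        if match:
            nodes.append(f"{match.group(1)}:{match.group(2)}")
    return list(reversed(nodes))


PLAN = """\
| Estimated Per-Host Requirements: Memory=42.00MB VCores=1 |
| 03:AGGREGATE [MERGE FINALIZE]                            |
| |  output: sum(count(*))                                 |
| 02:EXCHANGE [PARTITION=UNPARTITIONED]                    |
| 01:AGGREGATE                                             |
| |  output: count(*)                                      |
| 00:SCAN HDFS [default.customer_address]                  |
|    partitions=1/1 size=5.25MB                            |
"""

if __name__ == "__main__":
    for node in plan_nodes_bottom_up(PLAN):
        print(node)
```

For the plan above, the list starts with the HDFS scan, continues with the per-node aggregation and the exchange, and ends with the final merge aggregation.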

See EXPLAIN_LEVEL Query Option for details about this query option, which lets you customize how much detail to show in the EXPLAIN plan depending on whether you are doing high-level or low-level tuning, dealing with logical or physical aspects of the query.

The EXPLAIN plan is also printed at the beginning of the query profile report described in Using the Query Profile for Performance Tuning, for convenience in examining both the logical and physical aspects of the query side-by-side.

 

The amount of detail displayed in the EXPLAIN output is controlled by the EXPLAIN_LEVEL query option. You typically increase this setting from standard to extended (or from 1 to 2) when double-checking the presence of table and column statistics during performance tuning, or when estimating query resource usage in conjunction with the resource management features in CDH 5.

Using the SUMMARY Report for Performance Tuning

The SUMMARY command within the impala-shell interpreter gives you an easy-to-digest overview of the timings for the different phases of execution for a query. As with the EXPLAIN plan, it makes potential performance bottlenecks easy to see. As with the PROFILE output, it is available only after the query is run and so displays actual timing numbers.

 

The SUMMARY report is also printed at the beginning of the query profile report described in Using the Query Profile for Performance Tuning, for convenience in examining high-level and low-level aspects of the query side-by-side.

 

For example, here is a query involving an aggregate function, on a single-node VM. The different stages of the query and their timings are shown (rolled up for all nodes), along with estimated and actual values used in planning the query. In this case, the AVG() function is computed for a subset of data on each node (stage 01) and then the aggregated results from all nodes are combined at the end (stage 03). You can see which stages took the most time, and whether any estimates were substantially different from the actual data distribution. (When examining the time values, be sure to consider the suffixes such as us for microseconds and ms for milliseconds, rather than just looking for the largest numbers.)