Apache Kafka Configuration | Kafka Settings

We will go through the Apache Kafka configuration settings you need to know as part of setting up Apache Kafka.

Four Kafka Component Settings

  • Broker Settings
  • Producer Settings
  • Consumer Settings
  • Zookeeper Configuration with Kafka

Apache Kafka Configuration

1. Broker Settings

The overall performance of Kafka depends on the following sub-settings.

1. Connection Settings

zookeeper.session.timeout.ms

The ZooKeeper session timeout; the default value is 30000 ms (milliseconds).

Within this time the broker must send ZooKeeper heartbeat signals; if it fails to do so, the broker is considered dead.
Do not set this value too low, or a healthy broker will falsely be considered dead; do not set it too high either, or ZooKeeper will take too long to detect a truly dead broker.
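In a broker's server.properties this might be sketched as follows (the connection string is an illustrative placeholder; the timeout is the value stated above):

```properties
# ZooKeeper ensemble the broker registers with (illustrative address)
zookeeper.connect=localhost:2181
# Broker is declared dead if no heartbeat arrives within this window
zookeeper.session.timeout.ms=30000
```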

2. Topic Settings

For each topic, Kafka maintains a structured commit log with one or more partitions. In general, the more partitions a Kafka cluster has, the more parallel consumers can be added, resulting in higher throughput.

Important Topic Properties

auto.create.topics.enable

With this property set to true, nonexistent topics are created automatically with the default replication factor.

default.replication.factor

For high availability production systems, you should set this value to at least 3.

num.partitions

For automatically created topics, its default value is 1. You can change it based on your requirements.

delete.topic.enable

This allows users to delete a topic from Kafka using the admin tool; if this property is turned off, deleting a topic through the admin tool has no effect.
In Kafka versions before 1.0.0 this feature is turned off (false) by default; from 1.0.0 onwards it defaults to true.
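Taken together, these topic settings might be sketched in server.properties as follows (values follow the recommendations above; adjust for your cluster):

```properties
# Auto-create topics on first use, with the defaults below
auto.create.topics.enable=true
# At least 3 for high-availability production systems
default.replication.factor=3
# Default partition count for auto-created topics
num.partitions=1
# Allow topic deletion through the admin tool
delete.topic.enable=true
```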

3. Log Settings

log.roll.hours

The maximum time, in hours, before a new log segment is rolled out. The default value is 168 hours (seven days).

This setting controls the period of time after which Kafka will force the log to roll, even if the segment file is not full. This ensures that the retention process is able to delete or compact old data.

log.retention.hours

The number of hours to keep a log file before deleting it. The default value is 168 hours (seven days).

log.dirs

A comma-separated list of directories in which log data is kept. If you have multiple disks, list a directory on each disk to spread the I/O load.

log.retention.bytes

The amount of data to retain in the log for each topic partition. By default, log size is unlimited.

If log.retention.hours and log.retention.bytes are both set, Kafka deletes a segment when either limit is exceeded.

log.segment.bytes

The log for a topic partition is stored as a directory of segment files. This setting controls the maximum size of a segment file before a new segment is rolled over in the log. The default is 1 GB.
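A sketch of these log settings in server.properties (the directory paths are illustrative; the other values are the defaults discussed above):

```properties
# One log directory per disk spreads I/O
log.dirs=/data/disk1/kafka-logs,/data/disk2/kafka-logs
# Keep data for 7 days; size-based retention is unlimited by default
log.retention.hours=168
# Roll a new segment weekly, or at 1 GB, whichever comes first
log.roll.hours=168
log.segment.bytes=1073741824
```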

Log Flush Management

log.flush.interval.messages

Specifies the number of messages to accumulate on a log partition before Kafka forces a flush of data to disk.

log.flush.scheduler.interval.ms 

Specifies the amount of time (in milliseconds) after which Kafka checks to see if a log needs to be flushed to disk.

log.segment.bytes

Specifies the size of the log file. Kafka flushes the log file to disk whenever a log file reaches its maximum size.

log.roll.hours 

Specifies the maximum length of time (in hours) before a new log segment is rolled out; this value is secondary to log.roll.ms, which takes precedence if set. Kafka rolls the log segment whenever it reaches this time limit.

4. Compacting Settings

log.cleaner.dedupe.buffer.size

Specifies total memory used for log de-duplication across all cleaner threads.

By default, 128 MB of buffer is allocated.

log.cleaner.io.buffer.size

Specifies the total memory used for log cleaner I/O buffers across all cleaner threads. By default, 512 KB of buffer is allocated.

5. General Broker Settings

auto.leader.rebalance.enable

Enables automatic leader balancing, default is enabled.

unclean.leader.election.enable

This property allows you to specify a preference of availability or durability. This is an important setting: If availability is more important than avoiding data loss, ensure that this property is set to true. If preventing data loss is more important than availability, set this property to false.

This property is set to true by default, which favors availability.

controlled.shutdown.enable

Enables controlled shutdown of the server. The default is enabled.

min.insync.replicas

When a producer sets acks to “all”, min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception.

You should set min.insync.replicas to 2 when the replication factor is 3.
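Combined with the replication settings above, a durability-leaning broker configuration might be sketched as (illustrative):

```properties
# Three copies of every partition, two of which must acknowledge each write
default.replication.factor=3
min.insync.replicas=2
# Favor durability: never elect an out-of-sync replica as leader
unclean.leader.election.enable=false
```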

message.max.bytes

Specifies the maximum size of message that the server can receive.

broker.rack

The rack awareness feature distributes replicas of a partition across different racks.

2. Producer Settings

The lifecycle of a request from producer to broker involves several configuration settings:

The producer polls a batch of messages from the batch queue, one batch per partition. A batch is ready when one of the following is true:

a. batch.size is reached. Note: Larger batches typically have better compression ratios and higher throughput, but they have higher latency.

b. linger.ms (time-based batching threshold) is reached. Note: There is no simple guideline for setting linger.ms values; you should test settings on specific use cases. For small events (100 bytes or less), this setting does not appear to have much impact.

c. Another batch to the same broker is ready.

d. The producer calls flush() or close().

Some additional settings

max.in.flight.requests.per.connection (pipelining)
compression.type

It accepts the standard compression codecs ('gzip', 'snappy', 'lz4'), as well as 'none' (the default, no compression).

acks

The acks setting specifies the acknowledgments that the producer requires the leader to have received before considering a request complete. This setting defines the durability level for the producer.
acks = 0: high throughput, low latency, no durability guarantee.
acks = 1: medium throughput, medium latency; only the leader acknowledges.
acks = -1 (all): low throughput, high latency; all in-sync replicas acknowledge.

flush(): makes all buffered records immediately available to send (even if linger.ms is greater than 0).
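The producer settings above are usually supplied as key/value configuration. A minimal Java sketch of such a configuration follows; the broker address and values are illustrative starting points, and building an actual KafkaProducer from these properties requires the kafka-clients dependency, omitted here:

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds the producer configuration discussed above.
    // Values are illustrative starting points, not tuned recommendations.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
        props.setProperty("batch.size", "16384");   // bytes per partition batch
        props.setProperty("linger.ms", "5");        // time-based batching threshold
        props.setProperty("compression.type", "lz4");
        props.setProperty("acks", "all");           // full in-sync-replica acknowledgment
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        System.out.println("acks=" + props.getProperty("acks"));
        System.out.println("linger.ms=" + props.getProperty("linger.ms"));
    }
}
```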

3. Consumer Settings

One basic guideline for consumer performance is to keep the number of consumer threads equal to the partition count.
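The guideline is easy to see with a toy round-robin assignment (an illustration only, not Kafka's actual partition assignor): with more consumer threads than partitions, the extra threads sit idle.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Distributes partition ids among consumer threads round-robin,
    // mimicking in spirit how partitions are spread across a group.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) assignment.add(new ArrayList<>());
        for (int p = 0; p < partitions; p++) assignment.get(p % consumers).add(p);
        return assignment;
    }

    public static void main(String[] args) {
        // 6 partitions over 6 consumers: one partition each, full parallelism.
        System.out.println(assign(6, 6));
        // 6 partitions over 8 consumers: the last two consumers get nothing.
        System.out.println(assign(6, 8));
    }
}
```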

4. Zookeeper Configuration with Kafka

Some recommendations :

  • Do not run ZooKeeper on a server where Kafka is running.
  • Make sure you allocate sufficient JVM memory. A good starting point is 4GB.
  • To monitor the ZooKeeper instance, use JMX metrics.
  • When using ZooKeeper with Kafka you should dedicate ZooKeeper to Kafka, and not use ZooKeeper for any other components.

Summary

In this article we looked at the configuration settings of Kafka components needed for high performance, what each setting means, and some recommended values.
I hope you liked the article!


Fail-fast and Fail-safe iterators in java

Iterators are used to traverse collections in Java. Iterators can be fail-fast or fail-safe.
Fail-fast iterators throw a ConcurrentModificationException at runtime if the collection is modified while iterating over it.
Fail-safe iterators do not throw an exception when the collection is modified during iteration, because they work on a copy (or a weakly consistent view) of the collection rather than on the collection itself.

Iterators over the HashMap and ArrayList classes are examples of fail-fast iterators.
Iterators over the ConcurrentHashMap and CopyOnWriteArrayList classes are examples of fail-safe iterators.

Understand with an Example !

1. Example of Fail-fast iterator

import java.util.ArrayList;
import java.util.Iterator;

public class FailFastIteratorTest {

    public static void main(String[] args) {
        ArrayList<String> list = new ArrayList<>();
        list.add("john1");
        list.add("john2");
        list.add("john3");
        list.add("john4");
        list.add("john5");

        System.out.println(list);

        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {
            if (iterator.next().equals("john3")) {
                // structural modification through the list while iterating
                list.remove("john3");
            }
        }
        System.out.println(list);
    }
}

OUTPUT

[john1, john2, john3, john4, john5]
Exception in thread "main" java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
	at java.util.ArrayList$Itr.next(ArrayList.java:859)

Problem

The problem here is that you are trying to remove the element "john3" from the list, via list.remove(), while iterating over it; this structural modification leads to a ConcurrentModificationException.

How to overcome it ?

So instead of list.remove("john3"), use the iterator's remove() method in the condition:

while (iterator.hasNext()) {
    if (iterator.next().equals("john3")) {
        iterator.remove();
    }
}

OUTPUT

[john1, john2, john3, john4, john5]
[john1, john2, john4, john5]

Using the iterator's remove() method, you will not get any such exception.

Important Fact :

  1. If you wish to traverse a collection with an iterator and intend to remove elements while iterating, prefer the iterator's remove() method, as it does not throw ConcurrentModificationException.
  2. Another way to avoid the exception is to use fail-safe collections instead; we will look into them below.
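On Java 8 and later, Collection.removeIf is a third option: it drives the collection's own iterator internally, so the conditional removal is safe (a minimal sketch):

```java
import java.util.ArrayList;
import java.util.List;

public class RemoveIfSketch {

    // removeIf uses the list's own iterator internally,
    // so no ConcurrentModificationException is thrown.
    static List<String> withoutJohn3(List<String> input) {
        List<String> copy = new ArrayList<>(input);
        copy.removeIf(name -> name.equals("john3"));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(withoutJohn3(List.of("john1", "john2", "john3")));
        // [john1, john2]
    }
}
```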

2. Example of Fail-safe iterator

As we read earlier, fail-safe iterators do not throw ConcurrentModificationException because they iterate over a copy of the collection, not the collection itself.

But there are a couple of drawbacks:

  1. Fail-safe iterators do not always reflect the latest data: any modification made to the collection after the iterator is created is not visible through the iterator, since it works on the copy.
  2. There is an overhead of extra memory and time spent creating the copy of the collection to work on.
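Drawback 1 can be seen in a short sketch: a CopyOnWriteArrayList iterator reads a snapshot taken at creation time, so a later add() never shows up in that iteration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotIteratorSketch {

    // Iterates a snapshot taken before a later add(); returns what the iterator saw.
    static String snapshotView() {
        List<String> list = new CopyOnWriteArrayList<>(List.of("a", "b"));
        Iterator<String> it = list.iterator(); // snapshot of [a, b] taken here
        list.add("c");                         // not visible to the iterator
        StringBuilder seen = new StringBuilder();
        while (it.hasNext()) seen.append(it.next());
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(snapshotView()); // "ab", even though the list is now [a, b, c]
    }
}
```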

I. Example of CopyOnWriteArrayList :

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class FailSafeIteratorTest {

    public static void main(String[] args) {

        List<String> list = new CopyOnWriteArrayList<>();
        list.add("john1");
        list.add("john2");
        list.add("john3");
        list.add("john4");
        list.add("john5");

        System.out.println(list);

        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {
            if (iterator.next().equals("john3")) {
                list.remove("john3");
            }
        }
        System.out.println(list);
    }
}

OUTPUT

[john1, john2, john3, john4, john5]
[john1, john2, john4, john5]

Here you will observe that using the fail-safe iterator of CopyOnWriteArrayList does not throw any exception.

II. Example of ConcurrentHashMap :

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

public class FailSafeIteratorTest {

    public static void main(String[] args) {

        ConcurrentHashMap<Integer,String> map = new ConcurrentHashMap<>();
        map.put(1,"one");
        map.put(2,"two");
        map.put(3,"three");
        map.put(4,"four");

        System.out.println(map);

        Iterator<Integer> iterator = map.keySet().iterator();
        while (iterator.hasNext()) {
            int key = iterator.next();
            System.out.println(key + " : " + map.get(key));
            map.put(5,"five");
        }
        System.out.println(map);
    }
}

OUTPUT

{1=one, 2=two, 3=three, 4=four}
1 : one
2 : two
3 : three
4 : four
5 : five
{1=one, 2=two, 3=three, 4=four, 5=five}

Here you will observe that using the fail-safe iterator of ConcurrentHashMap does not throw any exception.

3. Difference between Fail-fast and Fail-safe iterators

Fail-fast Iterator | Fail-safe Iterator
Throws ConcurrentModificationException when the collection is modified during iteration. | Does not throw an exception when the collection is modified during iteration.
Iterates over the original collection. | Iterates over a copy of the collection, not the actual collection.
No extra memory or time overhead, as it operates on the actual collection. | Has an overhead of extra memory and time, as it works on a copy of the collection.
Examples: HashMap, ArrayList. | Examples: ConcurrentHashMap, CopyOnWriteArrayList.

4. How Iterators work internally ?

Internally, the iterator stores a variable called expectedModCount, initialized to the collection's current modCount (the running count of structural modifications).

int expectedModCount = modCount;

If any structural change is made to the collection outside the iterator, modCount changes, and the iterator throws an exception from its checkForComodification() method.

final void checkForComodification() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
}
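The mechanism can be observed directly: modify an ArrayList after creating its iterator, and the next call to next() fails the modCount check (a minimal sketch).

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

public class ComodificationSketch {

    // Returns true if iterating after a structural modification
    // triggers checkForComodification()'s exception.
    static boolean triggersCme() {
        List<Integer> list = new ArrayList<>(List.of(1, 2, 3));
        Iterator<Integer> it = list.iterator(); // expectedModCount captured here
        list.add(4);                            // bumps modCount
        try {
            it.next();                          // checkForComodification() fails
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(triggersCme()); // true
    }
}
```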

Summary

In this article we learnt about fail-fast and fail-safe iterators through examples, their differences, and their internal working.
Hope you liked the article!


REST Webservices naming guidelines

In REST there are no strict naming rules, but there are certain guidelines that ensure our web service API URLs are easy to read and understand.

We are free to implement them in any way we want.

Naming guidelines of REST WebServices

Just remember: nothing is strictly right or wrong in naming REST URIs; these are not rules but best practices.

1. Simple Names

Nothing specific as such, but the naming of a REST API should be simple and self-describing.

Example

/users/12345
/api?type=user&id=12345

2. Use Nouns not verbs

Your URI should refer to a thing (a noun) and not an action (verb).

Example

http://www.programmertoday.com/rest/v1/users
http://www.programmertoday.com/rest/v1/users/{userId}
http://www.programmertoday.com/rest/v1/users/{userId}/orders
http://www.programmertoday.com/rest/v1/users/{userId}/orders/{order-id}

http://www.programmertoday.com/rest/v1/orders/{order-id}

http://www.programmertoday.com/rest/v1/products/{product-id}

We must avoid URI names like the ones below:
http://www.programmertoday.com/rest/v1/getUsers
http://www.programmertoday.com/rest/v1/getProducts

3. Try using Plural Nouns

Though names can be singular nouns as well, the recommended practice is to use plural nouns.

Example

/employees which represents all employees
/employees/{emp-id} represents a particular employee

4. Use lower case

Example

Recommended
/employees/{emp-id} 

Not-Recommended
/Employees/{emp-id}

5. Use Hyphens (-) to separate two words

Example

/employees/{emp-id}

6. Do not use Underscores(_)

Because some search engines and browsers may concatenate two words separated by an underscore.

Example

Not-Recommended
/employees/{emp_id} 

7. Filters like pagination, Limited search, sorting

For requirements like these, we should not create separate resources; rather, we should use query parameters to achieve the same result.

Example

http://domain/rest/v1/orders?type=online

http://domain/rest/v1/products?category=toys&within=100
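Building such URLs is plain query-string composition; a minimal Java sketch follows (the parameter names are illustrative, matching the examples above):

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class QueryParamSketch {

    // Appends filter/pagination parameters to a resource URI
    // instead of minting a separate resource per filter.
    static String withFilters(String baseUri, Map<String, String> params) {
        StringBuilder sb = new StringBuilder(baseUri);
        String sep = "?";
        for (Map.Entry<String, String> e : params.entrySet()) {
            sb.append(sep)
              .append(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8))
              .append('=')
              .append(URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
            sep = "&";
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> filters = new LinkedHashMap<>();
        filters.put("category", "toys");
        filters.put("page", "2");
        System.out.println(withFilters("http://domain/rest/v1/products", filters));
        // http://domain/rest/v1/products?category=toys&page=2
    }
}
```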

8. Do not name APIs with HTTP methods for CRUD operations

Example

HTTP GET : http://domain/rest/v1/products/{product-id}

HTTP DELETE : http://domain/rest/v1/products/{product-id} 

Not-recommended
http://domain/rest/v1/getorders
http://domain/rest/v1/remove/products
http://domain/rest/v1/delete/products
http://domain/rest/v1/updateproduct

9. Versioning

Always version your API. Versioning helps you iterate faster and prevents invalid requests from hitting updated endpoints. It also helps smooth over any major API version transitions as you can continue to offer old API versions for a period of time.

Example

HTTP GET : http://domain/rest/v1/products/{product-id}

HTTP GET : http://domain/rest/v2/products/{product-id}

Summary

In this tutorial, we learnt about REST web service naming guidelines, which one should follow not as rules but as good practice.
I hope you liked it!


REST WebServices Statelessness

Statelessness means not having any state. REST stands for Representational State Transfer, and in practice this means that the server hosting the REST API does not store any client session state on its side.

Each request from the client to the server must contain all of the information necessary to understand the request, and it cannot take advantage of any stored context on the server. Session state is therefore kept entirely on the client, which is responsible for storing and handling all application-state information.

To maintain statelessness, the server should not store clients' authorization details; each request to the server should carry all the required details, since each request is treated as a new request.
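In code, this means every request carries its own credentials. A minimal Java sketch using java.net.http follows (the endpoint and token are placeholders):

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class StatelessRequestSketch {

    // Every request carries its own credentials; the server keeps no session.
    // The endpoint URL and token are illustrative placeholders.
    static HttpRequest buildRequest(String token) {
        return HttpRequest.newBuilder(URI.create("http://domain/rest/v1/orders"))
                .header("Authorization", "Bearer " + token)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = buildRequest("example-token");
        System.out.println(req.headers().firstValue("Authorization").orElse(""));
    }
}
```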

Advantages of REST being Stateless :

  1. It drastically reduces server-side code by removing session-synchronization logic.
  2. Easy to scale up: as there are no sessions to maintain, any server can handle any request.
  3. Easy to cache as well.
  4. The server learns everything it needs from each request, since the client carries all required information with every request; this also makes requests easy to trace.
  5. Web services need not maintain the client's previous interactions.

Summary

In this tutorial we learnt about the statelessness of REST APIs and its advantages.
I hope you liked it !