The distinct() stream operation compares the stream's elements using Object.equals(). That is, for any set of stream elements that are all equals() to each other, the distinct() operation will let just one of them through. However, sometimes you want the notion of "distinct" to be based on some property or other value derived from the stream element, not the element's value itself. You could use map() to map each stream element into some derived value and call distinct() on those, but the result would be a stream of the derived values, not the original stream elements.
It would be nice if there were some construct like distinct(Function<T,U> keyExtractor) that would call keyExtractor to derive the values that are compared for uniqueness, but there isn't. However, it's not too difficult to write your own.
The first insight is that you can think of the distinct() operation as a stateful filter. It's like a filter() operation, which takes a predicate that determines whether to let an element through. It's stateful because whether it lets an element through depends on what elements it has seen previously.
This state needs to be maintained somewhere. Internally, the distinct() operation keeps a Set containing the elements that have been seen previously, but it's buried inside the operation and we can't get to it from application code. But we could write something similar ourselves. The usual way to maintain state in Java is to create a class with fields in which the state is maintained. We'd need a predicate, and that predicate could be a method on that class. This would work, but it's rather cumbersome.
The second insight is that lambdas can capture local variables from their enclosing lexical environment. These local variables cannot be mutated, but if they are references to mutable objects, those objects can be mutated. Thus we can write a higher-order function whose local variables contain references to the state objects, and we can have our higher-order function return a lambda that captures those locals and does its processing based on the captured, mutable state.
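To make this concrete, here is a minimal sketch of the capture technique (the names CaptureDemo and counter() are mine, not from the article): the local variable is effectively final, but the mutable object it refers to carries state across calls to the returned lambda.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CaptureDemo {
    // A higher-order function: its local variable 'count' cannot be
    // reassigned after capture, but the AtomicInteger it refers to is
    // mutable, so the returned lambda can update state on every call.
    static Supplier<Integer> counter() {
        AtomicInteger count = new AtomicInteger();
        return () -> count.incrementAndGet();
    }

    public static void main(String[] args) {
        Supplier<Integer> next = counter();
        System.out.println(next.get()); // prints 1
        System.out.println(next.get()); // prints 2
    }
}
```

Each call to counter() creates a fresh AtomicInteger, so independent suppliers keep independent state, which is exactly the property we want for a per-stream "seen" set.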
This function will want to take a keyExtractor function that's used to derive a value from each stream element. Conceptually we'll want a Set to keep track of the values we've seen already. However, in case our stream is run in parallel, we'll want a thread-safe data structure. A ConcurrentHashMap is a simple way to do this, with each existing key representing membership in the set and the value being a dummy object such as the empty string. (That's how many Set implementations in the JDK work already.) Ideally we'd use an existing object as the dummy value rather than create one each time. The empty string literal appears many times in the core JDK classes, so it's certainly already in the constant pool.
Here’s what the code looks like:
public static <T> Predicate<T> distinctByKey(
        Function<? super T,Object> keyExtractor) {
    Map<Object,String> seen = new ConcurrentHashMap<>();
    return t -> seen.put(keyExtractor.apply(t), "") == null;
}
This is a bit subtle. It's intended to be used within a filter() operation, so we're returning a lambda: a predicate that computes a boolean based on whether it has seen the value before. This value is derived from the stream element by calling the key extractor function. The put() method returns the previous value in the map, or null if there was no value. That's the case we're interested in: if put() returns null, this is the first time the key has been seen, so the predicate returns true. Subsequent calls for the same key return non-null, so the predicate returns false, and the filter operation doesn't pass the element through in those cases. I had used putIfAbsent() at first, since it has first-time-only semantics, but it turns out to be unnecessary, and using put() makes the code a bit shorter.
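The put() semantics can be checked in isolation. This small demo (the class name PutDemo is mine) shows that the first put() for a key returns null and any later put() returns the previous value:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PutDemo {
    public static void main(String[] args) {
        Map<Object, String> seen = new ConcurrentHashMap<>();
        // First put for a key: no previous value, so put() returns null.
        System.out.println(seen.put("Fitzgerald", "") == null); // prints true
        // Second put for the same key: returns the previous value "".
        System.out.println(seen.put("Fitzgerald", "") == null); // prints false
    }
}
```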
Here’s how it’s used. Suppose we have a Book
class that has fields for title and author, and the obvious constructor and getters, and we have a list of books that we want to process:
List<Book> list = Arrays.asList(
    new Book("This Side of Paradise", "F. Scott Fitzgerald"),
    new Book("The Beautiful and Damned", "F. Scott Fitzgerald"),
    new Book("The Great Gatsby", "F. Scott Fitzgerald"),
    new Book("Tender is the Night", "F. Scott Fitzgerald"),
    new Book("The Sound and the Fury", "William Faulkner"),
    new Book("Absalom, Absalom!", "William Faulkner"),
    new Book("Intruder in the Dust", "William Faulkner"),
    new Book("The Sun Also Rises", "Ernest Hemingway"),
    new Book("A Farewell to Arms", "Ernest Hemingway"),
    new Book("The Old Man and the Sea", "Ernest Hemingway"),
    new Book("For Whom the Bell Tolls", "Ernest Hemingway"),
    new Book("A Moveable Feast", "Ernest Hemingway")
);
If we wanted one book from each author, we could do this:
list.stream()
    .filter(distinctByKey(Book::getAuthor))
    .forEach(System.out::println);
The output from running this is:
Book[This Side of Paradise,F. Scott Fitzgerald]
Book[The Sound and the Fury,William Faulkner]
Book[The Sun Also Rises,Ernest Hemingway]
Since this is a sequential stream, the first book by each author is the one that ends up in the output. If we were to run this stream in parallel, it would still work “correctly” in that one book from each author would be output. However, which book is output would differ from run to run.
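A self-contained sketch of the parallel case (using a Java 16+ record named Book and a trimmed-down book list of my own, not the article's full example): the count of surviving elements is always one per author, even though which book survives varies from run to run.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

public class ParallelDemo {
    record Book(String title, String author) {}

    // Same technique as in the article: a ConcurrentHashMap makes the
    // "seen" state safe to share across the parallel stream's threads.
    static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
        Map<Object, String> seen = new ConcurrentHashMap<>();
        return t -> seen.put(keyExtractor.apply(t), "") == null;
    }

    public static void main(String[] args) {
        List<Book> list = List.of(
            new Book("This Side of Paradise", "F. Scott Fitzgerald"),
            new Book("The Great Gatsby", "F. Scott Fitzgerald"),
            new Book("The Sound and the Fury", "William Faulkner"),
            new Book("A Farewell to Arms", "Ernest Hemingway"),
            new Book("A Moveable Feast", "Ernest Hemingway"));

        // Exactly one book per author passes the filter; which one
        // is nondeterministic, but the count is stable.
        long count = list.parallelStream()
                         .filter(distinctByKey(Book::author))
                         .count();
        System.out.println(count); // prints 3
    }
}
```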
It takes a bit of tinkering to use higher-order functions to write stateful stream operations. You can see the evolution of my thinking by looking at my answer to a Stack Overflow question on this topic. I started out writing a class, but after chipping away at it a bit I realized a class was no longer necessary and a higher-order function could be used instead. This is a powerful technique that can be used to write all kinds of stateful stream operations. You just have to be careful that they're thread-safe.
Why

Map<Object,String> seen = new ConcurrentHashMap<>();

and not

Set<Object> seen = Collections.newSetFromMap(new ConcurrentHashMap<>());

?
Oh yes, good point. Collections.newSetFromMap() would probably work, and would obviate the need to provide a dummy value (empty string) in the put() call.
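For comparison, here's a sketch of that Set-based variant (my own class name SetVariant; a key-set view could equally be obtained via ConcurrentHashMap.newKeySet(), available since Java 8). Set.add() returns true only the first time an element is added, so no dummy value is needed:

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

public class SetVariant {
    // Same semantics as the Map-based version: add() returns true
    // exactly once per distinct key, and the backing ConcurrentHashMap
    // keeps it thread-safe for parallel streams.
    static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
        Set<Object> seen = Collections.newSetFromMap(new ConcurrentHashMap<>());
        return t -> seen.add(keyExtractor.apply(t));
    }

    public static void main(String[] args) {
        Predicate<String> firstChar = distinctByKey(s -> s.charAt(0));
        System.out.println(firstChar.test("apple"));   // prints true
        System.out.println(firstChar.test("apricot")); // prints false
        System.out.println(firstChar.test("banana"));  // prints true
    }
}
```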
This kind of distinctByKey filter can also be simulated using the stateful groupingBy collector, e.g.
list.stream()
    .collect(groupingBy(Book::getAuthor)).values().stream()
    .map(bs -> bs.get(0))
    .forEach(System.out::println);
Yes, this can be a useful technique in some circumstances. Note, however, that this builds an intermediate map containing all entries from the stream, including duplicates, potentially consuming more temporary memory. It also builds the intermediate map completely before releasing the first result downstream. These might or might not be issues for the application.
Hello !
Great piece of code, very elegant. However I’m wondering about Garbage collection. Will the “seen” Set be garbage collected at some point, or will the reference stay alive forever ?
It’s not very clear for me how garbage collection works with higher order functions …
Thank you
Good question! The "seen" set is allocated within the distinctByKey() method and a reference to it is then captured by the lambda expression in the return statement. That lambda expression (a Predicate) is a function, to be sure, but it's implemented as an object that has a field containing any captured values — including the reference to the "seen" set. The "seen" set will be kept alive as long as anyone has a reference to the returned Predicate. If the reference to the Predicate is stored somewhere long-lived (e.g., a static variable) it'll hang onto the "seen" set indefinitely. But if all references to the Predicate are dropped, it'll get garbage collected, as will the "seen" set. In the example code, the Predicate is used only by the stream pipeline. When the stream completes, all the stream operations, the Predicate, and the "seen" set will become eligible for garbage collection.
Very clear explanation, thanks a lot !