Streams are an addition to the Java API (introduced in Java 8) that lets you manipulate collections of data declaratively: you express a query rather than hand-code an ad hoc implementation of it. For now, you can think of them as fancy iterators over a collection of data. In addition, streams can be processed in parallel transparently, without you having to write any multithreaded code.
/* For example, before Java 8, to filter by weight, sort in ascending order, and get the names: */
static void filterBefore8(List<Animal> list){
    List<Animal> result = new ArrayList<>();
    for (Animal animal : list){
        if (animal.getWeight() > 100) {
            result.add(animal);
        }
    }
    result.sort(new Comparator<Animal>() {
        @Override
        public int compare(Animal o1, Animal o2) {
            return Integer.compare(o1.getWeight(), o2.getWeight());
        }
    });
    List<String> resultName = new ArrayList<>();
    for (Animal animal : result){
        resultName.add(animal.getName());
    }
    System.out.println(resultName.toString());
}
/* With Java 8 streams, the same logic can be written as: */
static void filterAfter8(List<Animal> animals){
    List<String> result = animals.stream()
            .filter(a -> a.getWeight() > 100)
            .sorted(Comparator.comparing(Animal::getWeight))
            .map(Animal::getName)
            .collect(Collectors.toList());
    System.out.println(result.toString());
}
/* To exploit a multicore architecture and execute this code in parallel, you only need to change
stream() to parallelStream() */
static void filterAfter8Parallel(List<Animal> animals){
    List<String> result = animals.parallelStream()
            .filter(a -> a.getWeight() > 100)
            .sorted(Comparator.comparing(Animal::getWeight))
            .map(Animal::getName)
            .collect(Collectors.toList());
    System.out.println(result.toString());
}
A stream is a sequence of elements from a source (a file, I/O channel, network, collection, ...) that supports data processing operations.
- Characteristics of a stream:
- Sequence of elements
- Source
- Data processing operations
- Pipelining
- Internal iteration
One big difference between streams and collections is when their elements are computed. A collection is an in-memory data structure that holds all of its values, whereas a stream is a sequence of data coming from a source. A collection is computed eagerly, while a stream is computed on demand.
As an analogy, think of a collection as a movie stored on a DVD: every frame is stored up front and the viewer can jump to any frame. A stream is more like a video streamed over the internet: only a few frames are loaded (and a few more buffered) at any given time.
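To make the "computed on demand" point concrete, here is a minimal sketch. The class name `LazyStreamDemo`, the method name, and the sample numbers are illustrative assumptions, not part of the original notes. It uses `peek` to record which elements the pipeline actually pulls from the source when `limit(2)` is present.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class: shows that a sequential stream pulls only the elements it needs.
class LazyStreamDemo {

    // Squares the first two even numbers; 'visited' records every element pulled from the source.
    static List<Integer> firstTwoEvenSquares(List<Integer> numbers, List<Integer> visited) {
        return numbers.stream()
                .peek(visited::add)          // runs only for elements that are actually pulled
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .limit(2)                    // short-circuits once two results are produced
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> visited = new ArrayList<>();
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(firstTwoEvenSquares(numbers, visited)); // [4, 16]
        System.out.println(visited);                               // [1, 2, 3, 4]
    }
}
```

Elements 5 through 8 are never touched, which is the behavior the DVD versus streaming analogy describes. Nothing runs at all until the terminal operation (`collect`) is invoked.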
Streams also have some other characteristics:
- Traversable only once: a stream cannot be reused after a terminal operation.
Stream<Animal> stream = animals.stream();
stream.forEach(System.out::println);
stream.forEach(System.out::println);
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.base/java.util.stream.AbstractPipeline.sourceStageSpliterator(AbstractPipeline.java:279)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at stream.Intro.main(Intro.java:60)
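When the same data has to be traversed more than once, one common workaround is to ask the source for a fresh stream each time. The sketch below assumes a hypothetical class `SingleUseStreamDemo` and made-up sample data. It first confirms that a second traversal is rejected, then uses a `Supplier` to obtain a new stream per traversal.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical demo class: a stream is single-use, but its source can supply new streams.
class SingleUseStreamDemo {

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTraversalFails(List<String> source) {
        Stream<String> stream = source.stream();
        stream.forEach(s -> { });            // first traversal consumes the stream
        try {
            stream.forEach(s -> { });        // second traversal is not allowed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Each call to get() yields a fresh stream, so both traversals succeed.
    static long countTwice(Supplier<Stream<String>> streams) {
        return streams.get().count() + streams.get().count();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("cat", "dog", "cow");
        System.out.println(secondTraversalFails(names)); // true
        System.out.println(countTwice(names::stream));   // 6
    }
}
```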
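- External vs. internal iteration: with a collection you write the loop yourself (external iteration); with a stream the library iterates for you (internal iteration).
- Working with streams involves three parts:
- A data source
- Intermediate operations (such as map, filter, distinct, sorted, limit, ...)
- A terminal operation (such as count, collect, forEach, ...)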
long count = menu.stream()              // data source
        .filter(d -> d.getCalories() > 300) // intermediate operation
        .distinct()                     // intermediate operation
        .limit(3)                       // intermediate operation
        .count();                       // terminal operation
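The external versus internal iteration distinction listed among the stream characteristics can be sketched by writing the same query both ways. The class name `IterationStyleDemo` and the word list are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class: the same query written with external and internal iteration.
class IterationStyleDemo {

    // External iteration: the caller drives the loop and manages the result list.
    static List<String> upperExternal(List<String> words) {
        List<String> result = new ArrayList<>();
        for (String w : words) {
            if (w.length() > 3) {
                result.add(w.toUpperCase());
            }
        }
        return result;
    }

    // Internal iteration: the caller states what to do; the stream library drives the traversal.
    static List<String> upperInternal(List<String> words) {
        return words.stream()
                .filter(w -> w.length() > 3)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("lion", "cat", "zebra", "ox");
        System.out.println(upperExternal(words)); // [LION, ZEBRA]
        System.out.println(upperInternal(words)); // [LION, ZEBRA]
    }
}
```

Because the library owns the traversal in the second version, it is free to apply optimizations such as short-circuiting or parallel execution without changes to the calling code.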
List<Animal> list = animals.stream()
        .filter(p -> p.getWeight() >= 800)
        .distinct()
        .skip(1)
        .limit(3)
        .collect(Collectors.toList());
// "TCB", "ACB" --> [T, C, B, A, C, B]
List<String> arrayString = new ArrayList<>(Arrays.asList("TCB", "ACB"));
List<String> result = arrayString.stream()
        .map(p -> p.split(""))       // Stream<String[]>
        .flatMap(Arrays::stream)     // flatten into a single Stream<String>
        .collect(Collectors.toList());
// 1, 2, 3, 5 --> 1, 4, 9, 25
List<Integer> arrayInt = new ArrayList<>(Arrays.asList(1, 2, 3, 5));
List<Integer> resultInt = arrayInt.stream()
        .map(p -> p*p)
        .collect(Collectors.toList());
System.out.println(resultInt.toString());
// 1, 2, 3 & 4, 5, 6 --> [[1, 4], [1, 5], [1, 6], [2, 4], [2, 5], [2, 6], [3, 4], [3, 5], [3, 6]]
List<Integer> firstNum = new ArrayList<>(Arrays.asList(1, 2, 3));
List<Integer> secNum = new ArrayList<>(Arrays.asList(4, 5, 6));
List<ArrayList<Integer>> mergeNum = firstNum.stream()
        .flatMap(i -> secNum.stream()
                .map(j -> new ArrayList<>(Arrays.asList(i,j))))
        .collect(Collectors.toList());
System.out.println(mergeNum.toString());
// 1, 2, 3 & 4, 5, 6, keeping only pairs whose sum is even --> [[1, 5], [2, 4], [2, 6], [3, 5]]
mergeNum = firstNum.stream()
        .flatMap(i -> secNum.stream()
                .filter(j -> ((i + j) % 2) == 0)
                .map(j -> new ArrayList<>(Arrays.asList(i,j))))
        .collect(Collectors.toList());
System.out.println(mergeNum.toString());
if (animals.stream().anyMatch(p -> p.getWeight() == 900)) {
    System.out.println("There is an animal weighing 900");
}
if (animals.stream().allMatch(p -> p.getWeight() > 0)) {
    System.out.println("All animal weights are positive");
}
List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6));
int sum = list.stream().reduce(0, Integer::sum);      // 21
int mul = list.stream().reduce(1, (a, b) -> a*b);     // 720
// Count the elements by mapping each one to 1 and summing; -1 if the stream is empty
int total = list.stream().map(p -> 1).reduce(Integer::sum).orElse(-1); // 6
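- Stateful operations (need internal state across elements): reduce, min, max, sorted, distinct, ...
- Stateless operations (handle each element independently): map, filter, ...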
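One way to observe the difference is to log when elements enter and leave an operation. The following is a minimal sketch under stated assumptions: the class `StatefulOpDemo`, the `trace` helper, and the log format are invented for illustration. In a sequential stream, a stateless `map` passes each element straight through, while `sorted` has to buffer every element before it can emit the first one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class: traces element flow through a stateless vs. a stateful operation.
class StatefulOpDemo {

    // Logs "in:x" before the middle operation and "out:x" after it.
    static List<String> trace(List<Integer> input, boolean sort) {
        List<String> log = new ArrayList<>();
        Stream<Integer> upstream = input.stream().peek(n -> log.add("in:" + n));
        // sorted() is stateful; map(n -> n) is a stateless placeholder.
        Stream<Integer> middle = sort ? upstream.sorted() : upstream.map(n -> n);
        middle.forEach(n -> log.add("out:" + n));
        return log;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(3, 1, 2);
        System.out.println(trace(input, false)); // [in:3, out:3, in:1, out:1, in:2, out:2]
        System.out.println(trace(input, true));  // [in:3, in:1, in:2, out:1, out:2, out:3]
    }
}
```

The buffering behavior is why stateful operations such as `sorted` and `distinct` need memory proportional to the input, whereas `reduce`, `min`, and `max` only keep a small, bounded amount of state.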
public static void main(String[] args) throws InterruptedException {
    // Find all transactions in the year 2021 and sort them by value (low to high).
    list.stream()
            .filter(t -> t.dealTime.getYear() == 2021)
            .sorted(Comparator.comparing(Transaction::getAmount))
            .forEach(System.out::println);
    // What are all the unique cities where the traders work?
    list.stream()
            .map(Transaction::getOriginLocation)
            .distinct()
            .forEach(System.out::println);
    // Find all transactions of traders from NY and sort them by trader name.
    list.stream()
            .filter(t -> t.originLocation.equals("NY"))
            .sorted(Comparator.comparing(Transaction::getTraderName))
            .forEach(System.out::println);
    // Return a string of all traders’ names sorted alphabetically.
    String traderNames = list.stream()
            .map(Transaction::getTraderName)
            .sorted()
            .collect(Collectors.joining(", "));
    System.out.println(traderNames);
    // Are any traders based in Milan?
    if (list.stream().anyMatch(p -> p.getOriginLocation().equals("MI"))) {
        System.out.println("There is a trader in Milan");
    } else {
        System.out.println("No");
    }
    // What’s the highest value of all the transactions?
    Integer result = list.stream()
            .map(Transaction::getAmount)
            .reduce(Integer::max)
            .orElse(-1);
    System.out.println(result);
    // Find the transaction with the smallest value.
    // Note: using list.get(0) as the identity assumes the list is not empty.
    Transaction transaction = list.stream()
            .reduce(list.get(0), (t1, t2) -> {
                if (t1.getAmount() < t2.getAmount()) {
                    return t1;
                }
                return t2;
            });
    System.out.println(transaction);
    // Print the first 10 Pythagorean triples [a, b, c] with 1 <= a <= b < 100.
    Stream<List<Integer>> stream = IntStream.range(1, 100)
            .boxed()
            .flatMap(a -> IntStream.range(a, 100)
                    .filter(b -> Math.sqrt(a*a + b*b) % 1 == 0)
                    .mapToObj(b -> Arrays.asList(a, b, (int) Math.sqrt(a*a + b*b))));
    stream.limit(10).forEach(System.out::println);
}
// build stream from values
Stream<String> stringStream = Stream.of("TCB", "ACB", "HDB");
stringStream.map(String::toLowerCase).forEach(System.out::println);
// build stream from array
int[] arr = new int[]{1, 2, 3};
IntStream integerStream = Arrays.stream(arr);
integerStream.forEach(System.out::println);
// build stream from file
try (Stream<String> lines = Files.lines(Paths.get("data.txt"), Charset.defaultCharset())) {
    lines.forEach(System.out::println);
} catch (IOException exception) {
    exception.printStackTrace();
}
// Summarize
Comparator<Transaction> comparator = Comparator.comparing(Transaction::getAmount);
Optional<Transaction> mostAmount = list.stream().collect(Collectors.maxBy(comparator));
double average = list.stream().collect(Collectors.averagingDouble(Transaction::getAmount));
String allTradeName = list.stream().map(Transaction::getTraderName).collect(Collectors.joining());
Map<String, List<Transaction>> transByCities = list.stream()
        .collect(Collectors.groupingBy(Transaction::getOriginLocation));
Map<Boolean, List<Transaction>> transByAmount = list.stream()
        .collect(Collectors.partitioningBy(p -> p.getAmount() > 5000));
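The collector snippets above depend on a `Transaction` class that these notes do not show. The sketch below supplies a hypothetical stand-in (its fields, constructor, and sample data are assumptions) so the collectors can be run end to end. It also shows how `groupingBy` and `partitioningBy` accept a downstream collector to summarize each group instead of listing its members.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical demo class for the summarizing collectors.
class CollectorsDemo {

    // Hypothetical stand-in for the Transaction class; the real fields may differ.
    static class Transaction {
        private final String traderName;
        private final String originLocation;
        private final int amount;

        Transaction(String traderName, String originLocation, int amount) {
            this.traderName = traderName;
            this.originLocation = originLocation;
            this.amount = amount;
        }

        String getTraderName() { return traderName; }
        String getOriginLocation() { return originLocation; }
        int getAmount() { return amount; }
    }

    // Made-up sample data, for illustration only.
    static List<Transaction> sample() {
        return Arrays.asList(
                new Transaction("Alan", "NY", 3000),
                new Transaction("Brian", "NY", 7000),
                new Transaction("Mario", "MI", 6000));
    }

    // groupingBy with a downstream collector: total amount per city, keys kept sorted by TreeMap.
    static Map<String, Integer> totalByCity(List<Transaction> list) {
        return list.stream().collect(Collectors.groupingBy(
                Transaction::getOriginLocation,
                TreeMap::new,
                Collectors.summingInt(Transaction::getAmount)));
    }

    // partitioningBy with a downstream collector: how many transactions fall on each side.
    static Map<Boolean, Long> countByLarge(List<Transaction> list, int threshold) {
        return list.stream().collect(Collectors.partitioningBy(
                t -> t.getAmount() > threshold,
                Collectors.counting()));
    }

    // joining with a delimiter keeps the concatenated names readable.
    static String traderNames(List<Transaction> list) {
        return list.stream()
                .map(Transaction::getTraderName)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        List<Transaction> list = sample();
        System.out.println(totalByCity(list));        // {MI=6000, NY=10000}
        System.out.println(countByLarge(list, 5000)); // {false=1, true=2}
        System.out.println(traderNames(list));        // Alan, Brian, Mario
    }
}
```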
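- Best practices for parallel streams:
- Avoid iterate as a source: each element depends on the previous one, so the stream is hard to split; prefer range or rangeClosed
- Associativity: operations passed to reduce must be associative, because partial results are combined in no guaranteed order
- Lambda expressions should be stateless
- Avoid modifying the stream's elements or its source
- Lambda expressions should not have side effects
- Use parallelism only when the number of elements is very large
- Watch out for boxing. Automatic boxing and unboxing can dramatically hurt performance; prefer the primitive streams IntStream, LongStream, and DoubleStream
- Some operations, such as findAny, parallelize better than their ordered counterparts, such as findFirst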
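Several of these points can be illustrated with one small sketch. The class `ParallelSumDemo` and its `Accumulator` helper are hypothetical names introduced here. It contrasts a lambda that mutates shared state (a data race in a parallel stream) with a stateless, associative reduction over a primitive range, which splits cleanly and avoids boxing.

```java
import java.util.stream.LongStream;

// Hypothetical demo class: side effects vs. a proper reduction in a parallel stream.
class ParallelSumDemo {

    // Anti-pattern helper: shared mutable state updated from inside a lambda.
    static class Accumulator {
        long total = 0;
        void add(long value) { total += value; }   // not thread-safe
    }

    // Racy when run in parallel: several threads update 'total' concurrently,
    // so the result may be wrong and may change from run to run.
    static long sideEffectSum(long n) {
        Accumulator acc = new Accumulator();
        LongStream.rangeClosed(1, n).parallel().forEach(acc::add);
        return acc.total;
    }

    // Preferred: primitive range source (easy to split, no boxing) combined with
    // a stateless, associative reduction.
    static long safeSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        System.out.println("Expected:    " + n * (n + 1) / 2);
        System.out.println("Safe sum:    " + safeSum(n));
        System.out.println("Side effect: " + sideEffectSum(n)); // often differs from the expected value
    }
}
```

Using `long` here also sidesteps the overflow that an `int` accumulator would hit for large ranges.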
public static void testParallel(){
    System.out.println("Available Processors: " + Runtime.getRuntime().availableProcessors());
    System.out.println("Int parallel :" + IntStream.iterate(1, i -> i + 1)
            .limit(100)
            .parallel()
            .reduce(0, Integer::sum));
}
public static void testPerformance(){
    // Note: the sum of 1..100_000_000 overflows int, so only the timings are meaningful, not the values.
    // Sum 1..n with a plain loop (same range as the stream versions below)
    Function<Integer, Integer> forLoop = integer -> {
        int sum = 0;
        for (int i = 1; i <= integer; i++){
            sum += i;
        }
        return sum;
    };
    // iterate is inherently sequential and hard to split
    Function<Integer, Integer> streamSequential = integer -> IntStream.iterate(1, n -> n + 1)
            .limit(integer)
            .reduce(0, Integer::sum);
    Function<Integer, Integer> streamParallel = integer -> IntStream.iterate(1, n -> n + 1)
            .limit(integer)
            .parallel()
            .reduce(0, Integer::sum);
    // rangeClosed knows its size up front and splits evenly across threads
    Function<Integer, Integer> streamParallelOpt = integer -> IntStream.rangeClosed(1, integer)
            .parallel()
            .reduce(0, Integer::sum);
    performance(forLoop, 100_000_000, "For loop");
    performance(streamSequential, 100_000_000, "Stream Sequential");
    performance(streamParallel, 100_000_000, "Stream Parallel");
    performance(streamParallelOpt, 100_000_000, "Stream Parallel Opt");
}
public static void performance(Function<Integer, Integer> func, int value, String testName){
    System.out.println(testName + " start...");
    long fastest = Long.MAX_VALUE;
    for (int i = 0; i < 5; i++){
        long start = System.nanoTime();
        func.apply(value);
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Time [" + i + "], duration :" + duration + " ms");
        if (fastest > duration) fastest = duration;
    }
    System.out.println("Fastest run :" + fastest + " ms");
    System.out.println("-------------");
}