PS做任务的网站,做网站开发 用什么,响应式科技公司网站模板,二次元wordpress博客主题前言 上节我们学了 lambda 表达式#xff0c;很快我就在 Flink 的学习中用到了#xff0c;我学的是 Java 版本的 Flink#xff0c;一开始会以为代码会很复杂#xff0c;但事实上 Flink 中很多地方都用到了 函数接口#xff0c;这也让我们在编写 Flink 程序的时候可以使用 …前言 上节我们学了 lambda 表达式很快我就在 Flink 的学习中用到了我学的是 Java 版本的 Flink一开始会以为代码会很复杂但事实上 Flink 中很多地方都用到了 函数接口这也让我们在编写 Flink 程序的时候可以使用 lambda 表达式非常地简洁地实现匿名函数。 今天再来学习一个新的特性Stream 流光是看名字就觉得和大数据能扯上关系我们的 Spark、Flink 当中不就都是这种流的概念嘛。
1、什么是 Strem 流 Stream 是 JDK1.8 中处理集合的关键抽象概念 Lambda 表达式 和 Stream 是JDK1.8 新增的函数式编程中最有亮点的特性了它可以指定你希望对集合进行操作可以执行非常复杂的查询过滤和映射等操作。使用 Stream API 对集合数据进行操作就类似于使用 SQL 来执行对 Java 集合运算和表达的高阶抽象。 Stream API 可以极大地提高 Java 程序员的生产力让程序员写出更加高效、干净、简洁的代码。那对我在大数据开发中更是如此。 这种风格将要处理的元素集合看做一种流流在管道中传输并且可以在管道的节点上进行处理比如过滤、排序、聚合等。
2、Stream 创建方式
1、创建串行 Stream
StreamUser userStream list.stream();
2、创建并行 Stream
StreamUser userStream list.parallelStream();
3、关闭
在Java中Stream只能被操作一次一旦你对其进行了一次操作比如forEach, collect等它就会被关闭再次操作就会报错stream has already been operated upon or closed。
3、Stream 将 List 转换为 Set
1、创建 List 集合
Stream 是通过集合创建出来的所以我们先创建一个集合而集合内我们需要存放实体所以先创建一个实体类 User
public class User {public String name;public int age;public User(){}public User(String name, int age) {this.name name;this.age age;}Overridepublic String toString() {return User{ name name \ , age age };}Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;User user (User) o;return age user.age Objects.equals(name, user.name);}Overridepublic int hashCode() {return Objects.hash(name, age);}public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}
}创建集合
ListUser list new ArrayList();list.add(new User(燕双鹰,28));list.add(new User(李大喜,20));list.add(new User(李元芳, 30));list.add(new User(李元芳, 30));
重写 equals
注意这里我们对实体类的 equals 和 hashcode 方法进行了重写这在之前我是不会去重写的。重写和不重写的区别就是 重写后当两个实体对象的属性相同时equals 方法返回 true如果没有重写则 equals 返回 false。 和 equals 用于比较基本数据类型的值是否相等或者对象的引用地址是否相同。
int a 10;
int b 10;
System.out.println(ab); //trueString str1 hello;
String str2 hello;
System.out.println(str1str2); //false
equals 用于比较两个对象的内容是否相等。在Object类中默认的“equals()”实现使用“”操作符比较对象的引用。但是许多类如String、Integer等重写了“equals()”方法以便根据类的特定属性比较对象的内容。 set 去重底层原理
set 去重底层依赖于 map 集合实现放重复的 keymap 集合底层基于 equals 它先比较 key 的hashcode 是否相同相同情况下再调用 equals 方法判断是否真的相等。
所以一个实体类是否重写 equals 方法区别很大。
User u1 new User(s,1);User u2 new User(s,1);System.out.println(u1.equals(u2));
上面的代码如果我们以 User 对象作为 key如果我们的 User 没有重写 equals 方法那么返回的就是 false因为默认使用 引用地址不同如果重写了 equals 方法那么返回的就是 true因为使用重写后的 equals 两个对象属性相同返回 true。
注意对象的比较不会去比较 hashcode。
HashMapUser, String map new HashMap();map.put(u1,a);map.put(u2,b);System.out.println(map.get(u1).equals(map.get(u2)));
上面的代码如果我们没有重写 hashcode 的情况下那么返回的就是 true因为 map 的底层是通过 hashcode 来比较两个 key 是否相同如果重写了 hashcode 那么返回的就是 true。
2、List 转为 Set
public static void main(String[] args) {ListUser list new ArrayList();list.add(new User(燕双鹰,28));list.add(new User(李大喜,20));// 下面是两个属性相同的两个对象(我们已经重写了 equals 和 hashcode 方法)list.add(new User(李元芳, 30));list.add(new User(李元芳, 30));// todo 创建 Stream 的两种方式// 1. 串行流 stream() 单线程StreamUser stream list.stream();SetUser set stream.collect(Collectors.toSet());set.forEach(user-{System.out.println(user.toString());});
}
运行结果
User{name李元芳, age30}
User{name燕双鹰, age28}
User{name李大喜, age20}
可以看到重写 equals 和 hashcode 方法后虽然相同属性的两个对象的内存地址不同但也被去除重复了。
4、Stream 将 List 转为 Map
1、创建 List
注意List 转为 Map 的时候由于 Map 集合不允许存在重复的 key所以我们必须保证 list 集合中作为 key 字段的属性值唯一。
ListUser list new ArrayList();list.add(new User(燕双鹰,28));list.add(new User(李大喜,20));list.add(new User(李元芳, 30)); 2、List 转为 Map StreamUser stream list.stream();// list 集合是没有 key 的,所以不能直接转为 map 集合,需要指定 key(指定对象的某个字段作为key)MapString, User collect stream.collect(Collectors.toMap(new FunctionUser, String() { // 第一个参数 list中的类型,第二个参数是key类型: StringOverridepublic String apply(User user) {return user.getName();}}, new FunctionUser, User() { // 第一个参数 list中的类型,第二个参数是value类型: UserOverridepublic User apply(User user) {return user;}}));collect.forEach(new BiConsumerString, User() {Overridepublic void accept(String key, User user) {System.out.println(key,user.toString());}});
使用 lambda 表达式简化一下代码 // 用lambda表达式MapString, User collect stream.collect(Collectors.toMap(User::getName, user - user));collect.forEach((key,user)- System.out.println(key,user.toString()));运行结果
李元芳,User{name李元芳, age30}
李大喜,User{name李大喜, age20}
燕双鹰,User{name燕双鹰, age28}
5、Strem 通过 reduce 方法求和
1、简单求和
这里我们通过 Stream.of() 方法来进行数据的构造这让我想到了最近 Flink。
StreamInteger stream Stream.of(10, 50, 30, 10);OptionalInteger res stream.reduce(new BinaryOperatorInteger() {Overridepublic Integer apply(Integer integer, Integer integer2) {return integer integer2;}});
使用 lamda 表达式 OptionalInteger res stream.reduce(Integer::sum);
关于结果的打印我们后面讲到 Optional 类的时候再详细说一般直接
System.out.println(res.get());
2、对象属性和
我们构造一个 List 集合然后转为 Stream 调用 reduce 方法进行求和。
注意reduce 方法的返回结果类型必须和 Stream 的类型一致就像我们 Hadoop 中的 WordCount。
ListUser list new ArrayList();list.add(new User(燕双鹰,28));list.add(new User(李大喜,20));list.add(new User(李元芳, 30));StreamUser stream list.stream();OptionalUser sum stream.reduce(new BinaryOperatorUser() {Overridepublic User apply(User user1, User user2) {return new User(sum,user1.getAge() user2.getAge());}});System.out.println(sum.get());
lambda 表达式简化 StreamUser stream list.stream();OptionalUser sum stream.reduce(((user1, user2) - new User(sum, user1.getAge() user2.getAge())));System.out.println(sum.get());//78
6、Strem 查找集合最大值和最小值
1、创建集合
ListUser list new ArrayList();
list.add(new User(燕双鹰,28));
list.add(new User(李大喜,20));
list.add(new User(李元芳, 30));
2、查找最大 age 属性对象
OptionalUser max stream.max(new ComparatorUser() {Overridepublic int compare(User o1, User o2) {return o1.getAge() - o2.getAge();}});System.out.println(max.get());
lambda表达式简化
OptionalUser max stream.max((user1, user2) - user1.getAge() - user2.getAge());System.out.println(max.get()); //30
3、查找最小 age 属性对象
OptionalUser min stream.min((user1, user2) - user1.getAge() - user2.getAge());System.out.println(min.get()); //20
7、Stream 中 Match 用法
anyMatch 表示任意一个元素满足条件返回 true。
allMatch 表示所有元素满足条件才会返回 true。
noMatch 表示所有条件都不满足这个条件才会返回 true。
1、创建集合
ListUser list new ArrayList();list.add(new User(燕双鹰,28));list.add(new User(李大喜,20));list.add(new User(李元芳, 30));StreamUser stream list.stream();
2、anyMatch
判断集合中是否存在 age 属性大于 25 的对象。 boolean res stream.anyMatch(new PredicateUser() {Overridepublic boolean test(User user) {return user.getAge() 25;}});System.out.println(res); lambda 表达式
boolean res stream.anyMatch(user - user.getAge() 25);System.out.println(res); //true
3、allMatch
判断是否所有对象的 age属性都大于 30
boolean res stream.allMatch(user - user.getAge() 30);System.out.println(res); //false
4、noMatch
判断是否用户都不满足 name 为 “光头强 ”
boolean res stream.noneMatch(user - user.getName().equals(光头强));System.out.println(res); //true
8、Stream 过滤器
和我们 Flink 的 DataStream API 中的转换算子 filter 很像它们都是把 判断条件结果为 true 的数据留下false 则丢掉。
1、创建集合
ListUser list new ArrayList();list.add(new User(燕双鹰,28));list.add(new User(李大喜,20));list.add(new User(李元芳, 30));StreamUser stream list.stream();
2、过滤
留下 年龄25 岁的 User 对象。 StreamUser filterStream stream.filter(new PredicateUser() {Overridepublic boolean test(User user) { //为 true 则留下return user.getAge()25;}});filterStream.forEach(new ConsumerUser() {Overridepublic void accept(User user) {System.out.println(user);}});
运行结果
User{name燕双鹰, age28}
User{name李元芳, age30}
lambda表达式
StreamUser filterStream stream.filter(user - user.getAge() 25);filterStream.forEach(System.out::println);
9、Stream Limit 和 Skip
同样Stream 需要通过集合来创建。
ListUser list new ArrayList();
list.add(new User(燕双鹰,28));
list.add(new User(李大喜,20));
list.add(new User(李元芳, 30));
list.add(new User(熊大,15));
list.add(new User(熊二,14));
list.add(new User(光头强,20));
StreamUser stream list.stream();
1、取出前2条数据
// 在mysql中limit(start,end)需要传两个参数,但在这里只允许传入一个long类型的 maxSize// 取前2条数据stream.limit(2).forEach(System.out::println);
运行结果
User{name燕双鹰, age28}
User{name李大喜, age20} 2、取出第 [3,6) 条数据
注意这里的索引是从 0 开始的。
// 取 [3,6)条数据 想要分页从先 skip 再 limit
stream.skip(2).limit(3).forEach(System.out::println);
运行结果
User{name李元芳, age30}
User{name熊大, age15}
User{name熊二, age14}
10、Stream 排序 sorted
下面用到的数据。
ListUser list new ArrayList();
list.add(new User(燕双鹰,28));
list.add(new User(李大喜,20));
list.add(new User(李元芳, 30));
list.add(new User(熊大,15));
list.add(new User(熊二,14));
list.add(new User(光头强,20));
StreamUser stream list.stream();
1、直接排序
对于数值型的数据可以直接进行排序
StreamInteger integerStream Stream.of(1, 5, 8, 3, 7);
integerStream.sorted().forEach(System.out::println); //1 3 5 7 8
2、根据对象字段进行升序
stream.sorted(new ComparatorUser() {Overridepublic int compare(User o1, User o2) {return o1.getAge()-o2.getAge();}
}).forEach(System.out::println);
lambda 表达式
stream.sorted((o1, o2) - o1.getAge()-o2.getAge()).forEach(System.out::println);
运行结果
User{name熊二, age14}
User{name熊大, age15}
User{name李大喜, age20}
User{name光头强, age20}
User{name燕双鹰, age28}
User{name李元芳, age30}
JDK1.8 提供的函数接口
都在包 java.util.function 包下。
并行流
案例1 - 500亿次求和
1、使用单线程
Instant start Instant.now();long sum 0;for (long i 0; i 50000000000L; i) {sumi;}Instant end Instant.now();System.out.println(sum);System.out.println(500亿次求和花费时间: Duration.between(start,end).toMillis()ms); // 单线程 11s左右 多线程 6s左右2、使用并行流
Instant start Instant.now();LongStream longStream LongStream.rangeClosed(0,50000000000L);OptionalLong result longStream.parallel().reduce(new LongBinaryOperator() {Overridepublic long applyAsLong(long left, long right) {return left right;}});Instant end Instant.now();System.out.println(result.getAsLong());System.out.println(500亿次求和花费时间: Duration.between(start,end).toMillis()ms); // 单线程 11s左右 多线程 6s左右可以发现多线程明显要快很多。
总结 本次学习收获非常大函数接口的思想在 Flink 中随处可见的确这样一种能够使得代码简洁高效的技术在大数据开发中是非常重要的。