理解Stream API-Collector收集器
Collector<T, A, R>
是 Java Stream API 中用于自定义收集操作的核心接口。它允许开发者定义如何将流中的元素累积到一个可变容器中,并最终将容器转换为结果。Collector
的设计非常灵活,能够支持各种复杂的收集操作,例如分组、分区、连接字符串等。
接下来,我们将从 设计理念 和 底层原理 两个方面深入解析 Collector
。
一、设计理念
Collector
的设计目标是解耦流的处理逻辑与结果生成逻辑,同时提供足够的灵活性来支持各种收集操作。它的设计理念可以概括为以下几点:
可变容器(Accumulator):
- 收集操作需要一个可变容器来临时存储流中的元素。
- 容器类型可以是任意的(如
List
、Map
、StringBuilder
等)。
累积操作(Accumulation):
- 定义如何将流中的元素逐个添加到可变容器中。
结果转换(Finisher):
- 在流处理完成后,将可变容器转换为最终结果(如将
List
转换为不可变集合)。
- 在流处理完成后,将可变容器转换为最终结果(如将
并行支持(Combiner):
- 在并行流中,多个线程会独立处理数据块,最终需要将这些部分结果合并。
特性描述(Characteristics):
- 描述收集器的行为特性(如是否支持并行、是否有序等)。
二、Collector
接口定义
Collector
接口的定义如下:
public interface Collector<T, A, R> {
// 创建一个新的可变容器
Supplier<A> supplier();
// 将元素累积到容器中
BiConsumer<A, T> accumulator();
// 合并两个容器(用于并行流)
BinaryOperator<A> combiner();
// 将容器转换为最终结果
Function<A, R> finisher();
// 描述收集器的特性
Set<Characteristics> characteristics();
}
1. 核心方法解析
方法 | 作用 |
---|---|
Supplier<A> supplier() |
创建一个新的可变容器(如 ArrayList::new )。 |
BiConsumer<A, T> accumulator() |
将元素累积到容器中(如 List::add )。 |
BinaryOperator<A> combiner() |
合并两个容器(用于并行流,如 List::addAll )。 |
Function<A, R> finisher() |
将容器转换为最终结果(如将 List 转换为不可变集合)。 |
Set<Characteristics> characteristics() |
描述收集器的特性(如是否支持并行、是否有序等)。 |
2. 特性枚举(Characteristics)
Characteristics
是一个枚举类型,用于描述收集器的行为特性:
特性 | 含义 |
---|---|
CONCURRENT |
表示收集器支持并行操作,多个线程可以共享同一个容器。 |
UNORDERED |
表示收集器不保留元素的顺序。 |
IDENTITY_FINISH |
表示 finisher() 方法是一个恒等函数,可以省略。 |
三、底层原理
Collector
的底层实现依赖于 Stream API 的流水线机制。以下是其工作原理的详细解析:
1. 收集器的执行流程
当调用 collect(Collector)
方法时,Stream API 会按照以下步骤执行收集操作:
创建容器:
- 调用
supplier()
方法创建一个新的可变容器(如ArrayList
)。
- 调用
累积元素:
- 对流中的每个元素调用
accumulator()
方法,将其添加到容器中。
- 对流中的每个元素调用
合并部分结果(并行流):
- 在并行流中,多个线程会独立处理数据块,生成多个部分结果。
- 调用
combiner()
方法将这些部分结果合并。
转换结果:
- 调用
finisher()
方法将容器转换为最终结果(如将List
转换为Set
)。
- 调用
2. 并行流的支持
在并行流中,Collector
的执行流程如下:
分割数据:
- 使用
Spliterator
将数据源分割为多个子任务。
- 使用
并行累积:
- 每个子任务独立调用
supplier()
创建自己的容器,并使用accumulator()
累积元素。
- 每个子任务独立调用
合并结果:
- 使用
combiner()
方法将各个子任务的结果合并。
- 使用
3. 特性优化
Collector
的特性(Characteristics
)会影响其执行方式:
**
CONCURRENT
**:- 如果收集器是并发的,多个线程可以共享同一个容器,无需合并部分结果。
- 例如,
Collectors.toConcurrentMap()
是一个并发收集器。
**
IDENTITY_FINISH
**:- 如果
finisher()
是恒等函数,Stream API 会跳过此步骤,直接返回容器。
- 如果
四、自定义 Collector
示例
为了更好地理解 Collector
的工作原理,我们实现一个自定义收集器:将流中的字符串连接成一个用逗号分隔的字符串。
1. 自定义收集器实现
public class StringJoiningCollector implements Collector<String, StringBuilder, String> {
private final String delimiter;
public StringJoiningCollector(String delimiter) {
this.delimiter = delimiter;
}
@Override
public Supplier<StringBuilder> supplier() {
return StringBuilder::new; // 创建容器
}
@Override
public BiConsumer<StringBuilder, String> accumulator() {
return (sb, str) -> {
if (sb.length() > 0) {
sb.append(delimiter); // 添加分隔符
}
sb.append(str); // 添加元素
};
}
@Override
public BinaryOperator<StringBuilder> combiner() {
return (sb1, sb2) -> {
if (sb1.length() > 0 && sb2.length() > 0) {
sb1.append(delimiter); // 合并时添加分隔符
}
sb1.append(sb2); // 合并两个容器
return sb1;
};
}
@Override
public Function<StringBuilder, String> finisher() {
return StringBuilder::toString; // 转换为最终结果
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(); // 无特殊特性
}
}
2. 使用自定义收集器
List<String> words = Arrays.asList("Hello", "World", "Java", "Stream");
String result = words.stream().collect(new StringJoiningCollector(", "));
System.out.println(result); // 输出: Hello, World, Java, Stream
五、JDK 内置收集器
JDK 提供了许多内置的收集器(位于 java.util.stream.Collectors
类中),例如:
- **
toList()
**:将元素收集到List
中。 - **
toSet()
**:将元素收集到Set
中。 - **
joining()
**:将字符串连接成一个字符串。 - **
groupingBy()
**:按条件分组。 - **
partitioningBy()
**:按条件分区。
这些内置收集器都是基于 Collector
接口实现的。
六、总结
Collector<T, A, R>
的设计理念是通过解耦累积逻辑与结果生成逻辑,提供一种灵活且高效的方式来处理流中的元素。其底层原理基于以下核心机制:
- 可变容器:用于临时存储元素。
- 累积操作:将元素逐个添加到容器中。
- 结果转换:将容器转换为最终结果。
- 并行支持:通过
combiner()
方法合并部分结果。
通过理解 Collector
的设计和实现,开发者可以更好地使用内置收集器,甚至实现自定义的收集操作以满足特定需求。