java.lang.util.stream包解析

Java 的 Stream 包是 Java 8 中引入的一项重要特性,它提供了一种声明式、函数式风格的数据流处理方法。Stream 允许我们以声明式的方式对数据源进行操作和转换,并将结果聚合到最终的集合或输出。

主要的接口和抽象类的定义有:

  • java.lang.AutoCloseable 自动关闭接口
    • java.util.stream.BaseStream 基础流定义接口
      • java.util.stream.Stream 流定义接口
        • java.util.stream.AbstractPipeline 流通道抽象
          • java.util.stream.ReferencePipeline 引用通道抽象
            • java.util.stream.ReferencePipeline.Head 初始通道实现
            • java.util.stream.ReferencePipeline.StatelessOp 无状态通道实现
            • java.util.stream.ReferencePipeline.StatefulOp 有状态通道实现
  • java.util.function.Consumer 消费者接口
    • java.util.stream.Sink 通道消费链定义接口
      • java.util.stream.Sink.ChainedReference 通道消费链抽象实现
  • java.util.Iterator 迭代器接口
  • java.util.Spliterator 分离器接口
  • java.util.stream.Collector 收集器接口

基础接口

java.lang.AutoCloseable

java.lang.AutoCloseable 是 Java 中的一个接口,它声明了一个单一的方法 close()。该接口是在 Java 7 中引入的,用于支持 try-with-resources 语句。

AutoCloseable 接口设计用来表示那些在使用后需要关闭的资源。当一个类实现了 AutoCloseable 接口时,它表示这个类的实例可以被自动关闭。在 try-with-resources 语句中,你可以使用一个或多个资源,这些资源必须实现 AutoCloseable 接口。当 try-with-resources 语句结束时,Java 虚拟机会自动调用这些资源的 close() 方法,以便在使用完资源后进行关闭操作,而无需显式地编写关闭代码。

例如:

try (BufferedReader reader = new BufferedReader(new FileReader("file.txt"))) {
    String line;
    while ((line = reader.readLine()) != null) {
        System.out.println(line);
    }
} catch (IOException e) {
    // 处理异常
}

在这个例子中,BufferedReader 实现了 AutoCloseable 接口,因此它可以在 try-with-resources 语句中使用。在 try-with-resources 语句结束时,会自动调用 BufferedReaderclose() 方法关闭文件流。这样可以确保资源被及时关闭,避免资源泄漏和其他相关问题。

java.util.function.Consumer

java.util.function.Consumer 是 Java 中的一个函数式接口,它定义了一个接收一个参数并且不返回任何结果的操作。它通常用于对参数进行一些处理,比如修改、打印、存储等,但不会产生任何返回值。Consumer 接口包含一个名为 accept 的抽象方法,该方法接收一个参数,代表需要被处理的对象。Consumer 接口在 Java 8 中引入,主要是为了支持 Lambda 表达式和函数式编程。

Consumer 接口的声明如下:

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

其中,T 是输入参数的类型。

Consumer 接口的使用场景包括但不限于:

  1. 集合操作: 在遍历集合时,可以使用 Consumer 对集合中的每个元素执行相同的操作。
  2. 函数式编程: 在函数式编程中,可以将 Consumer 作为方法的参数,传递给其他方法,以便在需要时执行特定的操作。
  3. I/O 操作: 在处理输入流或输出流时,可以使用 Consumer 来对流中的数据执行特定的操作。
  4. 回调函数: 在异步编程中,可以使用 Consumer 来定义回调函数,在异步任务完成后执行相应的操作。

以下是 Consumer 接口的一个简单示例,演示了如何使用它来打印列表中的每个元素:

import java.util.Arrays;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        List<String> languages = Arrays.asList("Java", "Python", "JavaScript", "C++");

        // 使用 Consumer 打印每个元素
        languages.forEach(language -> System.out.println(language));
    }
}

这个示例中,forEach 方法接收一个 Consumer 参数,用于处理列表中的每个元素。Lambda 表达式 language -> System.out.println(language) 实现了 Consumer 接口中的 accept 方法,将列表中的每个元素打印出来。

java.util.Iterator

java.util.Iterator 是 Java 编程语言中的一个接口,位于 java.util 包中。它提供了一种迭代集合元素的方式,允许逐个访问集合中的元素,而无需暴露集合的内部实现细节。

Iterator 接口定义了以下几种常用方法:

  1. boolean hasNext():检查迭代器是否还有下一个元素。
  2. E next():返回迭代器的下一个元素,并将迭代器的位置向前移动。
  3. void remove():从迭代器指向的集合中移除迭代器返回的上一个元素。这个方法不是在所有的迭代器中都可用,具体取决于集合的实现。

使用 Iterator 进行迭代的一般步骤如下:

  1. 调用集合对象的 iterator() 方法来获取一个 Iterator 对象。
  2. 使用 hasNext() 方法检查是否还有下一个元素。
  3. 使用 next() 方法获取下一个元素。
  4. (可选) 使用 remove() 方法移除当前元素。

下面是一个简单的示例,展示了如何使用 Iterator 遍历一个 List 集合:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class IteratorExample {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        list.add("Apple");
        list.add("Banana");
        list.add("Orange");

        // 获取Iterator对象
        Iterator<String> iterator = list.iterator();

        // 遍历集合
        while (iterator.hasNext()) {
            String element = iterator.next();
            System.out.println(element);
        }
    }
}

这段代码创建了一个包含几种水果的 List 对象,并通过调用 iterator() 方法获取了一个 Iterator 对象。然后,在一个 while 循环中,通过调用 hasNext()next() 方法逐个打印了集合中的元素。

Iterator 接口的使用有助于实现集合类的封装性和安全性,因为它允许在遍历集合时进行元素的安全访问和操作,而不会暴露底层集合的实现细节。

核心接口

java.util.stream.BaseStream

java.util.stream.BaseStream 是 Java 8 引入的流(Stream)框架中的一个基本接口。该接口位于 java.util.stream 包中,是流操作的基础。BaseStream 接口是泛型化的,它定义了流操作的通用方法,可以用于处理一系列元素。

BaseStream 接口继承自 AutoCloseable 接口,因此它可以在使用完毕后进行自动关闭资源。这对于一些需要处理 I/O 操作的流操作来说是非常有用的。

主要的子接口包括:

  1. Stream<T>:表示元素类型为 T 的对象流。
  2. IntStream:表示元素类型为 int 的原始流。
  3. LongStream:表示元素类型为 long 的原始流。
  4. DoubleStream:表示元素类型为 double 的原始流。

BaseStream 接口提供了一系列的中间操作(Intermediate Operations)和终端操作(Terminal Operations),用于进行流的转换和处理。中间操作返回一个新的流,而终端操作则触发实际的计算,产生一个最终的结果。

常见的中间操作包括 filtermapflatMap 等,而常见的终端操作包括 forEachcollectreduce 等。

以下是 BaseStream 接口的一些常用方法:

  1. void close():关闭流,释放相关的资源。
  2. boolean isParallel():判断流是否是并行流。
  3. BaseStream<T, S> sequential():返回一个顺序流。
  4. BaseStream<T, S> parallel():返回一个并行流。
  5. S unordered():返回一个无序流。
  6. Iterator<T> iterator():返回一个迭代器,用于迭代流中的元素。

BaseStream 接口的使用可以大大简化对集合数据的处理,特别是在并行处理大数据集时。通过流操作,可以更清晰、简洁地表达数据处理的逻辑。

java.util.stream.Stream

java.util.stream.Stream 是 Java 8 引入的一个新特性,它提供了一种在集合类(Collection)上进行复杂操作的高级抽象。它允许你以一种更为函数式的风格来处理集合中的元素,例如映射、过滤、排序、聚合等。

下面是一些 Stream 类的主要特性和用法:

  1. 函数式编程风格: Stream 提供了丰富的函数式编程方法,如 map()filter()reduce() 等,这些方法可以直接应用于集合中的元素,使得对集合的操作更为灵活和易读。

  2. 惰性求值: Stream 提供了惰性求值的特性,它们在处理集合时不会立即执行操作,而是等到需要结果时才执行。这种延迟执行的方式可以提高性能,尤其是当处理大型集合时。

  3. 流水线: Stream 支持将多个操作连接在一起,形成一个流水线。这种操作的串联可以减少中间数据结构的创建,从而提高效率。

  4. 并行处理: Stream 提供了并行处理集合元素的功能,可以通过 parallel() 方法将串行流转换为并行流,从而充分利用多核处理器的优势来加速处理过程。

  5. 可消费性: Stream 是一种一次性的数据结构,一旦遍历消费了其中的元素,就不能再次使用。这意味着 Stream 是不可变的,对其的操作不会影响到原始集合。

  6. 集成性: Stream 类提供了与其他类库的集成支持,例如与 Lambda 表达式、Optional 类型和函数式接口等的结合使用。

基本上,Stream 类提供了一种更加灵活和高效的集合操作方式,尤其适用于处理大规模数据集合。通过结合函数式编程的思想和惰性求值的特性,它使得在 Java 中进行集合处理变得更加简洁和优雅。

原理解析

上述章节都是一些基础介绍(来此GPT和网络),了解这些之后,继续深入研究其原理。

**Stream 的大致思路如下: **

  • 首次创建流: 会创建一个初始的Stream<T>,从代码中可知,这个初始的Stream<T>其实就是ReferencePipeline.Head<T>初始通道实现
    //此处仅展示部分代码
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
  • 中间操作: 会结合上一个通道,继续向下创建ReferencePipeline.StatelessOp<T>无状态通道实现或ReferencePipeline.StatefulOp<T>有状态通道实现,使其构成链式结构。因为其内部传递的都是Sink对象(真正的执行逻辑都在这个Sink对象中),所以做到了仅在最后才会真正的执行。
    //此处仅展示部分代码
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }
    
                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
    
    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
  • 终止操作: 仍然会结合上一个通道,唯一不一样的是这里是触发执行的位置,会执行到java.util.stream.TerminalOp,最终流也将关闭。
    //此处仅展示部分代码
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
    
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

java.util.stream.TerminalOp接口主要有四种抽象实现:

  • java.util.stream.ReduceOps.ReduceOp 减少操作 -> 主要用在收集器逻辑
  • java.util.stream.FindOps.FindOp 查找操作 -> 主要用取最大值最小值逻辑
  • java.util.stream.MatchOps.MatchOp 匹配操作 -> 用于判断元素是否符合匹配规则
  • java.util.stream.ForEachOps.ForEachOp 循环操作 -> .forEach操作

未完待续…


java.lang.util.stream包解析
https://blog.cikaros.top/doc/5d1b59b3.html
作者
Cikaros
发布于
2024年1月25日
许可协议