🌎 Preface.


Stream and Future are both core APIs of the dart:async library, and together they provide excellent support for asynchronous programming.


I thought long and hard about how to introduce Stream. It is very useful because it is made for handling asynchronous events, and applications are full of scenarios that require them: making network requests, interacting with the user, and so on. None of these can be done synchronously, and Stream helps a great deal with all of them. 😍


But for newcomers, streams are abstract enough that they can take a long time to understand.

 So I will do my best to introduce you to Stream.

🏊🏻 Stream

 🤔 What is a Stream


Stream is very distinctive but not very well understood, and I prefer to think of it as a factory or machine rather than as a stream in the literal sense.

 Let’s take a look at what this machine has to offer:


  • It has an entrance that accepts things or commands (anything at all).
  • The machine doesn’t know when the entrance will let something in.
  • The machinery in the middle produces or processes things, which takes some time.
  • It has an exit, where the finished product comes out.
  • We don’t know exactly when the product will come out of the exit.


Throughout the whole process, time is an uncertainty. We can put something into the inlet of this machine at any time, and after putting it in, the machine processes it, but we don’t know how long it will take for it to finish processing. That’s why the exit needs to be watched, waiting for something to come out of the machine. The whole process is viewed asynchronously.

 ✈️ Turning the machine model into a Stream


  • This big machine is the StreamController, which is one of the ways to create a stream.

  • StreamController has an entrance called sink.

  • Things are put into the sink with the add method; once they are added, the caller no longer has to care about them.

  • When something comes in through the sink, the machine starts working.

  • StreamController has an exit called stream.

  • The machine drops the product out of the exit once it has been processed. We don’t know when that will happen, so we use the listen method to keep watching the exit.

  • When more than one item is put in, the order is preserved: first in, first out.


With this example, we should all have a basic picture of how a stream works, so what follows won’t be hard to explain.
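The machine model above can be sketched in a few lines of Dart. This is only an illustration: the function name runMachine is made up for this example, while StreamController, sink, add, stream, listen, and close are the real dart:async names.

```dart
import 'dart:async';

/// Pushes [inputs] through a StreamController and returns what the
/// listener saw, in arrival order. (runMachine is a hypothetical name.)
Future<List<int>> runMachine(List<int> inputs) async {
  final controller = StreamController<int>(); // the machine
  final received = <int>[];

  // Watch the exit before putting anything in.
  final done = controller.stream.listen(received.add).asFuture<void>();

  for (final item in inputs) {
    controller.sink.add(item); // drop items into the entrance
  }
  await controller.close(); // no more input; the stream emits "done"
  await done;
  return received;
}

Future<void> main() async {
  print(await runMachine([1, 2, 3])); // [1, 2, 3]
}
```

The items come out in the same order they went in, which is the first-in, first-out behaviour described in the list.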

 🤔 How to use Stream

 Ways to obtain a Stream:

  •  Through a constructor
  • From an IO Stream

 Three commonly used Stream constructors are:


  • Stream.fromFuture: creates a new single-subscription stream from a Future. When the future completes, the stream fires one data or error event and then closes with a done event.


  • Stream.fromFutures: creates a single-subscription stream from a group of Futures. Each future contributes its own data or error event as it completes, and the stream closes once all of the futures have completed. If the group is empty, the stream closes as soon as possible.


  • Stream.fromIterable: creates a single-subscription stream that gets its data from a collection.


Stream.fromIterable([1, 2, 3]);
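As a rough illustration of the three constructors, the sketch below collects each stream into a list with toList(). The helper name completionOrder and the 20 ms delay are assumptions chosen for this example only.

```dart
/// Collects the events of a stream built from two futures that
/// complete at different times. (completionOrder is a hypothetical name.)
Future<List<String>> completionOrder() {
  return Stream.fromFutures([
    Future.delayed(const Duration(milliseconds: 20), () => 'slow'),
    Future.value('fast'),
  ]).toList();
}

Future<void> main() async {
  // One data (or error) event, then done.
  print(await Stream.fromFuture(Future.value('hello')).toList()); // [hello]

  // Events arrive in the order the futures complete, not in list order.
  print(await completionOrder()); // [fast, slow]

  // One event per element of the collection.
  print(await Stream.fromIterable([1, 2, 3]).toList()); // [1, 2, 3]
}
```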

 🧐 Methods for Listening to Streams


The most common way to listen to a stream is the listen method; the stream notifies the listener whenever an event is emitted. The listen method accepts these handlers and options:

  •  onData (required): called when data is received.
  •  onError: called when an error is received.
  •  onDone: called when the stream finishes.
  •  cancelOnError: whether to cancel the subscription when the first error is encountered; the default is false.
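Here is a small sketch of how these callbacks interact. In the Dart SDK the last option is the named parameter cancelOnError. The function name listenLog and the 'boom' error value are made up for illustration.

```dart
import 'dart:async';

/// Records every callback that fires while one value, one error, another
/// value, and a close are pushed through a controller.
/// (listenLog is a hypothetical helper.)
Future<List<String>> listenLog({required bool cancelOnError}) {
  final log = <String>[];
  final finished = Completer<List<String>>();
  final controller = StreamController<int>();

  controller.stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) {
      log.add('error: $error');
      // With cancelOnError the subscription ends here, so onDone never runs.
      if (cancelOnError) finished.complete(log);
    },
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
    cancelOnError: cancelOnError,
  );

  controller.sink
    ..add(1)
    ..addError('boom')
    ..add(2)
    ..close();
  return finished.future;
}

Future<void> main() async {
  print(await listenLog(cancelOnError: false));
  // [data: 1, error: boom, data: 2, done]
  print(await listenLog(cancelOnError: true));
  // [data: 1, error: boom]
}
```

With the default (false), the stream keeps delivering events after an error; with true, the first error ends the subscription.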


Using await for Stream Processing


In addition to listen, we can also use await for.

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}


This code takes a Stream, computes the sum of all its events, and returns the result. await for handles each event as it arrives. We know that a stream receives events at unpredictable times, so when does the await for loop exit? The answer is when the stream completes or closes.


You can copy this code into DartPad and experiment a few more times.

😏 StreamController


Creating a new stream is very simple! 😀 Use StreamController. It gives you rich functionality: you can send data, report errors, and receive the results through the controller’s stream.

 
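// Without a type argument, the controller accepts events of any type.
StreamController controller = StreamController();
controller.sink.add(123);
controller.sink.add("xyz");
controller.sink.add([1, 2, 3]); // any object at all

// With a type argument, only events of that type can be added.
StreamController<int> numController = StreamController<int>();
numController.sink.add(123);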


The generic type parameter defines what type of data can be pushed onto the stream. It can be any type!

 Let’s take a look at how to get the final result.

StreamController controller = StreamController();
 
StreamSubscription subscription =
controller.stream.listen((data)=>print("$data"));

controller.sink.add(123);

 Output: 123


You pass a callback to the stream’s listen method. The callback’s parameter (data) is the result that our StreamController produces after processing: we watch the exit and receive that result. You can use a lambda expression here, or any other function.


(To tell them apart, I call listen a method and (data)=>print(data) a callback, although both are simply functions.)
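As a sketch of the "any other function" case, the example below passes a named local function to listen and then uses the returned StreamSubscription to stop listening. The names listenThenCancel and handleData are invented here, and the zero-duration delays are only a simple way to let pending events be delivered.

```dart
import 'dart:async';

/// Listens with a named function, then cancels the subscription.
/// (listenThenCancel and handleData are hypothetical names.)
Future<List<Object?>> listenThenCancel() async {
  final seen = <Object?>[];

  // A named function works as the listener just like a lambda does.
  void handleData(Object? data) => seen.add(data);

  final controller = StreamController<Object?>();
  final subscription = controller.stream.listen(handleData);

  controller.sink.add(123);
  controller.sink.add('xyz');
  await Future<void>.delayed(Duration.zero); // let the events be delivered

  await subscription.cancel(); // stop watching the exit
  controller.sink.add('ignored'); // nobody is listening any more
  await Future<void>.delayed(Duration.zero);
  return seen;
}

Future<void> main() async {
  print(await listenThenCancel()); // [123, xyz]
}
```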


Generating a stream via async*


If we have a series of events that we need to process, we might want to turn them into a stream. We can do that with an async* function, which uses yield to emit each event on the Stream.

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}


When the loop exits, the stream is done. We can experience this more deeply in conjunction with await for, which we talked about earlier.
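Putting the two pieces together, a minimal sketch might look like this. It repeats countStream and the earlier sumStream so that the block runs on its own.

```dart
/// Emits the integers 1..to, then completes.
Stream<int> countStream(int to) async* {
  for (var i = 1; i <= to; i++) {
    yield i;
  }
}

/// Adds up every event; the loop ends when the stream is done.
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

Future<void> main() async {
  print(await sumStream(countStream(10))); // 55
}
```

Because countStream finishes when its for loop exits, the await for loop in sumStream ends too, and the sum is returned.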


🤠 Transforming an existing stream


Suppose you already have a stream and want to derive a new stream from it. It’s very simple! Stream provides the map(), where(), expand(), and take() methods, which easily transform an existing stream into a new one.
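For map() and expand(), a short sketch could look like the following. The helper names doubled and repeated are made up for this example.

```dart
/// Doubles every event: one event in, one event out.
Future<List<int>> doubled(Stream<int> source) =>
    source.map((n) => n * 2).toList();

/// Emits every event twice: one event in, several events out.
Future<List<int>> repeated(Stream<int> source) =>
    source.expand((n) => [n, n]).toList();

Future<void> main() async {
  print(await doubled(Stream.fromIterable([1, 2, 3]))); // [2, 4, 6]
  print(await repeated(Stream.fromIterable([1, 2, 3]))); // [1, 1, 2, 2, 3, 3]
}
```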

where


Use where if you want to filter out unwanted events. Take a guessing game, for example: the user enters numbers, and we respond only when the input is correct. To discard all the wrong answers, we can use where to filter out the unwanted numbers.

stream.where((event){...})


The where method takes a test function. Each event that flows in from the stream is passed to that function as its parameter, and only events for which it returns true are passed on. Even if the test doesn’t need the event, it must still accept it as a parameter.
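For the guessing game, a filter might be sketched like this. The function name correctGuesses and the answer 100 are assumptions made for illustration.

```dart
/// Keeps only the guesses that match [answer].
/// (correctGuesses is a hypothetical helper.)
Stream<int> correctGuesses(Stream<int> guesses, int answer) =>
    guesses.where((guess) => guess == answer);

Future<void> main() async {
  final guesses = Stream.fromIterable([23, 100, 7, 100]);
  print(await correctGuesses(guesses, 100).toList()); // [100, 100]
}
```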

take


Use take if you want to limit how many events the stream can pass. For example, when entering a password we might allow the user at most four attempts; take can enforce that limit.

stream.take(4);


The take method receives an int: the maximum number of events allowed through. Once that many events have passed, the resulting stream closes and nothing more comes through.
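A minimal sketch of the password example, assuming the attempts arrive as strings; limitAttempts and its maxAttempts parameter are hypothetical names.

```dart
/// Lets at most [maxAttempts] password attempts through.
/// (limitAttempts is a hypothetical helper.)
Stream<String> limitAttempts(Stream<String> attempts, {int maxAttempts = 4}) =>
    attempts.take(maxAttempts);

Future<void> main() async {
  final attempts = Stream.fromIterable(['a', 'b', 'c', 'd', 'e', 'f']);
  print(await limitAttempts(attempts).toList()); // [a, b, c, d]
}
```

The fifth and sixth attempts never reach the listener, because the limited stream is already done after the fourth.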

transform


If you need more control over the transformation, use the transform() method. It is used together with a StreamTransformer. Let’s look at the following guessing game, and then I will explain it.

StreamController<int> controller = StreamController<int>();

final transformer = StreamTransformer<int, String>.fromHandlers(
  handleData: (value, sink) {
    if (value == 100) {
      sink.add("You guessed it!");
    } else {
      sink.addError("Not right yet. Try again.");
    }
  },
);

controller.stream
    .transform(transformer)
    .listen(
      (data) => print(data),
      onError: (err) => print(err),
    );

controller.sink.add(23);
// controller.sink.add(100);

 Output: Not right yet. Try again.


StreamTransformer<S, T> is the inspector of our stream: it receives the events that pass through the stream, processes them, and returns a new stream.


  • S is the input type, the event type of the original stream. We are feeding in numbers here, so it is int.

  • T is the output type, the event type of the transformed stream. What we add to the sink here is a string, so it is String.

  • handleData receives each event value together with the sink of the new stream; whatever we add to that sink becomes an event of the transformed stream.
  •  We can also call addError on the sink to tell downstream listeners that there is a problem.


Then we listen to the transformed stream. When a transformed event flows out, we print it; it is the data we just added to the sink. onError catches the err that we added with addError.

 🤨 Types of Streams

 There are two types of streams:


  • “Single-subscription” streams

  • “Broadcast” (multi-subscription) streams

“Single-subscription” streams


A single-subscription stream allows only one listener during the entire lifetime of the stream. It doesn’t start generating events until it has a listener, and it stops sending events when the listener unsubscribes, even if you keep adding events with sink.add.


Listening twice on a single subscription stream is not allowed, even after the first subscription has been canceled.


Single-subscription streams are often used to stream larger contiguous chunks of data, such as file I/O.

StreamController controller = StreamController();

controller.stream.listen((data)=> print(data));
controller.stream.listen((data)=> print(data));

controller.sink.add(123);


Output: Bad state: Stream has already been listened to. A single subscription stream cannot have multiple listeners.
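The second listen call throws a StateError synchronously, so it can be caught like any other exception. The sketch below assumes nothing beyond that; secondListenResult is a made-up name.

```dart
import 'dart:async';

/// Tries to listen twice to a single-subscription stream and reports
/// what happened. (secondListenResult is a hypothetical helper.)
String secondListenResult() {
  final controller = StreamController<int>();
  controller.stream.listen((_) {}); // first listener: fine
  try {
    controller.stream.listen((_) {}); // second listener: not allowed
    return 'ok';
  } on StateError catch (e) {
    return e.message;
  }
}

void main() {
  print(secondListenResult());
}
```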

“Broadcast” streams


A broadcast stream allows any number of listeners, and it generates events whether or not there are listeners. So a listener that joins midstream will not receive earlier events.


If multiple listeners want to listen to a single subscription stream, use asBroadcastStream to create a broadcast stream on top of a non-broadcast stream.


If a listener is added to a broadcast stream while an event is being fired, that listener will not receive the event currently being fired. If a listener is canceled, it immediately stops receiving events.


Streams are single-subscription by default. A broadcast stream class that extends Stream must override isBroadcast to return true.

StreamController controller = StreamController();
 
Stream stream = controller.stream.asBroadcastStream();

stream.listen((data)=> print(data));
stream.listen((data)=> print(data));

controller.sink.add(123);

 Output: 123 123
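To see that a listener joining midstream misses earlier events, here is a rough sketch. It uses the StreamController.broadcast() constructor instead of asBroadcastStream, and the name lateListenerDemo is invented for this example.

```dart
import 'dart:async';

/// Adds one event before the second listener joins and one after.
/// (lateListenerDemo is a hypothetical helper.)
Future<Map<String, List<int>>> lateListenerDemo() async {
  final controller = StreamController<int>.broadcast();
  final first = <int>[];
  final second = <int>[];

  controller.stream.listen(first.add);
  controller.sink.add(1);
  await Future<void>.delayed(Duration.zero); // deliver 1 to the first listener

  controller.stream.listen(second.add); // joins midstream
  controller.sink.add(2);
  await controller.close(); // completes after both listeners are done
  return {'first': first, 'second': second};
}

Future<void> main() async {
  print(await lateListenerDemo()); // {first: [1, 2], second: [2]}
}
```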

By lzz
