Dart Streams
Stream is a mechanism in Dart for handling continuous sequences of asynchronous events.
If a Future represents a single asynchronous result, a Stream represents a sequence of asynchronous results delivered over time.
This chapter introduces the Stream concept, consuming a Stream with listen() and await for, creating a Stream with StreamController, and the difference between single-subscription and broadcast streams.
* * *
## Stream and Event Sequences
Stream is like a conveyor belt, where data arrives one by one over time.
You don't need to wait for all data at once; instead, you process each piece as it arrives.
Typical use cases for Stream:
| Scenario | Example |
| --- | --- |
| User input events | Button clicks, mouse movements, keyboard input |
| File reading | Reading large files line by line |
| Network data | WebSocket messages, real-time API |
| Timers | Timer events triggered every second |
| State changes | State management streams in Flutter |
## Example
Basic usage of Stream: subscribing with listen():
```dart
void main() {
  // Create a Stream: emit a number every 1 second
  var stream = Stream.periodic(
    Duration(seconds: 1),
    (count) => count + 1, // count starts from 0
  );
  print('Start monitoring Stream...');
  // listen subscribes to Stream
  var subscription = stream.listen(
    (data) {
      // Called whenever new data arrives
      print('TUTORIAL received data: $data');
    },
    onError: (error) {
      // Called when Stream encounters an error
      print('Error: $error');
    },
    onDone: () {
      // Called when Stream is closed
      print('Stream closed');
    },
  );
  // Cancel subscription after 5 seconds
  Future.delayed(Duration(seconds: 5), () {
    subscription.cancel();
    print('Subscription cancelled');
  });
}
```
Output:
```
Start monitoring Stream...
TUTORIAL received data: 1
TUTORIAL received data: 2
TUTORIAL received data: 3
TUTORIAL received data: 4
TUTORIAL received data: 5
Subscription cancelled
```
listen() returns a StreamSubscription object, which you can use to control the subscription:
| Member | Purpose |
| --- | --- |
| subscription.pause() | Pause receiving data |
| subscription.resume() | Resume receiving data |
| subscription.cancel() | Cancel subscription |
| subscription.isPaused | Property: whether the subscription is currently paused |
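The members in the table above can be exercised with a short sketch. The helper name pauseResumeDemo and the timing values are assumptions made only for this illustration.

```dart
import 'dart:async';

// Hypothetical helper for this demo: returns the isPaused flag observed
// right after pause() and right after resume().
Future<List<bool>> pauseResumeDemo() async {
  // Emit 1, 2, 3, ... every 100 ms (interval chosen only for the demo)
  final stream = Stream<int>.periodic(
    const Duration(milliseconds: 100),
    (count) => count + 1,
  );
  final subscription = stream.listen((data) => print('received: $data'));

  await Future.delayed(const Duration(milliseconds: 350));
  subscription.pause(); // No data callbacks fire while paused
  final afterPause = subscription.isPaused;

  await Future.delayed(const Duration(milliseconds: 300));
  subscription.resume(); // Delivery continues
  final afterResume = subscription.isPaused;

  await Future.delayed(const Duration(milliseconds: 250));
  await subscription.cancel(); // Stop listening; onDone is not called
  return [afterPause, afterResume];
}

Future<void> main() async {
  final flags = await pauseResumeDemo();
  print('isPaused after pause(): ${flags[0]}'); // true
  print('isPaused after resume(): ${flags[1]}'); // false
}
```

The number of received events depends on timing, so the sketch only reports the paused state.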
* * *
## await for Listening
In addition to the listen() callback method, you can also use await for loops to consume Stream.
await for makes Stream processing logic as clear as a regular for loop.
## Example
```dart
// Generate a finite data stream
Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(milliseconds: 300));
    yield i; // yield emits data to Stream
  }
}

Future<void> main() async {
  print('Start consuming Stream with await for...');
  // await for: wait for each data to arrive, process one by one
  await for (var value in countStream(5)) {
    print('TUTORIAL count: $value');
  }
  print('Stream consumption complete');
}
```
Output:
```
Start consuming Stream with await for...
TUTORIAL count: 1
TUTORIAL count: 2
TUTORIAL count: 3
TUTORIAL count: 4
TUTORIAL count: 5
Stream consumption complete
```
Characteristics of await for:
* Similar to regular for loop syntax, but each iteration waits for the next data to arrive
* When Stream closes, the loop ends automatically
* Can only be used inside a function marked async or async*
* To exit early, use break or return (same as a regular for loop); this also cancels the underlying subscription
> await for is suitable for scenarios that require "sequential processing of all data", such as reading all lines of a file. listen() is suitable for scenarios that require "continuous response to events", such as button clicks. The two can replace each other, but each has its own strengths.
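Exiting an await for loop early can be sketched as follows. The names naturals and takeUntilOver are hypothetical and exist only for this illustration; the generator never closes on its own, so the loop ends only because of break.

```dart
// Hypothetical generator for this demo: counts upward and never closes.
Stream<int> naturals() async* {
  var i = 1;
  while (true) {
    await Future.delayed(const Duration(milliseconds: 10));
    yield i++;
  }
}

// Collect values until one exceeds [limit], then leave the loop with break.
Future<List<int>> takeUntilOver(int limit) async {
  final seen = <int>[];
  await for (final value in naturals()) {
    if (value > limit) {
      break; // Exits the loop and cancels the subscription to naturals()
    }
    seen.add(value);
  }
  return seen;
}

Future<void> main() async {
  print(await takeUntilOver(3)); // [1, 2, 3]
}
```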
* * *
## Creating Stream with StreamController
StreamController is a tool for manually creating and controlling Stream.
You can add data, errors, or close it at any time.
## Example
```dart
import 'dart:async';

// Implement a simple countdown timer using StreamController
class CountdownTimer {
  final StreamController<int> _controller = StreamController<int>();
  Timer? _timer;
  int _remaining = 0;

  // Expose Stream for external subscription
  Stream<int> get tickStream => _controller.stream;

  // Start countdown
  void start(int seconds) {
    _remaining = seconds;
    // Send initial value immediately
    _controller.add(_remaining);
    _timer = Timer.periodic(Duration(seconds: 1), (timer) {
      _remaining--;
      if (_remaining > 0) {
        _controller.add(_remaining); // Send data
      } else {
        _controller.add(0); // Send final 0
        _controller.close(); // Close Stream
        timer.cancel();
      }
    });
  }

  // Cancel countdown
  void cancel() {
    _timer?.cancel();
    if (_controller.isClosed) return; // Adding to a closed controller throws
    _controller.addError('Countdown cancelled'); // Send error
    _controller.close();
  }

  // Release resources
  void dispose() {
    _timer?.cancel();
    _controller.close(); // Calling close() again is allowed and does nothing
  }
}

Future<void> main() async {
  var timer = CountdownTimer();
  // Subscribe to countdown events
  timer.tickStream.listen(
    (remaining) {
      print('TUTORIAL countdown: $remaining seconds');
    },
    onError: (error) {
      print('Error: $error');
    },
    onDone: () {
      print('Countdown finished!');
    },
  );
  timer.start(5);
  // Wait for countdown to complete
  await Future.delayed(Duration(seconds: 6));
  timer.dispose();
}
```
Output:
```
TUTORIAL countdown: 5 seconds
TUTORIAL countdown: 4 seconds
TUTORIAL countdown: 3 seconds
TUTORIAL countdown: 2 seconds
TUTORIAL countdown: 1 seconds
TUTORIAL countdown: 0 seconds
Countdown finished!
```
### async* Generator Function
If you only need to generate a sequence of values, an async* function is more convenient than StreamController.
## Example
```dart
// async* marks this as an asynchronous generator function
// Return type must be Stream
Stream<String> readLinesAsync() async* {
  var lines = ['First line', 'Second line', 'Third line', 'TUTORIAL'];
  for (var line in lines) {
    await Future.delayed(Duration(milliseconds: 500));
    yield line; // yield emits data to Stream
  }
  // Stream automatically closes when function ends
}

// Asynchronous generator with error handling
Stream<int> generateNumbersWithError() async* {
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(Duration(milliseconds: 300));
    if (i == 3) {
      throw Exception('Number 3 has an error!');
    }
    yield i;
  }
}

Future<void> main() async {
  print('Read line by line:');
  await for (var line in readLinesAsync()) {
    print(' $line');
  }
  print('\nGenerator with error:');
  try {
    await for (var number in generateNumbersWithError()) {
      print(' Number: $number');
    }
  } catch (e) {
    print(' Caught error: $e');
  }
}
```
Output:
```
Read line by line:
 First line
 Second line
 Third line
 TUTORIAL

Generator with error:
 Number: 1
 Number: 2
 Caught error: Exception: Number 3 has an error!
```
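To make the convenience claim concrete, here is a rough side-by-side sketch that produces the same finite sequence both ways. The function names countWithGenerator and countWithController are hypothetical, and the StreamController version shown is only one possible way to write it.

```dart
import 'dart:async';

// Version 1: async* generator. The Stream closes when the function returns.
Stream<int> countWithGenerator(int max) async* {
  for (var i = 1; i <= max; i++) {
    yield i;
  }
}

// Version 2: the same sequence built by hand with a StreamController.
// Events are added only once a listener subscribes (onListen), and the
// controller has to be closed explicitly.
Stream<int> countWithController(int max) {
  final controller = StreamController<int>();
  controller.onListen = () {
    for (var i = 1; i <= max; i++) {
      controller.add(i);
    }
    controller.close();
  };
  return controller.stream;
}

Future<void> main() async {
  print(await countWithGenerator(3).toList()); // [1, 2, 3]
  print(await countWithController(3).toList()); // [1, 2, 3]
}
```

The generator version needs no explicit close() call, which is where most of the saved code comes from.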
* * *
## Single-subscription vs Broadcast Stream
Dart's Stream is divided into two types: Single-subscription Stream and Broadcast Stream.
### Single-subscription Stream
This is the default Stream type.
It can be listened to only once during its lifetime, even if the first subscription is cancelled, which suits scenarios of "consuming from start to finish once".
## Example
```dart
import 'dart:async';

void main() {
  // A StreamController's stream is single-subscription by default
  var controller = StreamController<int>();
  var stream = controller.stream;
  // First subscription: OK
  stream.listen((data) => print('Subscriber1: $data'));
  // Second subscription: throws StateError, because a single-subscription
  // stream cannot be listened to more than once
  // stream.listen((data) => print('Subscriber2: $data'));
  controller
    ..add(1)
    ..add(2)
    ..add(3)
    ..close();
}
```
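The runtime failure of a second listen() can be observed directly. The sketch below assumes a stream obtained from a plain StreamController; the helper name secondListenError is hypothetical.

```dart
import 'dart:async';

// Hypothetical helper: returns the message of the error raised by a second
// listen() on a single-subscription stream, or null if none was raised.
String? secondListenError() {
  final controller = StreamController<int>(); // single-subscription by default
  final stream = controller.stream;
  stream.listen((_) {}); // First listener: accepted
  try {
    stream.listen((_) {}); // Second listener: rejected
    return null;
  } on StateError catch (e) {
    return e.message;
  } finally {
    controller.close();
  }
}

void main() {
  final controller = StreamController<int>();
  print(controller.stream.isBroadcast); // false
  print('Second listen failed: ${secondListenError()}');
}
```

The error is thrown synchronously by listen(), so an ordinary try/catch around the call is enough to observe it.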
### Broadcast Stream
Broadcast Stream allows multiple listeners to subscribe simultaneously, suitable for event broadcasting scenarios.
## Example
```dart
import 'dart:async';

void main() {
  // Create a broadcast stream
  var controller = StreamController<int>.broadcast();
  // Multiple subscribers can listen at the same time
  controller.stream.listen(
    (data) => print('TUTORIAL SubscriberA: received $data'),
  );
  controller.stream.listen(
    (data) => print('TUTORIAL SubscriberB: received $data'),
  );
  // Send data, both subscribers will receive
  controller.add(1);
  controller.add(2);
  // Close the stream once no more events will be sent
  controller.close();
}
```
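One consequence of broadcasting is that a listener only receives events sent after it subscribes; earlier events are not replayed. A minimal sketch, with the hypothetical helper name lateSubscriberDemo:

```dart
import 'dart:async';

// Hypothetical helper: records what two listeners receive when the second
// one subscribes after the first event has already been sent.
Future<List<String>> lateSubscriberDemo() async {
  final controller = StreamController<int>.broadcast();
  final log = <String>[];

  controller.stream.listen((data) => log.add('A:$data'));
  controller.add(1);
  // Give the first event time to be delivered before B subscribes
  await Future<void>.delayed(Duration.zero);

  controller.stream.listen((data) => log.add('B:$data'));
  controller.add(2);

  await controller.close(); // Completes once listeners have been notified
  return log;
}

Future<void> main() async {
  final log = await lateSubscriberDemo();
  print(log); // Contains A:1, A:2 and B:2, but never B:1
}
```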