
TLDR: Streams are nothing more than a convenient way of handling callbacks, and they can cause a zombie outbreak if you are not careful, because callbacks ARE a type of reference to the creating object.
The Unnecessarily Long Introduction
Between RxJS clocking in at 15 million weekly (yes, seriously, weekly) downloads and RxDart exploding in popularity, it looks like streams, as a paradigm, are here to stay. But what the hell are streams? Are they a special language syntax? A magical data structures that just somehow “flow” data? Or, are they just a convenient way of handling async callbacks with a ton of functionality (and complexity) built on top of a very simple concept? Let’s take a look.
Initially, I started to learn about streams for a Bluetooth project I was writing in Flutter. In case you have been living under a rock, or simply don’t keep up with all the latest mobile development trends (Inconceivable!), Flutter is this really cool framework that Google “graciously” released a few years ago. In reality Google was tired of people releasing all the cool apps for iOS first, and they decided to make a cross platform toolkit so that at the very least, if they can’t get devs to code for Android first, they would get them to code for both platforms at the same time. Well, what can I say, it worked, they got me.
My current project is an app for a bluetooth enabled battery management system (BMS) for large scale lithium batteries. The app pulls data from the BMS about the state of the battery, and allows the end user to set settings on the BMS. Overall, it is a pretty simple 2-3 screen app with only complex bits residing in the BMS connection negotiation code. We needed to support iOS and Android out of the box, so this app was perfect for Flutter!
Flutter doesn’t support Bluetooth out of the box, but Buffalo Inc, the people who make portable hard drives and a bunch of other stuff, were gracious enough to release Flutter Blue, an open source bluetooth library. The library is based around RxDart, and handles everything via streams. So, I had to roll up my sleeves and finally figure out streams properly.
Finally, Streams!!!
So, what are streams and why do you need them? Well, for that bit of info I’ll let you read or watch any number of explanations from countless sources on the internet. You see, you have to get properly confused and desperate before you’ll become inspired to continue reading this blog post 😉 But, in short, streams are pitched as this magical alternative to callbacks. People analogize streams to data flows or to factory assembly lines. They are neither.
Streams are literally just another way to handle good old callbacks. It took me banging my head on my desk (literally), to understand that little bit of insight. Now I am going to try to convey that insight to you by reinventing the wheel and making our very own stream using nothing more magical than classes and callbacks. My hope is that by showing you what streams look like under the hood you will be able to think about them in a more accurate way, and would avoid the zombie object issues I had to deal with ZombieLand style.
(What, you don’t think your project has zombies? Have you checked? Read on, you may find that you have a zombie infestation as well, and you just can’t see it.)
I know JS is taking over the world, but I’ll take this moment to also introduce you to Dart. Dart is the language used to program Flutter apps in. It’s also a pretty cool little language in it’s own right, and it compiles to vanilla JS, as well as native code! It’s strongly typed, very strongly typed, not like that promiscuous TypeScript, and it has great out of the box support for Streams. If you are unfamiliar with Dart, just read the code examples below while omitting all of the type annotations and it will basically read like JS.
To run any of the code examples simply use DartPad and run it directly in the browser (remember, dart compiles to JS, after all).
Normal Dart Streams
So, first, let’s take a look at a very simple example of using the streams built right into Dart. Read through the code quickly and see if you can predict what the output will be.
import 'dart:async';
void main() {
MyPrettyLittleZombie zombie = MyPrettyLittleZombie();
Future.delayed(const Duration(seconds: 10), () {
print("Attempting to garbage collect the zombie by nulling its reference.");
zombie = null;
});
}
class MyPrettyLittleZombie {
MyPrettyLittleZombie() {
ns = NormalStream();
sub = ns.stream.listen((counter) {
print("Zombie said $counter! It's still coming straight for us!");
});
Future.delayed(const Duration(seconds: 15), () {
print("OK, that didn't work. Let's try unsubscribing.");
sub.cancel();
});
}
NormalStream ns;
StreamSubscription sub;
}
class NormalStream {
NormalStream() {
Future.delayed(const Duration(milliseconds: 1000), counterIncrement);
}
StreamController<int> streamController = StreamController<int>.broadcast();
Stream<int> get stream => streamController.stream;
// Counter Stuff
int counter = 0;
void counterIncrement() {
if (streamController.hasListener) {
counter++;
streamController.sink.add(counter);
Future.delayed(const Duration(milliseconds: 1000), counterIncrement);
}
}
}
Ok, so what do you think? Do you know what the output will look like? Let’s take a look at the output, and then we will go quickly through the code section by section. Here is the output from above:
Zombie said 1! It's still coming straight for us!
Zombie said 2! It's still coming straight for us!
Zombie said 3! It's still coming straight for us!
Zombie said 4! It's still coming straight for us!
Zombie said 5! It's still coming straight for us!
Zombie said 6! It's still coming straight for us!
Zombie said 7! It's still coming straight for us!
Zombie said 8! It's still coming straight for us!
Zombie said 9! It's still coming straight for us!
Attempting to garbage collect the zombie by nulling its reference.
Zombie said 10! It's still coming straight for us!
Zombie said 11! It's still coming straight for us!
Zombie said 12! It's still coming straight for us!
Zombie said 13! It's still coming straight for us!
Zombie said 14! It's still coming straight for us!
OK, that didn't work. Let's try unsubscribing.
Did you get it right? Are you a little surprised that the stream capped on counting even after we set zombie = null
? Like JS and many other garbage collected languages, Dart should have double tapped that zombie
after we nulled its one and only reference. But was that really the only reference to our zombie
? Or, perhaps, just maybe, another reference to our zombie
is created when it runs the listen
function? Is listen
just a really shitty analogy to what we are actually doing? All questions will be answered, but first let’s look closely at the code.
Dart starts running all code from the main
function, just like C and many other languages. If you are more familiar with JS, just pretend that everything in main
is at the top of the file.
void main() {
MyPrettyLittleZombie zombie = MyPrettyLittleZombie();
Future.delayed(const Duration(seconds: 10), () {
print("Attempting to garbage collect the zombie by nulling its reference.");
zombie = null;
});
}
In the above section we instantiate the MyPrettyLittleZombie class into a zombie
object. (If this sentence is confusing, go and watch some YouTube videos on object orientated programming. It’s boring, and I am not covering it.)
The next bit is simple, though, through the pure genius of Google engineers, it’s made in the most complex way possible. Basically 10 seconds from when the code runs I want it to do something. Think setTimeout
in JS.
The bit that I want it to do is to set the zombie
object to null, which should (theoretically) cause it to get garbage collected. But it doesn’t, as you can see in the output. Why? That’s what we aim to find out. Let’s take a look at what our zombie
looks like.
class MyPrettyLittleZombie {
MyPrettyLittleZombie() {
ns = NormalStream();
sub = ns.stream.listen((counter) {
print("Zombie said $counter! It's still comming stright for us!");
});
Future.delayed(const Duration(seconds: 15), () {
print("OK, that didn't work. Let's try unsubscribing.");
sub.cancel();
});
}
NormalStream ns;
StreamSubscription sub;
}
Our zombie
, as most zombies, is a pretty simple little creature, it only has 2 properties, one real function, and one safety measure (you know, just in case our garbage collector fails to double tap the bastard). At the top, we assign ns
to a new instance of NormalStream
. NormalStream
is just an implementation of a stream using Dart’s built in Stream classes. Upon instantiation, NormalStream
starts broadcasting an incrementing integer, pretty simple.
After instantiating NormalStream
we “subscribe” to updates from our instance of NormalStream
by using the listen
function for the stream. The analogy is that as soon as we hear the stream emit (send out) a new value, we will run some code on that value, in our case just print it to the console. Pretty simple. The sub
variable stores a reference to our subscription. It’s like a receipt, you’ll need it to unsubscribe.
Finally, we add a safety that will unsubscribe our zombie
from NormalStream
updates after 15 seconds by using the sub
object we talked about above.
Pretty simple stuff, no? Now let’s look at our final bit of code, the NormalStream
class.
class NormalStream {
NormalStream() {
Future.delayed(const Duration(milliseconds: 1000), counterIncrement);
}
StreamController<int> streamController = StreamController<int>.broadcast();
Stream<int> get stream => streamController.stream;
// Counter Stuff
int counter = 0;
void counterIncrement() {
if (streamController.hasListener) {
counter++;
streamController.sink.add(counter);
Future.delayed(const Duration(milliseconds: 1000), counterIncrement);
}
}
}
If you are familiar with streams, nothing here should be very surprising. Basically, we setup a StreamController
and a Stream
, and then add some logic to emit the incrementing integers once a second. The StreamController
handles all of the additional helper logic around the Stream
object.
To send a new value to the stream we run the streamController.sink.add(counter)
function with counter
as the integer we want to emit. Why is it called a sink
? Because developers have a penchant for having really terrible analogies for simple concepts. In any case, by adding a new value to the sink, that value will be emitted on the stream.
Finally, the counterIncrement
function calls itself over and over again. Yes, I know, it’s tempting to sound smart by calling this recursion, but recursion it is not.
So that’s it! We looked at all the code. Do you now know why our zombie
refuses to die after we null
its reference? No? Ok, let’s keep going. In the hopes of giving you the same “aha” moment I had earlier this month, let me now rebuild the same code with our own “Stream” class, instead.
Reinventing the Wheel, or rather, the Stream
To understand a bit better what’s happening under the hood of a Stream object let’s make our own, very simple, implementation. Yes, we will have Streams without any libraries or built in classes. In fact, you could easily implement the code below using vanilla JS (again, dart is compiled to vanilla JS). So, first, like before, let’s take a look at the code.
import 'dart:async';
void main() {
MyPrettyLittleZombie zombie = MyPrettyLittleZombie();
Future.delayed(const Duration(seconds: 10), () {
print("Attempting to garbage collect the zombie by nulling its reference.");
zombie = null;
});
}
class MyPrettyLittleZombie {
MyPrettyLittleZombie() {
ns = ReinventedStream();
sub = ns.listen((counter) {
print("Zombie said $counter! It's still coming straight for us!");
return;
});
Future.delayed(const Duration(seconds: 15), () {
print("OK, that didn't work. Let's try unsubscribing.");
sub.cancel();
});
}
ReinventedStream ns;
ReinventedStreamSubscription sub;
}
class ReinventedStream {
ReinventedStream() {
Future.delayed(const Duration(milliseconds: 1000), counterIncrement);
}
// Stream Sub and Unsub Methods and Vars
Map<int, void Function(int)> subCallBacks = Map<int, void Function(int)>();
int subCounter = 0;
ReinventedStreamSubscription listen(void Function(int) callback(int data)) {
final ReinventedStreamSubscription sub = ReinventedStreamSubscription(subCounter, this);
subCallBacks[subCounter] = callback;
subCounter++;
return sub;
}
void unlisten(int sub) {
subCallBacks.remove(sub);
}
// Stream Sink Methods and Vars
int last;
void add(int newData) {
last = newData;
subCallBacks.forEach((index, callBack) {
callBack(newData);
});
}
// Counter Stuff
int counter = 0;
void counterIncrement() {
if (subCallBacks.isNotEmpty) {
counter++;
add(counter);
Future.delayed(const Duration(milliseconds: 1000), counterIncrement);
}
}
}
class ReinventedStreamSubscription {
ReinventedStreamSubscription(this.pos, this.stream);
final int pos;
ReinventedStream stream;
void cancel() {
if (stream != null) {
stream.unlisten(pos);
stream = null;
}
}
}
Because this post is already getting ridiculously long, I’ll skip copy and pasting the console output here, but take my word for it that it looks identical to the output from the normal stream implementation above. Or, if you are not the trusting type, just run it in DartPad.
The first bit of this code is essentially identical to what we looked at above. In the MyPrettyLittleZombie
class we use ns.listen
instead of ns.stream.listen
, but that’s only because I was too lazy to split my Stream Implementation into 15 different classes. But generally, the main
function and MyPrettyLittleZombie
class are identical. The difference are in the ReinventedStream
and ReinventedStreamSubscription
classes. So let’s take a look at those. What do these magical streams actually look like under the hood?
First, let’s take a look at the ReinventedStream
class. I’ll skip over periodic number emitting logic, as it’s identical to the normal implementation above, and will instead concentrate on the “stream” bits.
Map<int, void Function(int)> subCallBacks = Map<int, void Function(int)>();
int subCounter = 0;
Here we set up 2 properties. The subCallBacks
property is a Map that has an integer key index and stores a void Function(int)
callback. If we were making a proper generic Stream object we would have used a generic type, of course, but for this example a hard coded int
type will be sufficient.
The other property, subCounter
, is used to generate unique index values for the subCallBacks
map. (BTW, a Map is similar to named array, HashMap, or a Named List, in other languages. Every language, well, almost every language, has one, and every language has to invent a new name for the same damn concept.)
ReinventedStreamSubscription listen(void Function(int) callback(int data)) {
final ReinventedStreamSubscription sub = ReinventedStreamSubscription(subCounter, this);
subCallBacks[subCounter] = callback;
subCounter++;
return sub;
}
void unlisten(int sub) {
subCallBacks.remove(sub);
}
In this bit of code we define our listen
and unlisten
functions. Again, terrible names! They should be called addCallBack
and removeCallBack
, but I am using the nomenclature of the magical streams here.
The listen function stores the call back passed to it from the outside, in our case, from our zombie
object, into the subCallBacks
map. The unlisten
function removes the callback at a given index. Again, pretty simple so far. The next bit is where the “magic” happens.
int last;
void add(int newData) {
last = newData;
subCallBacks.forEach((index, callBack) {
callBack(newData);
});
}
Yep, you made it! Now you should be having that “aha” moment I was talking about! This is the function that “sinks” or adds new values to the stream. Upon adding a new value it calls all of the callbacks one at a time, and sends to them the new value.
As you may know, callbacks run in the context they are created in, NOT the context they are called in. The latter would make them pretty useless in anything other than globally scoped programs. So, to properly run the callback that we set up inside the zombie
object, the instance of the ReinventedStream
has to maintain a reference to the zombie
object! And, of course, since we instantiated the ReinventedStream
inside the zombie
object, they are now cross linked, and will not get garbage collected until we delete the callback stored in the subCallBacks
map of the ReinventedStream
instance by unsubscribing from the stream.
As an aside, this is an implementation of a broadcast stream. A regular stream can only have one listener. A broadcast stream can have many listeners, but you can’t guarantee which listener’s callback will run first. You can see how you can easily use this code to create your own stream library with priority settings, where you can assign some callback to run first, and then have groups of callbacks. For instance, group A can run after the first callback, group B next, and so on. I can think of a few UI applications where that would be very useful, but I have digressed.
Finally, let’s just take a quick look at the ReinventedStreamSubscription
class, just for completeness, after all, we have figured out our object zombification issue now, no?
class ReinventedStreamSubscription {
ReinventedStreamSubscription(this.pos, this.stream);
final int pos;
ReinventedStream stream;
void cancel() {
if (stream != null) {
stream.unlisten(pos);
stream = null;
}
}
}
This class is instantiated by the listen
function of the ReinventedStream
class and returned to the subscriber so that the subscriber could cancel the subscription. It really should be called a subscription receipt. The code is pretty self-explanatory, the cancel
function reaches out to the instance of its creator and runs unlisten
on a given callback, as identified by its integer index.
Conclusion
Wow, you are still reading? Well, I am tired of typing, so let’s keep this short. Streams are awesome, but you MUST remember to unsubscribe from all the streams before disposing of an object or you will suffer from a zombie uprising of an unstoppable magnitude (or just a memory leak). Happy hunting!