RxJS Subjects in Depth
This article was originally published on Bits and Pieces by Giancarlo Buomprisco
In this article, I want to explore the topic of RxJS’s implementation of Subjects, a utility that is increasingly getting awareness and love from the community.
In the past, I have used Subjects in a variety of ways, but sometimes not fully understanding what they are internally and what are the main differences with Observables.
This is what the article will cover:
What is a Subject?
Multicasting and Unicasting
Other types of Subject: AsyncSubject, ReplaySubject, and BehaviorSubject
What is a Subject?
Let’s start with a simple question: what is a Subject?
According to Rx’s website:
A Subject is a special type of Observable that allows values to be multicasted to many Observers.
Subjects are like EventEmitters.
If this is unclear, hang on, by the end of the article you’ll have a much clearer understanding of what a Subject is and all the way you can make use of them.
The definition stated by the Rx documentation initially struck me: in fact, I always thought of Subjects as purely a way to both pull and push values using streams. It turns out, I didn’t really know them full well even after having used them daily for about 5 years.
Tip: Easily reuse Angular/React/Vue components across your projects with Bit
Use Bit to share and reuse JS modules and React/Angular/Vue components across different projects. Collaborate over shared components as a team to build apps faster together. Let Bit do the heavy lifting so you can easily publish, install and update your individual components without any overhead. Click here to learn more.
Subject
Subject is a class that internally extends Observable. A Subject is both an Observable and an Observer that allows values to be multicasted to many Observers, unlike Observables, where each subscriber owns an independent execution of the Observable.
That means:
you can subscribe to a Subject to pull values from its stream
you can feed values to the stream by calling the method next()
you can even pass a Subject as an Observer to an Observable: as mentioned above, a Subject is also an Observer, and as a such, it implements the methods next, error and complete
Let’s see a quick example:
const subject$ = new Subject();
// Pull values
subject$.subscribe(
console.log,
null,
() => console.log('Complete!')
);
// Push values
subject$.next('Hello World');
// Use Subject as an Observer
const numbers$ = of(1, 2, 3);
numbers$.subscribe(subject$);
/* Output below */
// Hello Word
// 1
// 2
// 3
// Complete!
The Internals of a Subject
Internally, every Subject maintains a registry (as an array) of observers. This is how internally a Subject works, in a nutshell:
every time a new observer subscribes, the Subject will store the observer in the observers' array
when a new item is emitted (i.e. the method next() was called), the Subject will loop through the observers and emit the same value to each one of them (multicasting). The same will happen when it errors or completes
when a Subject completes, all the observers will be automatically unsubscribed
when a Subject is unsubscribed, instead, the subscriptions will still be alive. The observers’ array is nullified, but it doesn’t unsubscribe them. If you attempt to emit a value from an unsubscribed Subject, it will actually throw an error. The best course of action should be to complete your subjects when you need to dispose of them and their observers
when one of the observers is unsubscribed, it will then be removed from the registry
Multicasting
Passing a Subject as an Observer allows to convert the Observable’s behavior from unicast to multicast. Using a Subject is, indeed, the only way to make an Observable multicast, which means they will share the same execution with multiple Observers.
Wait, though: what does sharing execution actually mean? Let’s see two examples to understand the concept better.
Let’s use the observable interval as an example: we want to create an observable that emits every 1000 ms (1 second), and we want to share the execution with all the subscribers, regardless of when they subscribed.
const subject$ = new Subject<number>();
const observer = {
next: console.log
};
const observable$ = interval(1000);
// subscribe after 1 second
setTimeout(() => {
console.log("Subscribing first observer");
subject$.subscribe(observer);
}, 1000);
// subscribe after 2 seconds
setTimeout(() => {
console.log("Subscribing second observer");
subject$.subscribe(observer);
}, 2000);
// subscribe using subject$ as an observer
observable$.subscribe(subject$);
Let’s summarize the snippet above
we create a subject called subject$ and an observer which simply logs the current value after each emission
we create an observable that emits every 1 second (using interval)
we subscribe respectively after 1 and 2 seconds
finally, we use the subject as an observer and subscribe to the interval observable
Let’s see the output:
As you can see in the image above, even if the second observable subscribed after 1 second, the values emitted to the 2 observers are exactly the same. Indeed, they share the same source observable.
Another common example that shows the usefulness of multicasting is subscribing to an observable that executes an HTTP request, a scenario that happens often in frameworks such as Angular: by multicasting the observable, you can avoid executing multiple requests and share the execution with multiple subscribers, that will receive the same value.
AsyncSubject
I personally find AsyncSubject the least known type of Subject, simply because I never really needed it, or more likely I didn’t know I could have needed it.
In short, the AsyncSubject will:
emit only once it completes
emit only the latest value it received
const asyncSubject$ = new AsyncSubject();
asyncSubject$.next(1);
asyncSubject$.next(2);
asyncSubject$.next(3);
asyncSubject$.subscribe(console.log);
// ... nothing happening!
asyncSubject$.complete();
// 3
As you can see, even if we subscribed, nothing happened until we called the method complete.
ReplaySubject
Before introducing a ReplaySubject, let’s see a common situation where normal Subjects are used:
we create a subject
somewhere in our app, we start pushing values to the subject, but there’s no subscriber yet
at some point, the first observer subscribes
we expect the observer to emit the values (all of them? or only the last one?) that were previously pushed through the subject
… nothing happening! In fact, a Subject has no memory
const subject$ = new Subject();
// somewhere else in our app
subject.next(/* value */);
// somewhere in our app
subject$.subscribe(/* do something */);
// nothing happening
This is one of the situations when a ReplaySubject can help us: in fact, a Subject will record the values emitted and will push to the observer all the values emitted when a subscribed.
Let’s get back to the question above: does a ReplaySubject replay all the emissions or only the latest one?
Well, by default, the subject will replay all the items emitted, but we can provide an argument called bufferSize. This argument defines the number of emissions that the ReplaySubject should keep in its memory:
const subject$ = new ReplaySubject(1);
subject$.next(1);
subject$.next(2);
subject$.next(3);
subject$.subscribe(console.log);
// Output
// 3
There’s also a second argument that can be passed to ReplaySubject in order to define how long the old values should be stored in memory.
const subject$ = new ReplaySubject(100,*250);
setTimeout(() => subject$.next(1), 50);
setTimeout(() => subject$.next(2), 100);
setTimeout(() => subject$.next(3), 150);
setTimeout(() => subject$.next(4), 200);
setTimeout(() => subject$.next(5), 250);
setTimeout(() => {
subject$.subscribe(v => console.log('SUBCRIPTION A', v));
}, 200);
setTimeout(() => {
subject$.subscribe(v => console.log('SUBCRIPTION B', v));
}, 400);
we create a ReplaySubject whose bufferSize is 100 and windowTime 250
we emit 5 values every 50ms
we subscribe the first time after 200ms and the second time after 400ms
Let’s analyze the output:
SUBCRIPTION A 1
SUBCRIPTION A 2
SUBCRIPTION A 3
SUBCRIPTION A 4
SUBCRIPTION A 5
SUBCRIPTION B 4
SUBCRIPTION B 5
The subscription A was able to replay all the items, but the subscription B was only able to replay items 4 and 5, as they were the only ones emitted within the window time specified.
BehaviorSubject
BehaviorSubject is probably the most well-known subclass of Subject. This kind of Subject represents the “current value”.
Interestingly, the Combine framework named it CurrentValueSubject
Similarly to ReplaySubject, it will also replay the current value whenever an observer subscribes to it.
In order to use BehaviorSubject we need to provide a mandatory initial value when this gets instantiated.
const subject$ = new BehaviorSubject(0); // 0 is the initial value
subject$.next(1);
setTimeout(() => {
subject$.subscribe(console.log);
}, 200);
// 1
Whenever a new value is emitted, the BehaviorSubject will store the value in the property value which can also be publicly accessed.
Final Words
Rx Subjects are quite powerful tools, and like any powerful tool in software engineering, they can also be easily abused. The concept of unicasting and multicasting is a striking distinction that you need to take into account when working with Rx.
Understanding how Subjects work internally can be fairly beneficial to avoid common pitfalls and bugs, but also to understand when you need them and when, instead, you don’t.
If you need any clarifications, or if you think something is unclear or wrong, do please leave a comment!
I hope you enjoyed this article! If you did, follow me on* Medium, Twitter or my website for more articles about Software Development, Front End, RxJS, Typescript and more!