RxJS is a library for reactive programming based on Observables, perfect for modeling asynchronous actions such as file uploads. In this article we will see how to use RxJS to build a robust, extensible, and easy-to-test upload pipeline, starting from simple examples up to scenarios with progress tracking, cancellation, and error handling.
Why use RxJS for file uploads
Uploading a file looks simple at first, but it quickly becomes complex once we add:
- progress bars and real-time feedback;
- error handling with retry attempts;
- the ability to cancel the upload;
- multiple uploads and controlled concurrency;
- integration with other UI events (clicks, state changes, etc.).
RxJS Observables are perfect for expressing these flows as a composition of operators, instead of ending up with a jungle of callbacks and listeners that are hard to maintain.
Basic setup: capturing the file selection event
Let’s imagine we have a file input and an “Upload” button:
<input type="file" id="fileInput" multiple>
<button id="uploadBtn">Upload</button>
<div id="status"></div>
With RxJS we can transform DOM events into Observables using fromEvent:
import { fromEvent } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const fileInput = document.getElementById('fileInput');
const uploadBtn = document.getElementById('uploadBtn');
const statusEl = document.getElementById('status');
// Stream of selected files
const files$ = fromEvent(fileInput, 'change').pipe(
  map(event => /** @type {HTMLInputElement} */ (event.target).files),
  filter(files => !!files && files.length > 0)
);
// Stream of clicks on the upload button
const uploadClick$ = fromEvent(uploadBtn, 'click');
We now have two distinct streams:
- files$: emits a FileList every time the user selects files;
- uploadClick$: emits a value every time the “Upload” button is pressed.
Creating an Observable for upload via XMLHttpRequest
To track upload progress we need the native XMLHttpRequest events,
in particular those from xhr.upload. Let’s encapsulate everything in a custom Observable:
import { Observable } from 'rxjs';
/**
 * Performs the upload of a single file and returns an Observable
 * that emits status objects: { type, progress, response, error }.
 */
function uploadFile$(url, file) {
  return new Observable(subscriber => {
    const xhr = new XMLHttpRequest();
    const formData = new FormData();
    formData.append('file', file);
    // Progress events
    xhr.upload.addEventListener('progress', event => {
      if (event.lengthComputable) {
        const percent = Math.round((event.loaded / event.total) * 100);
        subscriber.next({
          type: 'progress',
          progress: percent
        });
      }
    });
    // Completion
    xhr.addEventListener('load', () => {
      if (xhr.status >= 200 && xhr.status < 300) {
        subscriber.next({
          type: 'success',
          response: xhr.response
        });
        subscriber.complete();
      } else {
        subscriber.error({
          type: 'error',
          status: xhr.status,
          message: xhr.statusText || 'Upload error'
        });
      }
    });
    // Network errors
    xhr.addEventListener('error', () => {
      subscriber.error({
        type: 'error',
        status: xhr.status,
        message: 'Network error'
      });
    });
    // Cancellation
    xhr.addEventListener('abort', () => {
      subscriber.next({ type: 'cancelled' });
      subscriber.complete();
    });
    xhr.open('POST', url);
    xhr.send(formData);
    // Cleanup function called on unsubscribe
    return () => {
      if (xhr.readyState !== XMLHttpRequest.DONE) {
        xhr.abort();
      }
    };
  });
}
This Observable is very powerful:
- it emits progress events during the upload;
- it emits a success event at the end;
- it propagates errors via subscriber.error;
- it supports cancellation via unsubscribe.
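Because every emission is a plain status object, a consumer can fold the stream into a single view state with a pure function. The sketch below shows one possible shape for such a reducer. The names `initialUploadState` and `reduceUploadEvent` are hypothetical, and the state fields are an assumption about what a UI might need, not something the pipeline requires.

```javascript
// Hypothetical initial state for one upload; the field names are illustrative.
const initialUploadState = { status: 'idle', progress: 0, response: null };

/**
 * Folds one status object emitted by an upload Observable into the previous state.
 * Pure function: it never touches the DOM and never mutates its input.
 */
function reduceUploadEvent(state, event) {
  switch (event.type) {
    case 'progress':
      return { ...state, status: 'uploading', progress: event.progress };
    case 'success':
      return { status: 'done', progress: 100, response: event.response };
    case 'cancelled':
      return { ...state, status: 'cancelled' };
    default:
      // Unknown event types leave the state untouched
      return state;
  }
}

// Folding a recorded sequence of events, one emission at a time
const events = [
  { type: 'progress', progress: 40 },
  { type: 'progress', progress: 100 },
  { type: 'success', response: '{"ok":true}' }
];
const finalState = events.reduce(
  (state, event) => reduceUploadEvent(state, event),
  initialUploadState
);
console.log(finalState.status); // done
```

In a pipeline, a reducer like this could be passed to the `scan` operator, so that subscribers receive ready-to-render states instead of raw events.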
Connecting file selection, clicks, and upload
Now we want the upload of the selected files to start when the button is clicked. We use RxJS operators to combine the streams:
import { withLatestFrom, concatMap, tap } from 'rxjs/operators';
const uploadUrl = '/api/upload';
// Each click takes the latest available FileList
const upload$ = uploadClick$.pipe(
  withLatestFrom(files$),
  map(([, files]) => Array.from(files)), // FileList -> File[]
  tap(() => {
    statusEl.textContent = 'Starting upload...';
  }),
  // Flatten File[] into individual File emissions (an array is a valid inner source)
  concatMap(files => files),
  // Upload files sequentially: the next upload starts only after the previous one completes
  concatMap(file => {
    statusEl.textContent = `Uploading: ${file.name}`;
    return uploadFile$(uploadUrl, file).pipe(
      tap(event => {
        if (event.type === 'progress') {
          statusEl.textContent = `Uploading ${file.name}: ${event.progress}%`;
        }
      })
    );
  })
);
upload$.subscribe({
  next: event => {
    if (event.type === 'success') {
      statusEl.textContent = 'Upload completed successfully';
    }
  },
  error: err => {
    statusEl.textContent = `Error: ${err.message || 'Upload failed'}`;
  },
  complete: () => {
    // Reached only if the source completes; a fromEvent click stream never does on its own
    statusEl.textContent = 'All files have been uploaded';
  }
});
In this example the files are uploaded sequentially, because concatMap subscribes to the next upload only after the previous one has completed. We can easily change the concurrency strategy by swapping in a different flattening operator.
Multiple uploads with controlled concurrency
If there are many files, it makes sense to upload some in parallel, but not all of them, to avoid
saturating bandwidth or the server. mergeMap accepts a concurrency parameter:
import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
const MAX_CONCURRENCY = 3;
const parallelUpload$ = uploadClick$.pipe(
  withLatestFrom(files$),
  map(([, files]) => Array.from(files)),
  mergeMap(files => from(files)),
  mergeMap(file => uploadFile$(uploadUrl, file), MAX_CONCURRENCY)
);
With MAX_CONCURRENCY = 3, at most three uploads will be active at the same time. This is a
classic example of how RxJS allows you to express complex policies in just a few lines.
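With several uploads in flight at once, a single per-file percentage is no longer enough to drive one progress bar. The sketch below shows one way to aggregate: keep the latest percentage for each file and average them. `overallProgress` is a hypothetical helper, and weighting every file equally (rather than by size) is a simplifying assumption.

```javascript
/**
 * Computes an overall percentage from the latest per-file percentages.
 * @param {Map<string, number>} progressByFile file name -> percent (0..100)
 * @returns {number} rounded average, or 0 when nothing is tracked yet
 */
function overallProgress(progressByFile) {
  if (progressByFile.size === 0) {
    return 0;
  }
  let sum = 0;
  for (const percent of progressByFile.values()) {
    sum += percent;
  }
  return Math.round(sum / progressByFile.size);
}

// Three parallel uploads at different stages (file names are made up)
const progressByFile = new Map([
  ['a.png', 100],
  ['b.png', 50],
  ['c.png', 0]
]);
console.log(overallProgress(progressByFile)); // 50
```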
Error handling and retry
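RxJS offers operators for error handling, such as retry and retryWhen (the latter is deprecated since RxJS 7 in favor of retry with a configuration object, but it is still available there). For example, we can retry uploading a file a few times, with increasing delays between attempts.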
import { timer } from 'rxjs';
import { retryWhen, scan, delayWhen } from 'rxjs/operators';
function uploadFileWithRetry$(url, file) {
  const maxRetries = 3;
  const baseDelay = 1000; // ms
  return uploadFile$(url, file).pipe(
    retryWhen(errors$ =>
      errors$.pipe(
        // Count failed attempts; rethrow once the retry budget is exhausted
        scan((acc, error) => {
          if (acc.attempts >= maxRetries) {
            throw error;
          }
          return { attempts: acc.attempts + 1 };
        }, { attempts: 0 }),
        // delay() only accepts a fixed duration, so delayWhen computes the wait per attempt
        delayWhen(acc => timer(acc.attempts * baseDelay))
      )
    )
  );
}
We can replace uploadFile$ with uploadFileWithRetry$ in our pipeline
to automatically get a more resilient behavior, without changing the orchestration logic.
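The wait between attempts is the part of a retry policy that is tuned most often, so it can help to isolate it in a pure function. The sketch below is one possible shape. `retryDelay` and its options are hypothetical names: the linear schedule mirrors the "attempts times base delay" rule described above, while the exponential variant and the cap are assumptions added for illustration.

```javascript
/**
 * Returns the wait in milliseconds before retry number `attempt` (1-based).
 * 'linear' grows as attempt * baseDelay; 'exponential' doubles on each attempt.
 * The result is capped at maxDelay so late retries do not wait unreasonably long.
 */
function retryDelay(attempt, { baseDelay = 1000, strategy = 'linear', maxDelay = 30000 } = {}) {
  const raw = strategy === 'exponential'
    ? baseDelay * 2 ** (attempt - 1)
    : baseDelay * attempt;
  return Math.min(raw, maxDelay);
}

console.log([1, 2, 3].map(n => retryDelay(n)));
// [ 1000, 2000, 3000 ]
console.log([1, 2, 3].map(n => retryDelay(n, { strategy: 'exponential' })));
// [ 1000, 2000, 4000 ]
```

A helper like this could be called wherever the pipeline computes the pause before resubscribing, for example as the duration passed to `timer`.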
Cancelling an ongoing upload
Since our upload Observable supports unsubscribe, cancelling is simple: just keep
the Subscription and call unsubscribe when the user presses a “Cancel” button.
<button id="cancelBtn">Cancel upload</button>
const cancelBtn = document.getElementById('cancelBtn');
let currentUploadSub = null;
const cancellableUpload$ = uploadClick$.pipe(
  withLatestFrom(files$),
  map(([, files]) => Array.from(files)),
  mergeMap(files => from(files)), // File[] -> individual File emissions
  mergeMap(file => uploadFile$(uploadUrl, file))
);
currentUploadSub = cancellableUpload$.subscribe({
  next: event => {
    // handle progress / success
  },
  error: err => {
    statusEl.textContent = `Error: ${err.message}`;
  },
  complete: () => {
    // Not called on unsubscribe; runs only if the source itself completes
    statusEl.textContent = 'Upload completed';
  }
});
fromEvent(cancelBtn, 'click').subscribe(() => {
  if (currentUploadSub) {
    // Unsubscribing runs the cleanup function of every active upload, which aborts its XHR
    currentUploadSub.unsubscribe();
    statusEl.textContent = 'Upload cancelled by the user';
  }
});
In a real application we could better model the lifecycle of the subscription (for example by creating a new stream for each click), but the key concept is that the Observable also encapsulates the XHR abort logic.
Combining UI state and upload
One of RxJS’s strengths is the ability to easily combine uploads, user input, and UI state. For example, we can disable the upload button when no files are selected, or when an upload is already in progress.
import { combineLatest, BehaviorSubject } from 'rxjs';
import { map, startWith, finalize } from 'rxjs/operators';
const hasFiles$ = files$.pipe(
  map(files => files.length > 0),
  startWith(false)
);
const isUploading$ = new BehaviorSubject(false);
combineLatest([hasFiles$, isUploading$]).pipe(
  map(([hasFiles, isUploading]) => hasFiles && !isUploading)
).subscribe(canUpload => {
  uploadBtn.disabled = !canUpload;
});
// Update state during upload
uploadClick$.pipe(
  withLatestFrom(files$),
  map(([, files]) => Array.from(files)),
  mergeMap(files => {
    isUploading$.next(true);
    // The click stream never completes, so the flag is reset per batch:
    // finalize runs when the batch completes, fails, or is unsubscribed
    return from(files).pipe(
      mergeMap(file => uploadFile$(uploadUrl, file)),
      finalize(() => isUploading$.next(false))
    );
  })
).subscribe({
  next: () => {},
  error: () => {
    isUploading$.next(false);
  }
});
In this way the UI stays consistent with the system state without having to manually manage countless flags.
Design tips
- Encapsulate upload logic in functions like uploadFile$, instead of scattering XMLHttpRequest throughout the UI code.
- Use explicit event types (e.g. { type: 'progress' | 'success' | 'error' }) so they can be easily combined and filtered.
- Separate orchestration and rendering: the RxJS pipeline should only emit states, and a higher layer should transform them into DOM updates.
- Handle backpressure: if the backend has strict limits, use mergeMap concurrency or other operators to limit the number of parallel uploads.
- Test pipelines using RxJS marble tests, so you can easily verify complex behaviors (retry, cancellation, etc.).
Conclusions
Using RxJS for file uploads in JavaScript allows you to model the entire flow as a combination of Observables and operators: file selection, button clicks, progress, errors, retries, cancellation, and UI state. The result is more declarative, modular, and maintainable code.
Starting from the examples in this article you can adapt the pipeline to your needs: drag & drop support, chunked uploads, authentication, integration with frameworks like React or Angular, and much more, while always maintaining the same reactive philosophy.