import { clone, cloneDeep } from 'lodash-es'; import { Observable, Subscription } from 'rxjs'; import { customRef, onBeforeUnmount, readonly, Ref } from 'vue'; type CloneMode = 'noclone' | 'shallow' | 'deep'; /** * Returns a readonly (no writes) ref for an RxJS Observable * @param stream$ The RxJS Observable to listen to * @param initialValue The initial value to apply until the stream emits a value * @param cloneMode Determines whether or not and how deep to clone the emitted value. * Useful for issues in reactivity due to reference sharing. Defaults to shallow clone * @returns A readonly ref which has the latest value from the stream */ export function useReadonlyStream( stream$: Observable, initialValue: T, cloneMode: CloneMode = 'shallow' ): Ref { let sub: Subscription | null = null; onBeforeUnmount(() => { if (sub) { sub.unsubscribe(); } }); const r = customRef((track, trigger) => { let val = initialValue; sub = stream$.subscribe((value) => { if (cloneMode === 'noclone') { val = value; } else if (cloneMode === 'shallow') { val = clone(value); } else if (cloneMode === 'deep') { val = cloneDeep(value); } trigger(); }); return { get() { track(); return val; }, set() { trigger(); // <- Not exactly needed here throw new Error('Cannot write to a ref from useReadonlyStream'); }, }; }); // Casting to still maintain the proper type signature for ease of use return readonly(r) as Ref; } export function useStream( stream$: Observable, initialValue: T, setter: (val: T) => void ) { let sub: Subscription | null = null; onBeforeUnmount(() => { if (sub) { sub.unsubscribe(); } }); return customRef((track, trigger) => { let value = initialValue; sub = stream$.subscribe((val) => { value = val; trigger(); }); return { get() { track(); return value; }, set(value: T) { trigger(); setter(value); }, }; }); } /** A static (doesn't cleanup on itself and does * not require component instace) version of useStream */ export function useStreamStatic( stream$: Observable, initialValue: T, setter: (val: T) => void ): [Ref, () => void] { let sub: Subscription | null = null; const stopper = () => { if (sub) { sub.unsubscribe(); } }; return [ customRef((track, trigger) => { let value = initialValue; sub = stream$.subscribe((val) => { value = val; trigger(); }); return { get() { track(); return value; }, set(value: T) { trigger(); setter(value); }, }; }), stopper, ]; } export type StreamSubscriberFunc = ( stream: Observable, next?: ((value: T) => void) | undefined, error?: ((e: any) => void) | undefined, complete?: (() => void) | undefined ) => void; /** * A composable that provides the ability to run streams * and subscribe to them and respect the component lifecycle. */ export function useStreamSubscriber(): { subscribeToStream: StreamSubscriberFunc; } { const subs: Subscription[] = []; const runAndSubscribe = ( stream: Observable, next?: (value: T) => void, error?: (e: any) => void, complete?: () => void ) => { const sub = stream.subscribe({ next, error, complete: () => { if (complete) complete(); subs.splice(subs.indexOf(sub), 1); }, }); subs.push(sub); }; onBeforeUnmount(() => { subs.forEach((sub) => sub.unsubscribe()); }); return { subscribeToStream: runAndSubscribe, }; }