import { BehaviorSubject, Observable, Subscription, timer } from 'rxjs';
import { switchMap, takeUntil, tap } from 'rxjs/operators';

const logWithTime = (...args) => {
  if (!(window as any).debug) return;
  const now = new Date();
  console.log(`[${now.toISOString()}] [OSBH] `, ...args);
};

/**
 * Buffers order notifications briefly, then processes them in real-time.
 *
 * Buffers incoming order notifications until there is a period of inactivity
 * defined by `bufferTimeoutDuration` milliseconds. This buffering occurs only once
 * during the initial load to group together any burst of incoming notifications.
 * After the initial buffer is processed, the method switches to real-time processing
 * of order notifications without any further buffering.
 *
 * @param emitter$ - The BehaviorSubject to subscribe to, emitting arrays of type T.
 * @param onUpdate - A callback function that will be called with the buffered values
 *                   when the buffer timeout is reached.
 * @param bufferTimeoutDuration - The duration (in milliseconds) for which values will be buffered
 *                                before processing. Defaults to 5000 milliseconds (5 seconds).
 * @returns A Subscription object that can be used to unsubscribe from the buffered subscription.
 */
export const createBufferedSubscription = <T>(
  emitter$: BehaviorSubject<T[]>,
  onUpdate: (value: T[]) => void,
  bufferTimeoutDuration: number = 5000,
): Subscription => {
  logWithTime('Starting buffered subscription...');
  // Subscribe to the emitter and buffer values
  const subscription: Subscription = emitter$
    .pipe(
      // Buffer values until the timeout is reached
      tap((value: T[]) => logWithTime(`Received: ${value?.length}`)), // Log received values
      switchMap(() => {
        // Create a buffer to collect all received values
        const buffer$: Observable<T[]> = timer(bufferTimeoutDuration).pipe(
          tap(() => {
            logWithTime(
              `Buffering timeout reached. Processing buffered values.`,
            );
          }),
          // Collect all buffered values
          switchMap(() => {
            return emitter$.pipe(
              takeUntil(timer(bufferTimeoutDuration)), // Stop taking values after the timeout
              tap((value: T[]) =>
                logWithTime(`Buffered value: ${value?.length}`),
              ),
            );
          }),
        );

        return buffer$; // Return the buffer observable
      }),
    )
    .subscribe({
      next: (bufferedValues: T[]) => {
        // This will be called when the buffer timeout is reached
        logWithTime('Buffer processed. Unsubscribing...');
        subscription.unsubscribe(); // Unsubscribe after processing the buffer
        onUpdate(bufferedValues);
      },
      error: (err: Error) => logWithTime('Error:', err),
      complete: () => logWithTime('Completed'),
    });

  return subscription;
};
