import type { AsyncThunk, AsyncThunkOptions } from "@reduxjs/toolkit";

import { streamThunkHandlerBatched, type TrimmedThunkAPI } from "@/features/thunk";
import { createLocalTag } from "@/store/locations";
import { streamTypePrefix } from "@/store/proxy";
import { createAppAsyncThunk, selectCurrentUserId, selectStoreConfig } from "@/store/redux";
import type { AppAsyncThunkConfig, RootState } from "@/store/types";

type StreamParserArgs<ThunkArg, StreamContent> = {
    /** A function to create the gRPC stream object.
     *
     * Arguments are provided in a map to cut down on boilerplate, as
     * different streams require completely different combinations of them.
     */
    rpc: (args: {
        arg: ThunkArg;
        state: RootState;
        signal: AbortSignal;
    }) => AsyncIterableIterator<StreamContent> | undefined;
    /** A function to interpret data received from the stream.
     *
     * Arguments are provided in a map to cut down on boilerplate, as
     * different streams require completely different combinations of them.
     */
    parser: (args: { arg: ThunkArg; thunkAPI: TrimmedThunkAPI; content: StreamContent[]; }) => void;
    /** An optional override for the log prefix for the stream.
     *
     * Possible values are:
     * - `undefined` => use the default, the thunk's `typePrefix`
     * - `string` => override with a static string
     * - `(args: { arg: ThunkArg; state: RootState }) => string` => use a string
     * derived from the argument to the thunk and the current state
     */
    logId?: string | ((args: { arg: ThunkArg; state: RootState; }) => string);
    /** Number of milliseconds to debounce messages coming out of the stream.
     *
     * Optional, defaults to 50ms.
     */
    batchMs?: number;
};

/** Define a thunk for a bidir streaming action.
 *
 * This requires that the store be managing the connection.
 *
 * @param typePrefix a stringy identifier for the thunk
 * @param parserArgs a map of the required data for the stream - see
 * `StreamParserArgs`
 * @param options options for the thunk (see `AsyncThunkOptions`)
 * @return a function to use to start the thunk
 */
export const createStreamingAsyncThunk = <
    ThunkArg,
    StreamContent = unknown,
    ThunkApiConfig extends AsyncThunkOptions<ThunkArg, AppAsyncThunkConfig> = AsyncThunkOptions<
        ThunkArg,
        AppAsyncThunkConfig
    >,
>(
    typePrefix: string,
    { parser, batchMs, rpc, logId }: StreamParserArgs<ThunkArg, StreamContent>,
    options?: ThunkApiConfig,
): AsyncThunk<void, ThunkArg, AppAsyncThunkConfig> => {
    // Add the unique prefix to allow us to filter out the `fulfilled`/
    // `rejected` actions that would be dispatched when a stream ends.
    const prefix = streamTypePrefix + typePrefix;
    return createAppAsyncThunk<void, ThunkArg>(
        prefix,
        async (arg: ThunkArg, thunkAPI) => {
            const state = thunkAPI.getState();

            const req = rpc({ arg, state, signal: thunkAPI.signal });
            if (!req) return;

            const logPrefix = typeof logId === "function" ? logId({ arg, state }) : logId;

            const curried = (thunkAPI: TrimmedThunkAPI, content: StreamContent[]) =>
                parser({ arg, thunkAPI, content });

            return await streamThunkHandlerBatched(
                thunkAPI,
                req,
                batchMs ?? 50,
                curried,
                logPrefix ?? typePrefix,
            );
        },
        {
            ...options,
            condition(arg, api) {
                const state = api.getState();

                const { manageConnection } = selectStoreConfig(state);
                const userId = selectCurrentUserId(state);

                if (!manageConnection || !userId) return false;

                return options?.condition?.(arg, api) || true;
            },
            // Pending thunk actions must *never* be proxied or broadcast.
            getPendingMeta: (base, api) => {
                const tag = createLocalTag(base.requestId);
                if (!options?.getPendingMeta) return tag;

                return {
                    ...options.getPendingMeta(base, api) ?? {},
                    ...tag,
                };
            },
        },
    );
};
