import { cloneDeep } from "lodash-es";

export type StreamMessage = {
  conversationId: string;
  message: string;
  responseText: string;
};

export type ReadResult = {
  data?: StreamMessage;
  responseText?: string;
  done: boolean;
  error?: Error;
};

export type ReadStreamOptions = {
  stream: ReadableStream;
  signal?: AbortSignal;
  onRead: (result: ReadResult) => void;
  onStart?: (data?: StreamMessage) => void;
};

class AIStream {
  textDecoder = new TextDecoder();
  textEncoder = new TextEncoder();

  decodeText(str: Uint8Array) {
    return this.textDecoder.decode(str, { stream: true });
  }

  encodeText(data: Record<string, any>, endStr = "") {
    return this.textEncoder.encode(JSON.stringify(data) + endStr);
  }

  /**
   * 解析JSON数据
   */
  parseChunk(chunk: string) {
    const filterData = chunk.replace(/data:|\n/g, "");
    if (!filterData) return "";
    const isValid = filterData.startsWith("{") && filterData.endsWith("}");
    if (isValid) {
      try {
        const parsedData = JSON.parse(filterData);
        if (parsedData && "message" in parsedData) {
          return parsedData as StreamMessage;
        }
      } catch (e) {
        console.log(e, filterData);
        return false;
      }
    }
  }

  readStream = async (readOptions: ReadStreamOptions) => {
    const { onRead, onStart, stream, signal } = readOptions;
    if (!stream) return onRead?.({ done: true });
    let isFirstRead = true;
    const reader = stream.getReader();
    try {
      let isDone = false;
      // 响应文本数据
      let responseText = "";
      // 被截断的数据
      let lastBuffer = "";
      // 返回的的其他字段
      let restFields = {} as StreamMessage;
      while (!isDone) {
        const { done, value } = await reader.read();
        if (isFirstRead) {
          isFirstRead = false;
          const text = this.decodeText(value);
          const parseText = text.split("\n").filter(Boolean);
          const data = this.parseChunk(parseText?.[0]);
          data && onStart?.(data);
        }
        if (done || signal?.aborted) {
          isDone = done;
          await reader.cancel("read done");
          return onRead?.({ done: done });
        }
        const streamText = this.decodeText(value);

        const parsedData = streamText
          .replace(/(\n\n)/g, "\n")
          .replace(/data:/g, "")
          .split("\n")
          .filter(Boolean);
        if (parsedData.length) {
          for (const str of parsedData) {
            const parseStr = this.parseChunk(str);
            if (parseStr) {
              responseText += parseStr.message;
              restFields = parseStr;
            } else {
              if (lastBuffer) {
                const parsedNewValue = this.parseChunk(lastBuffer + str);
                if (parsedNewValue) {
                  responseText += parsedNewValue.message;
                  restFields = parsedNewValue;
                  lastBuffer = "";
                }
              } else {
                lastBuffer = str;
              }
            }
          }
          onRead?.({
            data: cloneDeep(restFields),
            done: false,
            responseText,
          });
        }
      }
    } catch (e) {
      onRead?.({ done: true, error: e as Error });
      await reader.cancel(e);
    }
  };

  transformStream() {
    let lastBuffer = "";
    return new TransformStream<Uint8Array, Uint8Array>({
      flush() {
        /* do any destructor work here */
      },
      transform: (chunk, controller) => {
        // eslint-disable-next-line no-async-promise-executor
        return new Promise(async (resolve) => {
          const text = this.decodeText(chunk);
          const filterText = text.replace(/data:/g, "").split("\n").filter(Boolean);
          const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
          const parseItem = (chunkItem: string[], restData = {}) => {
            // eslint-disable-next-line no-async-promise-executor
            return new Promise<void>(async (_resolve) => {
              for (const itemStr of chunkItem) {
                await sleep(10);
                controller.enqueue(
                  this.encodeText(
                    {
                      message: itemStr,
                      ...restData,
                    },
                    "\n\n",
                  ),
                );
              }
              _resolve();
            });
          };
          for (const str of filterText) {
            const itemStr = this.parseChunk(str);
            if (itemStr) {
              const { message, ...restData } = itemStr;
              await parseItem(message.split(""), restData);
            } else {
              if (lastBuffer) {
                const parsedNewValue = this.parseChunk(lastBuffer + str);
                if (parsedNewValue) {
                  lastBuffer = "";
                  const { message, ...restData } = parsedNewValue;
                  await parseItem(message.split(""), restData);
                }
              } else {
                lastBuffer = str;
              }
            }
          }
          resolve();
        });
      },
    });
  }
}

export default new AIStream();
