Part 2 - How I replaced tRPC with effect rpc in a Next.js app router application - Streaming responses

Titouan CREACH - Oct 22 - Dev Community

If you haven't read the first part yet: https://dev.to/titouancreach/how-i-replaced-trpc-with-effect-rpc-in-a-nextjs-app-router-application-4j8p

With Effect-RPC, it is possible to define a handler that returns a stream, which can be particularly useful when dealing with AI completion or other services that emit chunks over time.

Creating the RPC router

In the first part, we defined a tagged request: an object containing schemas for success, payload, and failure.

For a streaming request, we define a request that extends Rpc.StreamRequest.

In this example, let's create an API route that recognizes text from a base64 image. This process is known as OCR (Optical Character Recognition).


import { Rpc } from "@effect/rpc";
import { Schema as S } from "effect";

export class OcrReq extends Rpc.StreamRequest<OcrReq>()(
  "OcrReq",
  {
    payload: {
      imageB64: S.String,
    },

    success: S.Struct({
      textChunk: S.String,
    }),

    failure: S.Never,
  },
) {}


Now, we'll create a service to handle the domain logic. In this example, we’ll simulate a stream with a fake AI service.

import { Effect, Stream } from "effect";

class AiService extends Effect.Service<AiService>()("AiService", {
  succeed: {
    ocr: (_publicImageUrl: string): Stream.Stream<string, never, never> => {
      return Stream.fromIterable([
        "The",
        "quick",
        "brown",
        "fox",
        "jumps",
        "over",
        "the",
        "lazy",
        "dog",
      ]).pipe(Stream.tap(() => Effect.sleep("1 second")));
    },
  },
}) {}


This service, called AiService, has an ocr method that returns a stream. The stream will emit "The", then "quick", and so on, with a one-second delay between each chunk to simulate AI processing.

Now let’s create our RPC handler. The handler needs to:

  • Upload the image to obtain a public URL
  • Call our AI service
  • Stream the response
  • Delete the file afterward

Let’s start by creating a new RPC router:

const aiRouter = RpcRouter.make(
  Rpc.stream(OcrReq, ({ imageB64 }) =>
    Effect.gen(function* () {
      // imaginary service to upload things to a bucket
      const uploadService = yield* UploadService;

      const aiService = yield* AiService;

      // will delete the file once the scope is closed
      const publicUrl = yield* Effect.acquireRelease(
        uploadService.upload(imageB64),
        (name) => uploadService.delete(name),
      );

      return aiService.ocr(publicUrl).pipe(
        Stream.map((word) => ({
          textChunk: word,
        })),
      );
    }).pipe(
      // unwrap the stream and keep the scope open until the stream ends,
      // so the finalizer (file deletion) runs after streaming completes
      Stream.unwrapScoped,
      Stream.orDie, // since failure is Never, we die instead of returning the error
    ),
  ),
);

Next.js handler

Now, we need to convert a Next.js request into an RPC request, and the RPC response into a Next.js response.

In the first part, we created a file route.ts with a POST function.

The function prototype looks like this:

export const POST = async (
  request: NextRequest,
  { params }: { params: Promise<SegmentParams> }, // Next.js 15 async api
) => {
  // ...
}

In this function, we’ll create a handler, which is a function that takes a request and returns a stream. In the first part, we used RpcRouter.toHandlerNoStream. This time, we will use RpcRouter.toHandler.

const handler = RpcRouter.toHandler(aiRouter);


export const POST = async (
  request: NextRequest,
  { params }: { params: Promise<SegmentParams> }, // Next.js 15 async API
) => {
  const req = await request.json();

  const stream = handler(req);

  // ... encode the stream and return a Response (shown below)
}


Fortunately, Next.js supports streaming via the standard Response object.

To handle this, we need to encode the stream to a ReadableStream. Here's the code:

const respStream = await Stream.toReadableStreamEffect(
  stream.pipe(
    Stream.chunks,
    Stream.map((_) => `${JSON.stringify(Chunk.toReadonlyArray(_))}\n`),
    Stream.encodeText,
  ),
).pipe(ServerRuntime.runPromise);
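
For intuition, the ndjson encoding above boils down to serializing each emitted chunk as one JSON line terminated by a newline. Here is a minimal plain-TypeScript sketch of that framing, without Effect (`encodeNdjsonFrames` is a hypothetical helper for illustration, not part of the article's code):

```typescript
// Hypothetical helper illustrating the ndjson framing: each emitted chunk
// (an array of success values) becomes one JSON-serialized line ending in "\n".
function encodeNdjsonFrames(
  frames: ReadonlyArray<ReadonlyArray<unknown>>,
): string {
  return frames.map((frame) => `${JSON.stringify(frame)}\n`).join("");
}

// Example: two stream chunks become two ndjson lines.
const body = encodeNdjsonFrames([
  [{ textChunk: "The" }, { textChunk: "quick" }],
  [{ textChunk: "brown" }],
]);
// body === '[{"textChunk":"The"},{"textChunk":"quick"}]\n[{"textChunk":"brown"}]\n'
```

This is why the response can be flushed incrementally: each chunk is a complete, independently parseable line.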

Finally, we can return this ReadableStream directly in a response, along with custom headers:

return new Response(respStream, {
  headers: {
    "Content-Type": "application/ndjson; charset=utf-8",
    "Transfer-Encoding": "chunked",
  },
  status: 200,
});

RPC resolver on the client

To call this from the client, we need to create an RPC client using a resolver (here, an HTTP resolver). Effect provides two types of HTTP resolvers:

  • A no-stream resolver (for handling payload and response as JSON bodies).
  • A stream resolver (for handling the payload as a JSON body and the response as ndjson (newline-delimited JSON)).
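
On the wire, the stream resolver therefore receives newline-delimited JSON and decodes each line back into a chunk of values. A rough plain-TypeScript sketch of that decoding (`decodeNdjsonLines` is a hypothetical helper for illustration, not the resolver's actual implementation):

```typescript
// Hypothetical helper: split an ndjson body into lines and JSON-parse
// each non-empty line back into a chunk of values.
function decodeNdjsonLines(body: string): Array<Array<unknown>> {
  return body
    .split("\n")
    .filter((line) => line.length > 0)
    .map((line) => JSON.parse(line) as Array<unknown>);
}

const chunks = decodeNdjsonLines('[{"textChunk":"The"}]\n[{"textChunk":"quick"}]\n');
// chunks[0] is [{ textChunk: "The" }], chunks[1] is [{ textChunk: "quick" }]
```

The real resolver additionally decodes each value through the request's success schema, which is why the client receives typed chunks rather than raw JSON.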

In the first part, we created a client using the no-stream resolver:

// ❌ From part one

import * as HttpResolver from "@effect/rpc-http/HttpRpcResolverNoStream";
import { HttpClient, HttpClientRequest } from "@effect/platform";
import { Config, Effect } from "effect";
import * as Resolver from "@effect/rpc/RpcResolver";

const frontEndConfigHost = Config.succeed(process.env.NEXT_PUBLIC_HOST);

const makeHelloClient = ({ cookies }: { cookies: string }) => {
  const helloClient = Config.string("HOST").pipe(
    Config.orElse(() => frontEndConfigHost),
    Effect.andThen((host) =>
      HttpResolver.make<HelloRouter>(
        HttpClient.fetchOk.pipe(
          HttpClient.mapRequest(
            HttpClientRequest.prependUrl(`${host}/api/hello`),
          ),
          HttpClient.mapRequest(HttpClientRequest.setHeader("cookie", cookies)),
        ),
      ).pipe(Resolver.toClient),
    ),
  );

  return helloClient;
};



In this case, we will update the import to use the stream-capable HTTP resolver:

-import * as HttpResolver from "@effect/rpc-http/HttpRpcResolverNoStream";
+import * as HttpResolver from "@effect/rpc-http/HttpRpcResolver";
import { HttpClient, HttpClientRequest } from "@effect/platform";
import { Config, Effect } from "effect";
import * as Resolver from "@effect/rpc/RpcResolver";

const frontEndConfigHost = Config.succeed(process.env.NEXT_PUBLIC_HOST);

const makeAiClient = ({ cookies }: { cookies: string }) => {
  const aiClient = Config.string("HOST").pipe(
    Config.orElse(() => frontEndConfigHost),
    Effect.andThen((host) =>
      HttpResolver.make<AiRouter>(
        HttpClient.fetchOk.pipe(
          HttpClient.mapRequest(
            HttpClientRequest.prependUrl(`${host}/api/ai`),
          ),
          HttpClient.mapRequest(HttpClientRequest.setHeader("cookie", cookies)),
        ),
      ).pipe(Resolver.toClient),
    ),
  );

  return aiClient;
};


In the first part, we also made the client available via React.Context.

Consuming the stream

Once all the boilerplate is done, we can move on to what we’re really here for: consuming the stream.

Here’s how you can consume the stream:

import React, { useState, startTransition } from "react";
import { Effect, Stream } from "effect";
import { useMutation } from "@tanstack/react-query";
import { AiClientContext } from "./RpcClientProvider";
import { OcrReq } from "./requests"; // wherever OcrReq is defined

export function OcrImage() {
  const rpcClient = React.useContext(AiClientContext);

  const [transcription, setTranscription] = useState("");

  const transcribeMutation = useMutation({
    mutationKey: ["transcribeMutation"],

    mutationFn: ({ imageB64 }: { imageB64: string }) =>
      Effect.gen(function* () {
        // Remember, rpcClient is an effect, as it needs the config to be created
        const client = yield* rpcClient;

        // Call the API route
        const stream = yield* client(new OcrReq({ imageB64 }));

        // Then handle the stream as needed
        yield* Stream.runForEach(stream, ({ textChunk: word }) =>
          Effect.sync(() => setTranscription((old) => `${old} ${word}`)),
        );
      }).pipe(BrowserRuntime.runPromise),
      // ^ My custom managed runtime for the browser
  });

  return (
    <div>
      <UploadImageB64
        onUpload={(b64) =>
          startTransition(() => transcribeMutation.mutate({ imageB64: b64 }))
        }
      />

      <Input readOnly value={transcription} />
    </div>
  );
}
