If you haven't read the first part yet, start there: https://dev.to/titouancreach/how-i-replaced-trpc-with-effect-rpc-in-a-nextjs-app-router-application-4j8p
With Effect-RPC, it is possible to define a handler that returns a stream, which can be particularly useful when dealing with AI completion or other services that emit chunks over time.
Create Rpc router
In the first part, we defined a tagged request—an object containing a schema for success, payload, and failure.
For a streaming request, we define a request that extends Rpc.StreamRequest.
In this example, let's create an API route that recognizes text from a base64 image. This process is known as OCR (Optical Character Recognition).
import { Rpc } from "@effect/rpc";
import { Schema as S } from "effect";

export class OcrReq extends Rpc.StreamRequest<OcrReq>()(
  "OcrReq",
  {
    payload: {
      imageB64: S.String,
    },
    success: S.Struct({
      textChunk: S.String,
    }),
    failure: S.Never,
  },
) {}
Now, we'll create a service to handle the domain logic. In this example, we’ll simulate a stream with a fake AI service.
import { Effect, Stream } from "effect";

class AiService extends Effect.Service<AiService>()("AiService", {
  succeed: {
    // fake OCR: ignores the URL and emits a fixed sentence, one word per second
    ocr: (_publicImageUrl: string): Stream.Stream<string, never, never> => {
      return Stream.fromIterable([
        "The",
        "quick",
        "brown",
        "fox",
        "jumps",
        "over",
        "the",
        "lazy",
        "dog",
      ]).pipe(Stream.tap(() => Effect.sleep("1 second")));
    },
  },
}) {}
This service, called AiService, has an ocr method that returns a stream. The stream will emit "The", then "quick", and so on, with a one-second delay between each chunk to simulate AI processing.
Now let’s create our RPC handler. The handler needs to:
- Upload the image to obtain a public URL
- Call our AI service
- Stream the response
- Delete the file afterward
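The order of these steps matters: the file must be deleted only after the last chunk has been streamed, not as soon as the stream has been created. As a rough illustration of that lifecycle, here is a plain TypeScript generator (an analogy, not the Effect API) whose finally block plays the role of the release step. The upload, remove, and ocrWords functions, the example URL, and the events log are all hypothetical stand-ins.

```typescript
// Hypothetical in-memory log so we can observe the order of operations.
const events: string[] = [];

// Hypothetical stand-ins for the upload service and the AI service.
const upload = (_imageB64: string): string => {
  events.push("upload");
  return "https://bucket.example/image.png";
};
const remove = (_url: string): void => {
  events.push("delete");
};
const ocrWords = (_url: string): string[] => ["The", "quick", "brown"];

// Acquire the resource, stream the chunks, and release the resource in
// `finally`, i.e. only once the consumer has finished (or stopped early).
function* ocrStream(imageB64: string): Generator<{ textChunk: string }> {
  const url = upload(imageB64);
  try {
    for (const word of ocrWords(url)) {
      events.push(`chunk:${word}`);
      yield { textChunk: word };
    }
  } finally {
    remove(url);
  }
}

// Consuming the whole stream triggers the cleanup at the very end.
const chunks = Array.from(ocrStream("aGVsbG8="));
```

After the stream has been consumed, events lists the upload first, then every chunk, then the delete. The Effect version needs the same guarantee: the scope that owns the uploaded file has to stay open for as long as the stream is running.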
Let’s start by creating a new RPC router:
import { Rpc, RpcRouter } from "@effect/rpc";
import { Effect, Stream } from "effect";

export const aiRouter = RpcRouter.make(
  Rpc.stream(OcrReq, ({ imageB64 }) =>
    Effect.gen(function* () {
      // imaginary service to upload things to a bucket
      const uploadService = yield* UploadService;
      const aiService = yield* AiService;
      // will delete the file once the scope is closed
      const publicUrl = yield* Effect.acquireRelease(
        uploadService.upload(imageB64),
        (name) => uploadService.delete(name),
      );
      // return the stream so that it can be unwrapped below
      return aiService.ocr(publicUrl).pipe(
        Stream.map((word) => ({ textChunk: word })),
      );
    }).pipe(
      // unwrap the stream from the effect and keep the scope open until the
      // stream ends, so the finalizer runs only after the last chunk
      Stream.unwrapScoped,
      // since Failure is never, we die instead of returning the error
      Stream.orDie,
    ),
  ),
);
// router type, used by the client resolver later on
export type AiRouter = typeof aiRouter;
Next.js handler
Now, we need to convert a Next.js request into an RPC request, and the RPC response into a Next.js response.
In the first part, we created a file route.ts with a POST function.
The function prototype looks like this:
export const POST = async (
  request: NextRequest,
  { params }: { params: Promise<SegmentParams> }, // Next.js 15 async API
) => {
  // ...
}
In this function, we’ll create a handler, which is a function that takes a request and returns a stream. In the first part, we used RpcRouter.toHandlerNoStream. This time, we will use RpcRouter.toHandler.
const handler = RpcRouter.toHandler(aiRouter);
export const POST = async (
  request: NextRequest,
  { params }: { params: Promise<SegmentParams> }, // Next.js 15 async API
) => {
  const req = await request.json();
  // a stream of encoded responses, turned into an HTTP response below
  const stream = handler(req);
}
Fortunately, Next.js supports streaming via the standard Response object.
To handle this, we need to encode the stream to a ReadableStream. Here's the code:
const respStream = await Stream.toReadableStreamEffect(
  stream.pipe(
    Stream.chunks,
    Stream.map((_) => `${JSON.stringify(Chunk.toReadonlyArray(_))}\n`),
    Stream.encodeText,
  ),
).pipe(ServerRuntime.runPromise);
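To make the wire format concrete: each batch of items emitted by the stream becomes one line of JSON, terminated by a newline. The sketch below reproduces only that framing step in plain TypeScript, without Effect. encodeNdjsonLine is a hypothetical helper, and the { textChunk } payloads are illustrative rather than the exact shape the RPC handler emits.

```typescript
// Hypothetical helper: frame one batch of items as a single NDJSON line.
// It mirrors the JSON.stringify(...) + "\n" step used in the route handler.
const encodeNdjsonLine = (batch: ReadonlyArray<unknown>): string =>
  `${JSON.stringify(batch)}\n`;

// Two batches, as a stream might emit them over time (illustrative payloads).
const batches: ReadonlyArray<ReadonlyArray<{ textChunk: string }>> = [
  [{ textChunk: "The" }],
  [{ textChunk: "quick" }, { textChunk: "brown" }],
];

// The response body is simply the concatenation of the framed lines.
const body = batches.map((batch) => encodeNdjsonLine(batch)).join("");
```

Because JSON.stringify escapes newlines that appear inside strings, a raw newline in the body can only mean "end of record", which is what lets the receiving side split on it safely.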
Finally, we can return this ReadableStream directly in a response, along with custom headers:
return new Response(respStream, {
  headers: {
    "Content-Type": "application/ndjson; charset=utf-8",
    "Transfer-Encoding": "chunked",
  },
  status: 200,
});
Rpc resolver in the client
To call this on the client side, we need to create an RPC client using a resolver (HTTP resolver). In Effect, there are two types of HTTP resolvers:
- A no-stream resolver (for handling payload and response as JSON bodies).
- A stream resolver (for handling the payload as a JSON body and the response as ndjson, i.e. newline-delimited JSON).
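On the receiving side, network chunks do not line up with ndjson lines: one line may arrive in several pieces, or several lines may arrive in one piece. The stream resolver takes care of this for us, so the following is only a minimal sketch of the idea. NdjsonDecoder is a hypothetical class (not part of Effect) that buffers text until a newline is seen.

```typescript
// Hypothetical incremental decoder: feed it text as it arrives and it returns
// every complete JSON value received so far, keeping any partial line buffered.
class NdjsonDecoder {
  private buffer = "";

  push(text: string): unknown[] {
    this.buffer += text;
    const lines = this.buffer.split("\n");
    // The last element is "" (text ended on a newline) or an incomplete line.
    this.buffer = lines.pop() ?? "";
    return lines
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line) as unknown);
  }
}

// The second line is split across two network chunks (illustrative payloads).
const decoder = new NdjsonDecoder();
const first = decoder.push('[{"textChunk":"The"}]\n[{"textChu');
const second = decoder.push('nk":"quick"}]\n');
```

The first call yields only the complete first line; the second call completes the buffered fragment and yields the second line.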
In the first part, we created a client using the no-stream resolver:
// ❌ From part one
import * as HttpResolver from "@effect/rpc-http/HttpRpcResolverNoStream";
import { HttpClient, HttpClientRequest } from "@effect/platform";
import { Config, Effect } from "effect";
import * as Resolver from "@effect/rpc/RpcResolver";

const frontEndConfigHost = Config.succeed(process.env.NEXT_PUBLIC_HOST);

const makeHelloClient = ({ cookies }: { cookies: string }) => {
  const helloClient = Config.string("HOST").pipe(
    Config.orElse(() => frontEndConfigHost),
    Effect.andThen((host) =>
      HttpResolver.make<HelloRouter>(
        HttpClient.fetchOk.pipe(
          HttpClient.mapRequest(
            HttpClientRequest.prependUrl(`${host}/api/hello`),
          ),
          HttpClient.mapRequest(HttpClientRequest.setHeader("cookie", cookies)),
        ),
      ).pipe(Resolver.toClient),
    ),
  );
  return helloClient;
};
In this case, we will update the import to use the stream-capable HTTP resolver:
-import * as HttpResolver from "@effect/rpc-http/HttpRpcResolverNoStream";
+import * as HttpResolver from "@effect/rpc-http/HttpRpcResolver";
import { HttpClient, HttpClientRequest } from "@effect/platform";
import { Config, Effect } from "effect";
import * as Resolver from "@effect/rpc/RpcResolver";

const frontEndConfigHost = Config.succeed(process.env.NEXT_PUBLIC_HOST);

const makeAiClient = ({ cookies }: { cookies: string }) => {
  const aiClient = Config.string("HOST").pipe(
    Config.orElse(() => frontEndConfigHost),
    Effect.andThen((host) =>
      HttpResolver.make<AiRouter>(
        HttpClient.fetchOk.pipe(
          HttpClient.mapRequest(
            HttpClientRequest.prependUrl(`${host}/api/ai`),
          ),
          HttpClient.mapRequest(HttpClientRequest.setHeader("cookie", cookies)),
        ),
      ).pipe(Resolver.toClient),
    ),
  );
  return aiClient;
};
In the first part, we also made the client available via React.Context.
Consuming the stream
With the boilerplate done, we can move on to what we’re really here for: consuming the stream.
Here’s a component that does it:
import React, { startTransition, useState } from "react";
import { useMutation } from "@tanstack/react-query";
import { Effect, Stream } from "effect";
import { AiClientContext } from "./RpcClientProvider";
// OcrReq and BrowserRuntime are imported from wherever you defined them

export function OcrImage() {
  const rpcClient = React.useContext(AiClientContext);
  const [transcription, setTranscription] = useState("");
  const transcribeMutation = useMutation({
    mutationKey: ["transcribeMutation"],
    mutationFn: ({ imageB64 }: { imageB64: string }) =>
      Effect.gen(function* () {
        // Remember, rpcClient is an effect, as it needs the config to be created
        const client = yield* rpcClient;
        // Call the API route; for a stream request the client returns a Stream
        const stream = client(new OcrReq({ imageB64 }));
        // Then handle the stream as needed; the callback must return an Effect
        yield* Stream.runForEach(stream, ({ textChunk: word }) =>
          Effect.sync(() => setTranscription((old) => `${old} ${word}`)),
        );
      }).pipe(BrowserRuntime.runPromise),
    //        ^ My custom managed runtime for browser
  });
  return (
    <div>
      <UploadImageB64
        onUpload={(b64) =>
          startTransition(async () => {
            await transcribeMutation.mutateAsync({ imageB64: b64 });
          })
        }
      />
      <Input readOnly value={transcription} />
    </div>
  );
}