Skip to content

Examples

aiohttp

from aiohttp import web

from jsonrpcserver import Result, Success, async_dispatch, method


@method
async def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


async def handle(request: web.Request) -> web.Response:
    """Handle aiohttp request"""
    return web.Response(
        text=await async_dispatch(await request.text()), content_type="application/json"
    )


app = web.Application()
app.router.add_post("/", handle)

if __name__ == "__main__":
    web.run_app(app, port=5000)

See blog post.

Django

Create a views.py:

from django.http import HttpRequest, HttpResponse  # type: ignore
from django.views.decorators.csrf import csrf_exempt  # type: ignore

from jsonrpcserver import Result, Success, dispatch, method


@method
def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


@csrf_exempt  # type: ignore
def jsonrpc(request: HttpRequest) -> HttpResponse:
    """Handle Django request"""
    return HttpResponse(
        dispatch(request.body.decode()), content_type="application/json"
    )

See blog post.

FastAPI

import uvicorn
from fastapi import FastAPI, Request, Response

from jsonrpcserver import Result, Success, dispatch, method

app = FastAPI()


@method
def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


@app.post("/")
async def index(request: Request) -> Response:
    """Handle FastAPI request"""
    return Response(dispatch(await request.body()))


if __name__ == "__main__":
    uvicorn.run(app, port=5000)

See blog post.

Flask

from flask import Flask, Response, request

from jsonrpcserver import Result, Success, dispatch, method

app = Flask(__name__)


@method
def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


@app.route("/", methods=["POST"])
def index() -> Response:
    """Handle Flask request"""
    return Response(
        dispatch(request.get_data().decode()), content_type="application/json"
    )


if __name__ == "__main__":
    app.run()

See blog post.

http.server

Using Python's built-in http.server module.

from http.server import BaseHTTPRequestHandler, HTTPServer

from jsonrpcserver import Result, Success, dispatch, method


@method
def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


class TestHttpServer(BaseHTTPRequestHandler):
    """HTTPServer request handler"""

    def do_POST(self) -> None:  # pylint: disable=invalid-name
        """POST handler"""
        # Process request
        request = self.rfile.read(int(self.headers["Content-Length"])).decode()
        response = dispatch(request)
        # Return response
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(response.encode())


if __name__ == "__main__":
    HTTPServer(("localhost", 5000), TestHttpServer).serve_forever()

See blog post.

jsonrpcserver

Using jsonrpcserver's built-in serve method.

from jsonrpcserver import Result, Success, method, serve


@method
def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


if __name__ == "__main__":
    serve()

Sanic

from sanic import Sanic
from sanic.request import Request
from sanic.response import HTTPResponse, json

from jsonrpcserver import Result, Success, dispatch_to_serializable, method

app = Sanic("JSON-RPC app")


@method
def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


@app.route("/", methods=["POST"])
async def test(request: Request) -> HTTPResponse:
    """Handle Sanic request"""
    return json(dispatch_to_serializable(request.body))


if __name__ == "__main__":
    app.run(port=5000)

See blog post.

Socket.IO

from flask import Flask, Request
from flask_socketio import SocketIO, send  # type: ignore

from jsonrpcserver import Result, Success, dispatch, method

app = Flask(__name__)
socketio = SocketIO(app)


@method
def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


@socketio.on("message")  # type: ignore
def handle_message(request: Request) -> None:
    """Handle SocketIO request"""
    if response := dispatch(request):
        send(response, json=True)


if __name__ == "__main__":
    socketio.run(app, port=5000)

See blog post.

Tornado

from typing import Awaitable, Optional

from tornado import ioloop, web

from jsonrpcserver import Result, Success, async_dispatch, method


@method
async def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


class MainHandler(web.RequestHandler):
    """Handle Tornado request"""

    async def post(self) -> None:
        """Post"""
        request = self.request.body.decode()
        response = await async_dispatch(request)
        if response:
            self.write(response)

    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        pass


app = web.Application([(r"/", MainHandler)])

if __name__ == "__main__":
    app.listen(5000)
    ioloop.IOLoop.current().start()

See blog post.

Websockets

import asyncio

from websockets.server import WebSocketServerProtocol, serve

from jsonrpcserver import Result, Success, async_dispatch, method


@method
async def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


async def main(websocket: WebSocketServerProtocol, _: str) -> None:
    """Handle Websocket message"""
    if response := await async_dispatch(await websocket.recv()):
        await websocket.send(response)


start_server = serve(main, "localhost", 5000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

See blog post.

Werkzeug

from werkzeug.serving import run_simple
from werkzeug.wrappers import Request, Response

from jsonrpcserver import Result, Success, dispatch, method


@method
def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


@Request.application
def application(request: Request) -> Response:
    """Handle Werkzeug request"""
    return Response(dispatch(request.data.decode()), 200, mimetype="application/json")


if __name__ == "__main__":
    run_simple("localhost", 5000, application)

See blog post.

ZeroMQ

import zmq

from jsonrpcserver import Result, Success, dispatch, method

socket = zmq.Context().socket(zmq.REP)


@method
def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


if __name__ == "__main__":
    socket.bind("tcp://*:5000")
    while True:
        request = socket.recv().decode()
        socket.send_string(dispatch(request))

See blog post.

ZeroMQ (async)

import asyncio

import aiozmq  # type: ignore
import zmq

from jsonrpcserver import Result, Success, async_dispatch, method


@method
async def ping() -> Result:
    """JSON-RPC method"""
    return Success("pong")


async def main() -> None:
    """Handle AioZMQ request"""
    rep = await aiozmq.create_zmq_stream(zmq.REP, bind="tcp://*:5000")
    while True:
        request = (await rep.read())[0].decode()
        if response := (await async_dispatch(request)).encode():
            rep.write((response,))


if __name__ == "__main__":
    asyncio.set_event_loop_policy(aiozmq.ZmqEventLoopPolicy())
    asyncio.get_event_loop().run_until_complete(main())

See blog post.