Unconventional Secure and Asynchronous RESTful APIs using SSH

Some time ago, in a desperate search for asynchronicity, I came across a Python package that changed the way I look at remote interfaces: AsyncSSH.

Reading through their documentation and example code, you’ll find an interesting assortment of use cases, all of which take advantage of the authentication and encryption capabilities of SSH while using Python’s asyncio to handle asynchronous communications.

Thinking about various applications I’ve developed over the years, many included functions that could benefit from decoupling into separate services. But at times, I would avoid it due to security implications.

I wanted to build informative dashboards that optimize maintenance tasks. But they bypassed business logic, so I wouldn’t dare expose them over the same interfaces. I even looked at using HTTPS client certs, but support from REST frameworks seemed limited.

I realized that asyncssh could provide the extra security I was looking for over a well known key-based system. And in my never-ending quest to find what makes things tick, I decided to take a stab at writing a REST-ish service over SSH.

Building it was a great way to familiarize myself with the library and the protocol. It helped me learn more about writing asynchronous apps, and the result was a small framework called korv.

Korv is an SSH server and client that performs key authentication of new connection requests against a list of known public keys.

Instead of providing a command shell, a successful connection opens a TCP socket over which to exchange information. The socket server uses JSON formatting and supports a request mechanism similar to HTTP with verbs for GET, STORE, UPDATE and DELETE.

Both the server and client take advantage of asyncio. In other words, you’re able to await on long running processes on the server side, while still servicing new client requests.

Same thing happens on the client side where you can still provide a responsive application, while it awaits on the server.

Designing a protocol

We’re building a client-server system that communicates over a custom TCP socket, where the connection itself differs from standard HTTP because it’s long-lived.

Instead of establishing the TCP connection, transferring data, and closing it, we’re able to keep the socket running for the duration of a client session.

However, the basic concepts of REST are still the same. We wish to get, create, update, and delete resources.

To build a system like this, you must decide how to exchange data that explains your intent to the other side. Meaning, you’ll have to wrap the data in a standard format that identifies the resource to operate on, and the command to execute.

While HTTP does this in its headers - and we could do something similar - we’re instead going to wrap the entire message in JSON.

Not only is JSON easy, but the formatting helps us with some validation steps. Yes, it uses more characters to encapsulate data than something like YAML, but it’s worth the trade-off in our case.

In general, I’m not a fan of braces, but that’s what makes JSON more suitable for this situation. If a malfunction cuts off communications mid-transfer, you won’t receive the closing braces. Standard deserialization can tell there was a transfer error because the data is not parseable due to the missing characters.

If you chose something like YAML, you could still parse the message because it’s not expecting closing characters, leading to unexpected problems at your application layer.
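To see the point about braces in practice, here is a minimal sketch that uses only the standard json module. The parse_message() helper is a hypothetical name chosen for illustration and is not part of korv.

```python
import json

# A complete request, serialized the way it would travel over the socket
message = json.dumps({"id": 123, "verb": "GET", "resource": "/hello", "body": {}})


def parse_message(raw):
    """Return the decoded message, or None when the payload is incomplete or invalid."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # A message cut off mid-transfer is missing its closing braces and fails here
        return None


complete = parse_message(message)

# Simulate a transfer that was interrupted halfway through
truncated = parse_message(message[: len(message) // 2])
```

The truncated payload never reaches the application layer as a half-populated object; the parser rejects it outright.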

You can solve this multiple ways, like transmitting the length of the message and erroring out when it doesn’t match - this is what HTTP does. But there’s no need to add that complexity in our case.

Another point to think about ahead of time is how to communicate errors. To keep things simple, I opted for reusing the same ideas of HTTP status codes. A field in the response with a code in the 200s means success. 400s is a problem with data sent by the client. 500s are server errors.

Given the discussion so far, let’s decide on the parameters to send with each message.

  • Two primary fields required with every transmission: a unique identifier (like a timestamp) and the message body.
  • Requests also need the resource to operate on, and verb or command to execute (GET, STORE, UPDATE, DELETE).
  • Responses must return a status code and the identifier of the request it’s responding to.

Here’s an example GET request to a /hello resource:

{
    "id": 123,
    "verb": "GET",
    "resource": "/hello",
    "body": {}
}

The response would look like this:

{
    "id": 456,
    "request_id": 123,
    "code": 200,
    "body": "Hello World!"
}
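The two messages above could be produced by helpers along these lines. This is only a sketch: make_request() and make_response() are hypothetical names, and the timestamp identifiers follow the "like a timestamp" suggestion from the field list rather than any fixed rule.

```python
import time

VERBS = ("GET", "STORE", "UPDATE", "DELETE")


def make_request(verb, resource, body=None):
    """Build a request message with the fields described above."""
    if verb not in VERBS:
        raise ValueError(f"Verb must be one of {VERBS}")

    return {
        "id": time.time(),  # unique identifier, a timestamp in this sketch
        "verb": verb,
        "resource": resource,
        "body": {} if body is None else body,
    }


def make_response(request, code, body=None):
    """Build a response that points back at the request it answers."""
    return {
        "id": time.time(),
        "request_id": request["id"],
        "code": code,
        "body": body,
    }


request = make_request("GET", "/hello")
response = make_response(request, 200, "Hello World!")
```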

Error responses should include extra info to help understand what’s going on. You can add a message field to convey a human-readable error, and a traceback if you wish to include stack traces.
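As a rough illustration, an error response carrying both fields might be assembled like this. The make_error_response() helper is a hypothetical name, and placing message and traceback inside the body is an assumption made for this sketch.

```python
import time
import traceback


def make_error_response(request_id, code, exc):
    """Build an error response with a readable message and the stack trace."""
    return {
        "id": time.time(),
        "request_id": request_id,
        "code": code,
        "body": {
            "message": str(exc),
            "traceback": "".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ),
        },
    }


try:
    1 / 0
except ZeroDivisionError as exc:
    # 500 marks a server-side failure, mirroring HTTP status codes
    error = make_error_response(123, 500, exc)
```

Stack traces can reveal internals, so you may prefer to send them only to trusted clients or while debugging.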

Making the server

Before continuing with more details, let’s take a minute to remember that we’re working with SSH, which means you’ll need a key to identify the server and client accurately.

Making a private / public key pair is easy in Linux with:

ssh-keygen -t rsa -b 4096 -C "your_email@example.com"

Just like any other SSH session, keep the private key in the server, and give the public one to clients. It allows them to verify they’re speaking to the correct server.

For test purposes, you can get away with just one key pair, but in the real world, you’ll want different server and client key pairs.

To receive incoming connection requests, asyncssh provides us with the SSHServer class. Our server subclasses it, implementing methods that handle new connections and add callbacks.

These callbacks behave like routes in a Flask application, which execute code based on the HTTP verb and the URL (the resource field for us) requested.

Sometimes I like to work backwards from how I’d like to use the library when making a new application server.

In our case, you’ll want to configure the server with network and key information when instantiating it. Then add the resource endpoints and start listening for new connections. Here’s the code:

from korv import KorvServer


# Make a hello world function; callbacks receive the decoded request
def hello(request):
    return 200, "HELLO!"

# Initialize a server that uses port 8022 by default and configure the SSH keys to use
korv = KorvServer(host_keys=['your_ssh_private_key'], authorized_client_keys='client_public_keys')

# Add some callbacks
korv.add_callback('GET', '/hello', hello)

# Start listening for connections
korv.start()

The KorvServer class implementation needs code to start the asyncssh server and tell it how to handle TCP socket connections.

import sys
import asyncssh
import asyncio
import logging


class KorvServer(asyncssh.SSHServer):
    VERBS = ('GET', 'STORE', 'UPDATE', 'DELETE')

    _callbacks = {verb: dict() for verb in VERBS}

    def __init__(self, port=8022, host_keys=['ssh_host_key'], authorized_client_keys='authorized_keys'):
        """Instantiate an SSH server that listens on the given port for clients that match the authorized keys"""

        self.port = port
        self._host_keys = host_keys
        self._authorized_client_keys = authorized_client_keys

    def connection_requested(self, dest_host, dest_port, orig_host, orig_port):
        """Open a new TCP session that handles an SSH client connection"""

        logging.info(f"Connection requested {dest_host} {dest_port} {orig_host} {orig_port}")
        return _KorvServerSession(KorvServer._callbacks)

    async def __create_server(self):
        """Creates an asynchronous SSH server"""

        await asyncssh.create_server(
            KorvServer, '', self.port,
            server_host_keys=self._host_keys,
            authorized_client_keys=self._authorized_client_keys
        )

    def add_callback(self, verb, resource, callback):
        """Configure a callable to execute when receiving a request with the given verb and resource combination"""

        if verb not in KorvServer.VERBS:
            raise ValueError(f"Verb must be one of {KorvServer.VERBS}")

        if resource not in KorvServer._callbacks[verb]:
            KorvServer._callbacks[verb][resource] = list()

        KorvServer._callbacks[verb][resource].append(callback)

    def start(self):
        """Start the server"""

        logging.info(f"Listening on port {self.port}")

        loop = asyncio.get_event_loop()

        try:
            loop.run_until_complete(self.__create_server())

        except (OSError, asyncssh.Error) as exc:
            sys.exit(f'Error starting server: {exc}')

        loop.run_forever()

The asyncssh.create_server() call in __create_server() sets the correct key parameters and server implementation. But remember, this is a two-step process, where the first step establishes an authenticated SSH session, and the second opens a TCP socket over that session.

Whenever a client requests a new TCP connection after establishing the session, the server invokes connection_requested(). Think of this as a factory method that returns an implementation of asyncssh.SSHTCPSession that handles the different parts of a data transfer (more info below).

Note that I chose to name __create_server() with a double underscore to signify that this is a private function. Users of our library should not directly call it. Instead, they’ll invoke the start() method. The main reason is that it’s an asynchronous function, and I didn’t want to leave the event loop management to the user.

The start() method uses asyncio to get the active event loop, run the function that creates the server, and then listen until interrupted with run_forever() - a standard async workflow in Python.

Now let’s look at implementing the session management part of our server communications. Note that the name starts with an underscore to mark it as a protected class to use with caution.

import gzip
import json
import time
import asyncssh
import logging
import traceback


class _KorvServerSession(asyncssh.SSHTCPSession):
    def __init__(self, callbacks):
        self._callbacks = callbacks

    def connection_made(self, chan):
        """New connection established"""

        logging.debug("Connection incoming")
        self._chan = chan

    def connection_lost(self, exc):
        """Lost the connection to the client"""

        logging.debug(f"Connection lost\n{exc}")

    def session_started(self):
        """New session established successfully"""

        logging.debug("Connection successful")

    def data_received(self, data, datatype):
        """New data coming in"""

        logging.debug(f"Received data: {data}")
        self._dispatch(data)

    def eof_received(self):
        """Got an EOF, close the channel"""

        logging.debug("EOF")
        self._chan.exit(0)

    def _dispatch(self, data):
        try:
            request = json.loads(gzip.decompress(data).decode('utf-8'))

            # Stop processing as soon as a required field is missing
            if 'id' not in request:
                logging.info("Malformed request: missing 'id'")
                self._send_response(0, 400, {"message": "Missing 'id'"})
                return

            if 'verb' not in request:
                logging.info("Malformed request: missing 'verb'")
                self._send_response(request['id'], 400, {"message": "Missing 'verb'"})
                return

            if 'resource' not in request:
                logging.info("Malformed request: missing 'resource'")
                self._send_response(request['id'], 400, {"message": "Missing 'resource'"})
                return

            # STORE and UPDATE requests must carry a body
            if request['verb'] in ('STORE', 'UPDATE') and 'body' not in request:
                logging.info("Malformed request: missing 'body'")
                self._send_response(request['id'], 400, {"message": "Missing 'body'"})
                return

        except Exception:
            # Covers payloads that fail to decompress or parse as JSON
            logging.info("Unable to process request")
            self._send_response(0, 400, {"message": "Unable to process request"})
            return

        if request['verb'] not in self._callbacks:
            logging.debug(f"No callback found for {request['verb']}")
            self._send_response(request['id'], 404)
            return

        if request['resource'] not in self._callbacks[request['verb']]:
            logging.debug(f"No callback found for {request['verb']} on {request['resource']}")
            self._send_response(request['id'], 404)
            return

        for callback in self._callbacks[request['verb']][request['resource']]:
            try:
                self._send_response(request['id'], *callback(request))

            except Exception as e:
                logging.exception(f"Internal error when executing {request['verb']} on {request['resource']}")
                self._send_response(request['id'], 500, {"message": str(e), "traceback": traceback.format_exc()})

    def _send_response(self, request_id, code, body=None):
        """Send a response to the given client request"""

        cmd = {
            'id': time.time(),
            'request_id': request_id,
            'code': code,
            'body': body
        }

        logging.info(f"{code} response to {request_id}")
        self._chan.write(gzip.compress(json.dumps(cmd, separators=[',', ':']).encode('utf-8')))

The most important pieces here are the _dispatch() function called when receiving data and the _send_response() helper function. However, I chose to add some logging in most other customizable interfaces to help explain what’s going on when debugging.

Communication goes over channels opened when establishing the session. To send a message, simply write to those channels.

Since messages are primarily JSON text, I also chose to compress data to minimize the time spent in the actual transfer. It works quite well and with little overhead using the built-in gzip module.
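A minimal round trip of that compress-then-send idea, using only the standard library, might look like the sketch below. The encode() and decode() helpers and the /metrics resource are hypothetical names used for illustration.

```python
import gzip
import json


def encode(message):
    """Serialize a message to compact JSON and gzip it for the wire."""
    return gzip.compress(json.dumps(message, separators=(",", ":")).encode("utf-8"))


def decode(payload):
    """Reverse encode(): decompress the bytes and parse the JSON text."""
    return json.loads(gzip.decompress(payload).decode("utf-8"))


# A repetitive body, like a list of readings, compresses very well
message = {"id": 1, "verb": "STORE", "resource": "/metrics", "body": {"values": [0] * 1000}}

raw_size = len(json.dumps(message, separators=(",", ":")).encode("utf-8"))
wire_size = len(encode(message))
```

Keep in mind that gzip adds a fixed header and trailer, so very small messages can end up slightly larger after compression. The savings show up with larger or repetitive bodies.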

The _dispatch() function performs most of our checks.

It validates that the message is formatted correctly, that no required fields are missing, and that a callback exists for the verb and resource you’re trying to access.

Just like in regular HTTP REST interfaces, we return 400 errors whenever the message is improperly formatted, and 404s for missing callbacks or resources. If we run into an exception while attempting to execute the callback, we respond with a traceback and a 500 status.

Every callback must return a two-element tuple with the status code as the first element and the JSON body as the second.
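Since _dispatch() calls each callback with the decoded request, a pair of callbacks for a hypothetical /items resource could look like this sketch. The in-memory _items store, the resource name, and the 201 status for a successful STORE are assumptions made for illustration.

```python
# In-memory storage used only for this illustration
_items = {}


def store_item(request):
    """STORE callback: save the body under its 'name' field."""
    body = request["body"]

    if "name" not in body:
        # Client sent incomplete data, so answer with a 400-class code
        return 400, {"message": "Missing 'name'"}

    _items[body["name"]] = body
    return 201, body


def get_items(request):
    """GET callback: return everything stored so far."""
    return 200, list(_items.values())


# Registration would follow the add_callback() pattern shown earlier, e.g.:
#   korv.add_callback('STORE', '/items', store_item)
#   korv.add_callback('GET', '/items', get_items)
```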

Making a client

The client concepts are mostly the same. You’ll need to write an SSHTCPSession that implements similar methods for handling the data. I’m also implementing an asyncssh.SSHClient subclass, which at the moment only provides more debug logging, but which I may use in the future to better manage broken connections.

The TCP client session-management code looks like this:

import asyncssh
import gzip
import json
import logging
import time


class _SSHClient(asyncssh.SSHClient):
    def connection_made(self, conn):
        logging.debug(f"Connection made to {conn.get_extra_info('peername')[0]}")

    def auth_completed(self):
        logging.debug('Authentication successful')


class _SSHClientSession(asyncssh.SSHTCPSession):

    def connection_made(self, chan):
        logging.debug("Session opened")
        self._chan = chan
        self._requests = dict()

    def connection_lost(self, exc):
        logging.debug("Connection lost")
        logging.debug(f"{exc}")

    def session_started(self):
        logging.debug("Session successful")

    def data_received(self, data, datatype):
        logging.debug(f"Received data: {data}")

        try:
            data = json.loads(gzip.decompress(data).decode('utf-8'))

            if data['request_id'] in self._requests:
                if callable(self._requests[data['request_id']]):
                    self._requests[data['request_id']](data)

                if self._requests[data['request_id']] is None:
                    self._requests[data['request_id']] = data
                else:
                    del(self._requests[data['request_id']])

        except Exception:
            logging.exception("There was an error processing the server response")

    def eof_received(self):
        logging.debug("Received EOF")
        self._chan.exit(0)

    async def send_request(self, verb, resource, body, callback):
        if verb not in ['GET', 'STORE', 'UPDATE', 'DELETE']:
            raise ValueError("Unknown verb")

        request = {
            'id': time.time(),
            'verb': verb,
            'resource': resource,
            'body': body
        }

        self._requests[request['id']] = callback
        self._chan.write(gzip.compress(json.dumps(request, separators=[',', ':']).encode('utf-8')))
        logging.debug(f"{verb} {resource} {body}")

        return request['id']

In this case, we have extra complexity in keeping track of requests sent to the server. It helps match responses with calling methods and provides both a synchronous and asynchronous interface to our users.

As you can see in the send_request() function, we store the id of messages being sent in a dictionary along with a function to call whenever a response comes in. If the callback function is None, then the request is synchronous, so we only need to store the data.
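Stripped of the SSH plumbing, the bookkeeping described here can be sketched as a small standalone class. PendingRequests and its method names are hypothetical; korv keeps the same information in a plain dictionary on the session.

```python
class PendingRequests:
    """Track outstanding requests by id until their responses arrive."""

    def __init__(self):
        self._requests = {}

    def track(self, request_id, callback=None):
        # None marks a synchronous request whose response must be stored
        self._requests[request_id] = callback

    def resolve(self, response):
        request_id = response["request_id"]
        if request_id not in self._requests:
            return  # Response to something we never sent, ignore it

        handler = self._requests[request_id]
        if callable(handler):
            # Asynchronous request: hand over the response and forget it
            del self._requests[request_id]
            handler(response)
        else:
            # Synchronous request: keep the response until the caller collects it
            self._requests[request_id] = response

    def take(self, request_id):
        """Return and remove a stored response, or None if it has not arrived."""
        entry = self._requests.get(request_id)
        if isinstance(entry, dict):
            del self._requests[request_id]
            return entry
        return None


received = []
pending = PendingRequests()
pending.track(1, callback=received.append)  # asynchronous
pending.track(2)                            # synchronous

pending.resolve({"request_id": 1, "code": 200, "body": "async"})
pending.resolve({"request_id": 2, "code": 200, "body": "sync"})
sync_response = pending.take(2)
```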

Let’s look at the client class.

import sys
import time
import logging
import asyncio

import asyncssh

from threading import Thread


class KorvClient:

    def __init__(self, host='localhost', port=8022, client_keys=None, known_hosts=None, max_packet_size=32768):
        self.max_packet_size = max_packet_size

        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)

        try:
            # Connect first so that connection errors are caught here
            self._session = self._loop.run_until_complete(self.__connect(host, port, known_hosts, client_keys))

        except (OSError, asyncssh.Error) as exc:
            sys.exit(f'SSH connection failed: {exc}')

        # Hand the event loop over to a worker thread
        t = Thread(target=self.__start_loop, args=(self._loop,))
        t.start()

    def __start_loop(self, loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()

    async def __connect(self, host, port, known_hosts, client_keys):
        logging.info(f"Connecting to SSH Server {host}:{port}")
        conn, client = await asyncssh.create_connection(
            _SSHClient,
            host,
            port,
            client_keys=client_keys,
            known_hosts=known_hosts
        )

        logging.debug("Opening Socket")
        chan, session = await conn.create_connection(_SSHClientSession, host, port, max_pktsize=self.max_packet_size)
        return session

    def get(self, resource, body=None, callback=None):
        if callback is None:
            request_id = asyncio.run_coroutine_threadsafe(self._session.send_request("GET", resource, body, None), self._loop).result()

            while self._session._requests[request_id] is None:
                time.sleep(0.1)

            response = self._session._requests[request_id]
            del(self._session._requests[request_id])

            return response
        else:
            asyncio.run_coroutine_threadsafe(self._session.send_request("GET", resource, body, callback), self._loop)

    def store(self, resource, body, callback=None):
        asyncio.run_coroutine_threadsafe(self._session.send_request("STORE", resource, body, callback), self._loop)

    def update(self, resource, body, callback=None):
        asyncio.run_coroutine_threadsafe(self._session.send_request("UPDATE", resource, body, callback), self._loop)

    def delete(self, resource, body=None, callback=None):
        asyncio.run_coroutine_threadsafe(self._session.send_request("DELETE", resource, body, callback), self._loop)

Wait! What are threads doing here?

To make the interface fully asynchronous, you’ll need access to a thread that runs a separate asyncio event loop as a work queue.

Experience has taught me that it’s better to isolate your async work in a separate event loop, which requires a different thread.

You don’t know what other Python packages or asyncio mechanisms your users are implementing, and they can interfere with yours when using the same event loop.

For more details about asynchronous magic, check out this article about asyncio and threading.

Instantiating a KorvClient session automatically attempts to connect to the server and open the TCP socket.

Looking through the __connect() function, you’ll find two connection attempts. As mentioned earlier, the first establishes the SSH session with the server, in which authentication takes place through the key exchange, while the second opens the TCP socket with the custom SSHTCPSession implementation discussed earlier.

Once a session is up, the client reuses the send_request() function, wrapping it to present users with a method for each verb: get(), store(), update() and delete().

The verb methods take a resource and body parameter, but also a callback function. They all use asyncio.run_coroutine_threadsafe() to ask our worker thread to write a new request into its channel. It’s the required mechanism for adding tasks to an event loop in a different thread.
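The worker-thread pattern can be tried in isolation with nothing but the standard library. In this sketch, start_background_loop() and fake_request() are hypothetical stand-ins: the coroutine only sleeps briefly where the real client would write to the SSH channel.

```python
import asyncio
from threading import Thread


def start_background_loop():
    """Create an event loop and run it forever in a separate thread."""
    loop = asyncio.new_event_loop()

    def run():
        asyncio.set_event_loop(loop)
        loop.run_forever()

    thread = Thread(target=run, daemon=True)
    thread.start()
    return loop, thread


async def fake_request(resource):
    """Stand-in for sending a request; pretends the network takes a moment."""
    await asyncio.sleep(0.01)
    return {"code": 200, "resource": resource}


loop, thread = start_background_loop()

# Schedule the coroutine on the worker loop from the main thread
future = asyncio.run_coroutine_threadsafe(fake_request("/hello"), loop)

# The returned concurrent.futures.Future lets the caller block for the result
result = future.result(timeout=5)

# Ask the loop to stop from outside its thread, then clean up
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

Because run_coroutine_threadsafe() returns a concurrent.futures.Future, calling result() on it is one way to wait for a coroutine's return value without polling.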

While all methods are using an asynchronous callback implementation, get() also implements a synchronous wait mechanism that blocks with a sleep until it receives a response. It checks the request tracking dictionary for a response and deletes the request once received.

As you can tell, asyncssh is quite versatile. I’ve successfully used it to replace paramiko for all my SSH, SFTP, and SCP needs. I encourage you to try it out. It’s very stable, easy to work with, and asynchronous!

Summarizing

We’ve shown that REST APIs don’t have to be limited to HTTP. You can leverage key concepts of RESTful interfaces and successfully implement them in other systems. While SSH is not the standard go-to answer for this type of API, Python packages like asyncssh enable us to integrate with it easily.

Feel free to pip install korv or visit the GitHub repo to play around with the code we looked at today. You’ll find more details about the library and some code samples in the README. Maybe you can use it to build your next dashboard. Shoot us a tweet @tryexceptpass if you build something neat with it.