Renato Almeida de Oliveira
ORenatoAlmeida

Building a Discord ChatOps Bot

Jan 4, 2022

In this post, we're going to build a Python bot for Discord that queries the NetBox API. Our main goal is to explore async libraries and build a flexible, extensible bot.

The first task is to create and register the bot in Discord by following this tutorial. Then add your freshly created bot to a Discord server you control.

Now, with your bot ready, let's start with the quickstart sample and build our features on top of it. In your working directory, create the chatbot.py file and paste the following code:

import discord

# Note: discord.py 2.x requires an intents argument here, e.g. discord.Client(intents=...)
client = discord.Client()

@client.event
async def on_ready():
    print('We have logged in as {0.user}'.format(client))

@client.event
async def on_message(message):
    if message.author == client.user:
        return

    if message.content.startswith('$hello'):
        await message.channel.send('Hello!')

client.run('your token here')

The on_message event handler processes every message the bot has access to, and it is where we are going to add our code: one part for security and another for processing the bot commands.

Starting with the security part, create the file security.py and add the following code:

def authorize(client, message):
    authorized_roles = {'BotUser'}
    authorized_servers = ['your server']
    authorized_channels = ['some channel']

    author = message.author

    # Ignore the bot's own messages
    if author == client.user:
        return False

    # Only accept messages from authorized servers
    if str(message.guild) not in authorized_servers:
        return False

    # Only accept messages from authorized channels
    if str(message.channel) not in authorized_channels:
        return False

    # Some authors (e.g. in direct messages) have no roles attribute,
    # so fall back to an empty list instead of raising AttributeError
    author_roles = {role.name for role in getattr(author, 'roles', [])}
    if not authorized_roles & author_roles:
        return False

    return True

Following each if clause in order, this code performs the following checks before allowing the message:

  1. Check if the message author isn't the chatbot;
  2. Check if the message came from an authorized server;
  3. Check if the message came from an authorized channel;
  4. Check if the author has the correct role for using the bot.
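
The role check in step 4 boils down to a set intersection. As a minimal sketch (the helper name has_authorized_role is hypothetical and not part of the bot's code), that check could be isolated and tried out like this:

```python
# Roles allowed to use the bot; 'BotUser' matches the value used in security.py
AUTHORIZED_ROLES = {'BotUser'}

def has_authorized_role(role_names):
    """Return True when at least one of the author's roles is authorized."""
    # isdisjoint is True when the two collections share no element
    return not AUTHORIZED_ROLES.isdisjoint(role_names)

print(has_authorized_role(['Admin', 'BotUser']))  # True
print(has_authorized_role(['Guest']))             # False
print(has_authorized_role([]))                    # False
```

An author with no roles at all is rejected, which is the behavior we want for a restricted bot.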

Now replace the code in chatbot.py with the following, which adds the security check:

import discord
import security

client = discord.Client()

@client.event
async def on_ready():
    print('We have logged in as {0.user}'.format(client))

@client.event
async def on_message(message):
    if not security.authorize(client, message):
        return

    if message.content.startswith('$hello'):
        await message.channel.send('Hello!')

client.run('your token here')

Before we start coding the bot functions, it's important to talk a little about asyncio.

Unlike multiprocessing, where many tasks run at the same time, asyncio runs only one task at a time. Instead of blocking while it waits for an I/O operation to return, a task yields execution back to the event loop, which can then run other tasks.

In other words, execution is sliced into several small pieces: whenever a task waits on I/O, the event loop runs a slice of another scheduled task instead of sitting idle. With that in mind, it's very common to write small coroutine functions and chain them, yielding at every long-running operation with the await expression.
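
To make the slicing visible, here is a small sketch that is independent of the bot. The worker function and its delays are illustrative assumptions, with asyncio.sleep standing in for a real I/O wait:

```python
import asyncio

async def worker(name, delay, log):
    log.append(f'{name} start')
    # While this task waits, the event loop is free to run the other one
    await asyncio.sleep(delay)
    log.append(f'{name} done')

async def main():
    log = []
    # Run both workers concurrently on the same event loop
    await asyncio.gather(
        worker('slow', 0.2, log),
        worker('fast', 0.1, log),
    )
    return log

log = asyncio.run(main())
print(log)  # ['slow start', 'fast start', 'fast done', 'slow done']
```

Both workers start before either finishes, even though only one of them is executing at any given moment.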

Another desired characteristic of the chatbot is that the command implementations should be decoupled from one another.

For that, we're going to use a decorator pattern to register the commands and an executor to call them, so let's update chatbot.py to add our executor:

import discord
import dispacher
import security

client = discord.Client()

@client.event
async def on_ready():
    print('We have logged in as {0.user}'.format(client))

@client.event
async def on_message(message):
    if not security.authorize(client, message):
        return

    content = message.content
    result = await dispacher.process(content)

    if result is not None:
        await message.reply(f'<@{message.author.id}>:\n{result}')

client.run('your token here')

Inside dispacher.py, let's define our decorator and the executor. The executor compares the received message against several regular expressions and, when one matches, executes the function registered for it.

The decorator "glues" the command functions to the regex list by registering them. So inside the dispacher.py file, add the following code:

import re

SUPPORTED_QUESTIONS = {}

def question_register(pattern, name="", description=""): 
    def decorator(func):
        SUPPORTED_QUESTIONS[pattern] = {
            'name': name,
            'description': description,
            'function':   func
        }
        return func
    return decorator

async def process(content):    
    for pattern in SUPPORTED_QUESTIONS:
        query_re = re.compile(pattern)
        match = query_re.match(content)
        if match:

            func = SUPPORTED_QUESTIONS[pattern]['function']
            kwargs = match.groupdict()
            output = await func(**kwargs)

            if output != '':
                return output          
    return None

Now, inside dispacher.py, let's write our first command, a helper that lists all known commands:

@question_register(r'[Hh]elp', name="Help", description="Display bot functions")
async def help(**kwargs):
    output = ">>> "
    for pattern in SUPPORTED_QUESTIONS:
        name = SUPPORTED_QUESTIONS[pattern]['name']
        description = SUPPORTED_QUESTIONS[pattern]['description']
        output += f"Name: {name}\n"
        output += f"Description: {description}\n"
        output += f"Pattern: {pattern}\n\n"
    return output
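
To see how a pattern's named groups end up as keyword arguments of a command, here is a trimmed-down, self-contained sketch of the same register-and-dispatch idea. The names REGISTRY, register and echo are hypothetical stand-ins chosen for this illustration:

```python
import asyncio
import re

REGISTRY = {}  # hypothetical stand-in for SUPPORTED_QUESTIONS

def register(pattern):
    """Decorator that maps a regex pattern to a command coroutine."""
    def decorator(func):
        REGISTRY[pattern] = func
        return func
    return decorator

@register(r'[Ee]cho (?P<text>.+)')
async def echo(text):
    # 'text' is filled from the named group in the pattern above
    return text.upper()

async def process(content):
    for pattern, func in REGISTRY.items():
        match = re.match(pattern, content)
        if match:
            # groupdict() turns named groups into keyword arguments
            return await func(**match.groupdict())
    return None

print(asyncio.run(process('echo hello')))  # HELLO
print(asyncio.run(process('unknown')))     # None
```

Because the command only receives what its pattern captures, each command stays independent of the dispatcher and of the other commands.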

Finally, let's go to the fun part: interfacing with NetBox. In this post, I'm using the public NetBox demo.

Looking at the NetBox API, you will see that there isn't a single endpoint for querying many object types at once, so the bot must query each object's endpoint.

So for each endpoint we need an API call and an appropriate parser. First, inside the netbox.py file, let's define the asynchronous function that gets the NetBox data:

import aiohttp

async def get(endpoint, params=None):
    api_token = '<api_token>'  # replace with your NetBox API token
    url = 'https://demo.netbox.dev/api'
    headers = {
        'accept': 'application/json',
        'Authorization': f"Token {api_token}",
    }
    URI = f"{url}{endpoint}"
    output = {}
    async with aiohttp.ClientSession() as session:
        # ssl=False skips certificate verification
        async with session.get(URI, headers=headers, params=params, ssl=False) as resp:
            response = await resp.json()
            output['count'] = response['count']
            output['results'] = list(response['results'])
        # Follow the pagination links until there is no next page;
        # the 'next' URL already carries the query string, so params is not sent again
        while response['next'] is not None:
            async with session.get(response['next'], headers=headers, ssl=False) as resp:
                response = await resp.json()
                output['results'] += response['results']
        return output
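
The pagination loop can be tried without any network access. The sketch below assumes a response shape with count, next and results keys, as used in the function above. The PAGES dictionary and the fetch function are made-up stand-ins for the HTTP calls:

```python
import asyncio

# Fake paginated API: each "URL" maps to one page of results
PAGES = {
    'page1': {'count': 3, 'next': 'page2', 'results': ['a', 'b']},
    'page2': {'count': 3, 'next': None, 'results': ['c']},
}

async def fetch(url):
    # Stand-in for an HTTP GET; yields to the event loop like real I/O would
    await asyncio.sleep(0)
    return PAGES[url]

async def get_all(first_url):
    response = await fetch(first_url)
    output = {'count': response['count'], 'results': list(response['results'])}
    # Keep following 'next' until the last page reports None
    while response['next'] is not None:
        response = await fetch(response['next'])
        output['results'].extend(response['results'])
    return output

result = asyncio.run(get_all('page1'))
print(result)  # {'count': 3, 'results': ['a', 'b', 'c']}
```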

To process each NetBox model, let's create another file, parsers.py, with a parser for each model that returns the appropriate message.

def devices(data): 
    if data['count'] > 0:
        output = f'**Devices**:\n'
        for obj in data['results']:
            dev_type = obj['device_type']['display']
            site = obj['site']['display']
            name = obj['name']
            url = obj['url'].replace('/api/', '/')
            output += f'\nName: {name}'
            output += f'\nURL: {url}'
            output += f'\nDevice Type: {dev_type}'
            output += f'\nSite: {site}'
            output += f'\n--------\n'

        output += '\n'
    else:
        output = ''

    return output

def ip(data):
    if data['count'] > 0:
        output = f'**IPs**:\n'
        for obj in data['results']:
            address = obj['address']
            output += f'\nAddress: {address}'
            url = obj['url'].replace('/api/', '/')
            output += f'\nURL: {url}'
            if obj['dns_name'] != '':
                dns_name = obj['dns_name']
                output += f'\nDNS Name: {dns_name}'
            if obj['assigned_object_id'] is not None:
                parent_key_map = {
                    'virtualization.vminterface': 'virtual_machine',
                    'dcim.interface': 'device'
                }
                assigned_obj_name = obj['assigned_object']['display']
                object_key = parent_key_map[obj['assigned_object_type']]
                device = obj['assigned_object'][object_key]['display']
                assigned_obj_url = obj['assigned_object']['url'].replace('/api/', '/')
                output += f'\nAssigned: {device} - {assigned_obj_name}'
                output += f'\nAssigned URL: {assigned_obj_url}'
            output += f'\n--------\n'

        output += '\n'
    else:
        output = ''

    return output

def aggregates(data):
    if data['count'] > 0:
        output = f'**Aggregates**:\n'
        for obj in data['results']:
            prefix = obj['prefix']
            output += f'\nPrefix: {prefix}'
            url = obj['url'].replace('/api/', '/')
            output += f'\nURL: {url}'
            if obj['description'] != '':
                description = obj['description']
                output += f'\nDescription: {description}'
            output += f'\n--------\n'
        output += '\n'
    else:
        output = ''

    return output

def prefixes(data):
    if data['count'] > 0:
        output = f'**Prefixes**:\n'
        for obj in data['results']:
            prefix = obj['prefix']
            output += f'\nPrefix: {prefix}'
            url = obj['url'].replace('/api/', '/')
            output += f'\nURL: {url}'
            if obj['description'] != '':
                description = obj['description']
                output += f'\nDescription: {description}'
            output += f'\n--------\n'
        output += '\n'
    else:
        output = ''

    return output

def sites(data):
    if data['count'] > 0:
        output = f'**Sites**:\n'
        for obj in data['results']:
            name = obj['name']
            url = obj['url'].replace('/api/', '/')
            output += f'\nName: {name}'
            output += f'\nURL: {url}'
            if obj['region'] is not None:
                if 'display' in obj['region']:
                    region = obj['region']['display']
                    output += f'\nRegion: {region}'
            output += f'\n--------\n'
        output += '\n'
    else:
        output = ''

    return output

Notice that each of our parser functions is named after a NetBox model, so the model name can be used to look up the function as an attribute in dynamic calls.
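
As a minimal sketch of that dynamic lookup, the example below uses a SimpleNamespace as a hypothetical stand-in for the parsers module. The render helper is also an assumption made for illustration:

```python
import types

# Stand-in for the parsers module, exposing a single parser function
fake_parsers = types.SimpleNamespace(
    sites=lambda data: f"{data['count']} site(s) found",
)

def render(model, data):
    # Look the parser up by model name; None means no parser is implemented
    func = getattr(fake_parsers, model, None)
    if func is None:
        return ''
    return func(data)

print(render('sites', {'count': 2}))  # 2 site(s) found
print(repr(render('racks', {'count': 1})))  # ''
```

Adding support for a new model then only requires a new function whose name matches the model key.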

In our last file, questions.py, let's define our search question, which is divided into two parts:

The first creates a coroutine for each model and schedules the calls. The second defines the coroutine and dynamically calls the correct parser function.

from dispacher import question_register
import netbox
import parsers
import asyncio

@question_register(r'[Ss]earch (?P<query>.+)', name="Search Netbox models", description="Search for <query> object")
async def search_all(query):
    model_endpoint_dict = {
        'devices': '/dcim/devices/',
        'sites': '/dcim/sites/',
        'ip': '/ipam/ip-addresses/',
        'aggregates': '/ipam/aggregates/',
        'prefixes': '/ipam/prefixes/',
    }
    coros = [ get_output(model, model_endpoint_dict[model], query)
              for model in model_endpoint_dict ]

    results = await asyncio.gather(*coros)
    output = '>>> '
    for result in results:
        output += result
    return output

async def get_output(model, endpoint, query):
    try:
        data = await netbox.get(endpoint, params={'q' : query})
        func = getattr(parsers, model)        
        return func(data)
    except AttributeError as e:
        # getattr raises AttributeError when parsers has no function for this model
        print("Parser not implemented")
    except Exception as e:
        print(e)
    return ''
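
One property of asyncio.gather that this code relies on is that results come back in the order the coroutines were passed in, not in the order they finish. A small sketch, where the lookup function and its delays are illustrative assumptions:

```python
import asyncio

async def lookup(model, delay):
    # asyncio.sleep stands in for a network request to one endpoint
    await asyncio.sleep(delay)
    return f'{model} done'

async def main():
    # 'devices' finishes last, yet its result still comes first
    return await asyncio.gather(
        lookup('devices', 0.03),
        lookup('sites', 0.01),
        lookup('ip', 0.02),
    )

results = asyncio.run(main())
print(results)  # ['devices done', 'sites done', 'ip done']
```

This is why the search output always lists the models in the same order as the endpoint dictionary.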

For the last step, just import questions.py in the chatbot.py file; that way the decorators do their job and register each of the bot functions. If you want to add another feature, just create another coroutine function and register it inside the questions file.

Thanks for your attention. All the code from this post is available in my GitHub repository. If you have any questions or suggestions, just leave a message.
