运维咖啡吧

享受技术带来的乐趣,体验生活给予的感动

Python实现HTTPS网站证书过期监控及更新

当前HTTP逐渐被大众所抛弃,HTTPS正在成为互联网上的主流。前段时间我们维护的一个HTTPS证书即将过期,由于多云环境比较复杂,团队小伙伴在替换更新证书的过程中出现疏漏,导致有一个域名证书没有及时更新,影响了系统可用性,为了杜绝这种问题再次发生,便写了这么个功能

比较简单,但很实用,再也不会出现证书漏更新的问题,具体流程为:扫描域名列表-->检查是否开启HTTPS-->获取证书过期时间-->记录入库-->更新证书

获取域名列表

我们用了很多的内部私有云SAAS服务,这些SAAS服务都提供有完善的API支持,DNS服务便是其中之一,可以根据DNS系统提供的API拿到所有的域名和记录。公有云也提供有完善的API文档,这里以阿里云为例,获取域名记录的代码如下

import json
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkdomain.request.v20180129.QueryDomainListRequest import QueryDomainListRequest
from aliyunsdkalidns.request.v20150109.DescribeDomainRecordsRequest import DescribeDomainRecordsRequest

class DomainApi:
    def __init__(self):
        self.client = client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')

    def get_domains(self, pagenum=1, pagesize=10):
        try:
            request = QueryDomainListRequest()
            request.set_accept_format('json')

            request.set_PageNum(pagenum)
            request.set_PageSize(pagesize)

            response = self.client.do_action_with_exception(request)
            jsondata = json.loads(str(response, encoding='utf-8'))

            return True, jsondata
        except Exception as e:
            return False, str(e)

    def get_records(self, domain):
        try:
            request = DescribeDomainRecordsRequest()
            request.set_accept_format('json')

            request.set_DomainName(domain)
            response = self.client.do_action_with_exception(request)

            jsondata = json.loads(str(response, encoding='utf-8'))
            return True, jsondata
        except Exception as e:
            return False, str(e)

if __name__ == '__main__':
    print(DomainApi().get_records('ops-coffee.cn'))

以上代码使用了阿里云提供的SDK,调用简单方便,最终返回请求状态及数据。get_domains方法可以获取到账号下的所有域名,get_records方法可以获取到域名下的所有解析记录,需要注意数据量大小,以确定是否需要分页查询

检查是否开启HTTPS

检查是否开启HTTPS也是简单粗暴,直接通过requests模块请求HTTPS地址,没有报错则表示开启了https支持,大概代码如下

session = requests.session()

try:
    session.get('https://' + domain)
except Exception as e:
    print(e)

需要注意的是,如果批量扫描域名的话需要使用session模式,否则可能会因为链接过多而报错

获取证书过期时间

之后再通过pyopenssl模块来拿到域名的HTTPS证书过期时间,代码如下

from _datetime import datetime
from urllib3.contrib import pyopenssl

def get_expire(domain):
    try:
        certificate = pyopenssl.ssl.get_server_certificate((domain, 443))
        data = pyopenssl.OpenSSL.crypto.load_certificate(pyopenssl.OpenSSL.crypto.FILETYPE_PEM, certificate)

        expire_time = datetime.strptime(data.get_notAfter().decode()[0:-1], '%Y%m%d%H%M%S')
        expire_days = (expire_time - datetime.now()).days

        return True, 200, {'expire_time': str(expire_time), 'expire_days': expire_days}
    except Exception as e:
        return False, 500, str(e)

if __name__ == '__main__':
    print(get_expire('blog.ops-coffee.cn'))

使用之前需要先安装pyopenssl模块,这里建议使用python3.6及以上版本,除了get_notAfter可以拿到证书过期时间外,还有以下方法能够获取到更多证书相关的信息:get_notAfter,get_notBefore,get_pubkey,get_serial_number,get_signature_algorithm,get_subject,get_version,gmtime_adj_notAfter,gmtime_adj_notBefore,has_expired

定时执行入库

以上步骤会定时执行监控,当发现证书过期时间小于30天时发报警,执行日志就写入了数据库方便前端页面展示

class Domain(models.Model):
    create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')

    domain = models.CharField(max_length=64, verbose_name='一级域名')
    jsondata = models.TextField(verbose_name='域名详情')

数据库就三个字段,在每次执行完一轮扫码后都会将详细信息计入上表中

jsondata = {
    'items': [{
        'record': 'blog.ops-coffee.cn',
        'rdtype': 'A',
        'dept': '天玑'
        'enable_https': 1,
        'expire_time': '2021-10-22 12:00:00',
        'expire_days': '387',
        'notes': ''
    }], 
    'total_count': 26,
    'record_a_count': 23,
    'record_a_https_count': 19}

Domain.objects.create(domain='ops-coffee.cn', jsondata=jsondata)

前端获取最新一条数据展示

Domain.objects.filter(domain=domain).order_by('-create_time').first()

更新证书

证书更新是最繁琐的事情,因为涉及到多平台不同环境,每家公司情况可能都有不同,我们因为大量使用了SAAS服务,SAAS服务又提供有API,所以更新起来比较简单,调用API即

class LBCApi:
    def __init__(self):
        self.domain = 'https://lbc.ops-coffee.cn'

        self.headers = {
            'content-type': 'application/json',
            'Auth-Token': get_auth_token()[1]
        }

    def update_cert(self, id, cert):
        try:
            data = json.dumps({"cert": cert})
            r = requests.put(self.domain + '/port/%d' % (id), data=data, headers=self.headers)

            if r.status_code != 200:
                return False, r.status_code, r.json()

            return True, 200, port['id']
        except Exception as e:
            return False, 500, 'PortId:%s,更新证书失败' % str(e)

if __name__ == '__main__':
    state, code, data = LBCApi().update_cert(37, 'cert-ops-coffee-cn')
    print(state, code, json.dumps(data))

搞定收官!KPI++