造了一个firewalld 管理器的轮子

作者: print("") 分类: python 发布时间: 2018-10-26 17:46

总是为Centos 7 的那个Firewalld 的命令太长而去百度。说实话。太不容易了。记不住啊

然后根据公司的轮子重新改造了一下。还有一些一些没有改完的。 例如,指定网卡操作。还有一些端口映射的。

那个感觉比较复杂。就没有重新去改。

暂时只支持几个选项

1. list 查看所有
2. addtcp port  添加端口
3. deltcp port  删除端口
4. addudp port 添加udp端口
5. deludp port 删除udp端口
6. addip address 添加禁止IP
7. delip address 删除禁止IP
8. addtcpport port,address,pool,type  添加IP 端口的允许/禁止
9. delportip port,address,pool,type  添加IP 端口的允许/禁止

[root@localhost soft_ico]# python aa.py  list
{'drop_ip': [{'type': 'drop', 'address': '110.110.110.110'}], 'accept_ip': [{'protocol': 'tcp', 'port': '20'}, {'protocol': 'tcp', 'port': '21'}, {'protocol': 'tcp', 'port': '22'}, {'protocol': 'tcp', 'port': '80'}, {'protocol': 'tcp', 'port': '8888'}, {'protocol': 'tcp', 'port': '30000-40000'}, {'protocol': 'udp', 'port': '30000-40000'}, {'protocol': 'tcp', 'port': '888'}, {'protocol': 'tcp', 'port': '39000-40000'}, {'protocol': 'tcp', 'port': '8181'}, {'protocol': 'tcp', 'port': '12'}, {'protocol': 'udp', 'port': '12'}, {'protocol': 'tcp', 'port': '123'}, {'protocol': 'tcp', 'port': '138'}, {'protocol': 'udp', 'port': '138'}, {'protocol': 'tcp', 'port': '480-580'}], 'accept': [], 'reject': [{'protocol': 'udp', 'type': 'reject', 'port': '18', 'address': '192.168.10.0/24'}, {'protocol': 'udp', 'type': 'reject', 'port': '19', 'address': '192.168.10.2'}]}

drop_ip 代表的指的是禁止的某个IP访问本服务器

accept_ip 代表着是运行那些端口放行。protocol 表示协议 tcp udp 的方式

accept  代表某个端口运行那些IP访问。例如 本机的80端口 运行 192.168.1.10 的Ip访问

reject  和上面相反。这个是禁止的意思。

下面是样例

开放 tcp 89 端口

[root@localhost soft_ico]# python aa.py addtcp 89 
success
查看一下

[root@localhost soft_ico]# python aa.py list |grep 89
{'drop_ip': [{'type': 'drop', 'address': '110.110.110.110'}], 'accept_ip': [{'protocol': 'tcp', 'port': '20'}, {'protocol': 'tcp', 'port': '21'}, {'protocol': 'tcp', 'port': '22'}, {'protocol': 'tcp', 'port': '80'}, {'protocol': 'tcp', 'port': '8888'}, {'protocol': 'tcp', 'port': '30000-40000'}, {'protocol': 'udp', 'port': '30000-40000'}, {'protocol': 'tcp', 'port': '888'}, {'protocol': 'tcp', 'port': '39000-40000'}, {'protocol': 'tcp', 'port': '8181'}, {'protocol': 'tcp', 'port': '12'}, {'protocol': 'udp', 'port': '12'}, {'protocol': 'tcp', 'port': '123'}, {'protocol': 'tcp', 'port': '138'}, {'protocol': 'udp', 'port': '138'}, {'protocol': 'tcp', 'port': '480-580'}, {'protocol': 'tcp', 'port': '89'}], 'accept': [], 'reject': [{'protocol': 'udp', 'type': 'reject', 'port': '18', 'address': '192.168.10.0/24'}, {'protocol': 'udp', 'type': 'reject', 'port': '19', 'address': '192.168.10.2'}]}
[root@localhost soft_ico]# 

然后 开放udp 89 

[root@localhost soft_ico]# python aa.py addudp 89
success
[root@localhost soft_ico]# 

删除 udp 89 

[root@localhost soft_ico]# python aa.py deludp 89
success
[root@localhost soft_ico]# 

添加禁止的IP

[root@localhost soft_ico]# python aa.py addip 192.168.100.100
success
[root@localhost soft_ico]# 

禁止192.168.10.1 访问80 

[root@localhost soft_ico]# python aa.py addtcpport 80 192.168.10.1 tcp reject
success
[root@localhost soft_ico]# 

大概就是这么多了

代码如下:

#!/usr/bin/env python
# coding:utf-8

from xml.etree.ElementTree import ElementTree, Element
import os

class firewalld:
    __TREE = None
    __ROOT = None
    __CONF_FILE = '/etc/firewalld/zones/public.xml'

    # 初始化配置文件XML对象
    def __init__(self):
        if self.__TREE: return
        self.__TREE = ElementTree()
        self.__TREE.parse(self.__CONF_FILE)
        self.__ROOT = self.__TREE.getroot()

    def ExecShell(cmdstring, cwd=None, timeout=None, shell=True):
        import shlex
        import datetime
        import subprocess
        import time

        if shell:
            cmdstring_list = cmdstring
        else:
            cmdstring_list = shlex.split(cmdstring)
        if timeout:
            end_time = datetime.datetime.now() + datetime.timedelta(seconds=timeout)

        sub = subprocess.Popen(cmdstring_list, cwd=cwd, stdin=subprocess.PIPE, shell=shell, bufsize=4096,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        while sub.poll() is None:
            time.sleep(0.1)
            if timeout:
                if end_time <= datetime.datetime.now():
                    raise Exception("Timeout:%s" % cmdstring)

        return sub.communicate()

    # 获取端口列表
    def GetAcceptPortList(self):
        mlist = self.__ROOT.getchildren()
        data = []
        for p in mlist:
            if p.tag != 'port': continue
            tmp = p.attrib
            port = p.attrib['port']
            data.append(tmp)
        return data

    # 添加端口放行
    def AddAcceptPort(self, port, pool='tcp'):
        # 检查是否存在
        if self.CheckPortAccept(pool, port): return True
        attr = {"protocol": pool, "port": port}
        Port = Element("port", attr)
        self.__ROOT.append(Port)
        self.Save()
        return True

    # 删除端口放行
    def DelAcceptPort(self, port, pool='tcp'):
        # 检查是否存在
        if not self.CheckPortAccept(pool, port): return True
        mlist = self.__ROOT.getchildren()
        m = False
        for p in mlist:
            if p.tag != 'port': continue
            if p.attrib['port'] == port:
                self.__ROOT.remove(p)
                m = True
        if m:
            self.Save()
            return True
        return False

    # 添加UDP端口放行
    def AddUpdPort(self, port, pool='udp'):
        # 检查是否存在
        if self.CheckPortAccept(pool, port): return True
        attr = {"protocol": pool, "port": port}
        Port = Element("port", attr)
        self.__ROOT.append(Port)
        self.Save()
        return True

    # 删除UDP端口放行
    def DelUdpPort(self, port, pool='udp'):
        # 检查是否存在
        if not self.CheckPortAccept(pool, port): return True
        mlist = self.__ROOT.getchildren()
        m = False
        for p in mlist:
            if p.tag != 'port': continue
            if p.attrib['port'] == port:
                self.__ROOT.remove(p)
                m = True
        if m:
            self.Save()
            return True
        return False

    # 检查端口是否已放行
    def CheckPortAccept(self, pool, port):
        for p in self.GetAcceptPortList():
            if p['port'] == port and p['protocol']==pool: return True
        return False


    # 获取屏蔽IP列表
    def GetDropAddressList(self):
        mlist = self.__ROOT.getchildren()
        data = []
        for ip in mlist:

            if ip.tag != 'rule': continue
            tmp = {}
            ch = ip.getchildren()
            a=None
            for c in ch:
                tmp['type']=None
                if c.tag == 'drop': tmp['type'] = 'drop'
                if c.tag == 'source':

                    tmp['address']=c.attrib['address']
                if tmp['type']:
                    data.append(tmp)
        return data

    # 获取 reject 信息
    def GetrejectLIST(self):
        mlist = self.__ROOT.getchildren()
        data = []
        for ip in mlist:
            #print(ip)
            if ip.tag != 'rule': continue
            tmp = {}
            ch = ip.getchildren()
            a=None
            flag = None
            for c in ch:
                tmp['type']=None
                if c.tag == 'reject': tmp['type'] = 'reject'
                if c.tag == 'source':

                    tmp['address']=c.attrib['address']
                if c.tag =='port':

                    tmp['protocol']=c.attrib['protocol']
                    tmp['port']=c.attrib['port']
                if tmp['type']:
                    data.append(tmp)
        return data

# 获取 accept 信息

    def Getacceptlist(self):
        mlist = self.__ROOT.getchildren()
        data = []
        for ip in mlist:

            if ip.tag != 'rule': continue
            tmp = {}
            ch = ip.getchildren()
            a=None
            flag = None
            for c in ch:
                tmp['type']=None
                if c.tag == 'accept': tmp['type'] = 'accept'
                if c.tag == 'source':

                    tmp['address']=c.attrib['address']
                if c.tag =='port':
                    tmp['protocol']=c.attrib['protocol']
                    tmp['port']=c.attrib['port']
                if tmp['type']:
                    data.append(tmp)
        return data


# 获取所有信息
    def Get_All_Info(self):
        data={}
        data['accept_ip']=self.GetAcceptPortList()
        data['drop_ip']=self.GetDropAddressList()
        data['reject']=self.GetrejectLIST()
        data['accept']=self.Getacceptlist()
        return data

# 判断是否存在
    def Chekc_info(self,port,address,pool,type):
        data=self.Get_All_Info()
        if type=='accept':
            for i in data['accept']:
                #print(i['address'], i['protocol'], i['port'])
                if i['address']==address and i['protocol']==pool and i['port']==port:
                    return True
                else:
                    return False
        elif type=='reject':
            for i in data['accept']:
               # print(i['address'], i['protocol'], i['port'])
                if i['address'] == address and i['protocol'] == pool and i['port'] == port:
                    return True
                else:
                    return False
        else:
            return False

    def AddDropAddress(self, address):
        # 检查是否存在
        if self.CheckIpDrop(address): return True
        attr = {"family": 'ipv4'}
        rule = Element("rule", attr)
        attr = {"address": address}
        source = Element("source", attr)
        drop = Element("drop", {})
        rule.append(source)
        rule.append(drop)
        self.__ROOT.append(rule)
        self.Save()
        return 'OK'

    # 删除IP屏蔽
    def DelDropAddress(self, address):
        # 检查是否存在
        if not self.CheckIpDrop(address): return True
        mlist = self.__ROOT.getchildren()
        for ip in mlist:
            if ip.tag != 'rule': continue
            ch = ip.getchildren()
            for c in ch:

                if c.tag != 'source':continue
                if c.attrib['address'] == address:
                    self.__ROOT.remove(ip)
                    self.Save()
                    return True
        return False



# 添加端口放行并且指定IP
    def Add_Port_IP(self, port,address,pool,type):
        if type=='accept':
            # 判断是否存在
            if self.Chekc_info(port,address,pool,type): return True
            attr = {"family": 'ipv4'}
            rule = Element("rule", attr)
            attr = {"address": address}
            source = Element("source", attr)
            attr={'port':str(port),'protocol':pool}
            port_info=Element("port",attr)
            accept = Element("accept", {})
            rule.append(source)
            rule.append(port_info)
            rule.append(accept)
            self.__ROOT.append(rule)
            self.Save()
            return True

        elif type=='reject':
            # 判断是否存在
            if self.Chekc_info(port,address,pool,type):return True
            attr = {"family": 'ipv4'}
            rule = Element("rule", attr)
            attr = {"address": address}
            source = Element("source", attr)
            attr = {'port': str(port), 'protocol': pool}
            port_info = Element("port", attr)
            reject = Element("reject", {})
            rule.append(source)
            rule.append(port_info)
            rule.append(reject)
            self.__ROOT.append(rule)
            self.Save()
            return True
        else:
            return False


# 删除指定端口的=。=
    def Del_Port_IP(self, port,address,pool,type):
        if type=='accept':
            a = None
            for i in self.__ROOT:
                if i.tag == 'rule':
                    tmp = {}
                    for c in i.getchildren():
                        tmp['type'] = None
                        if c.tag == 'accept': tmp['type'] = 'accept'
                        if c.tag == 'source':
                            tmp['address'] = c.attrib['address']
                        if c.tag == 'port':
                            tmp['protocol'] = c.attrib['protocol']
                            tmp['port'] = c.attrib['port']
                        if tmp['type']:
                            if tmp['port'] == port and tmp['address'] == address and tmp['type'] == type and tmp['protocol'] == pool:
                                self.__ROOT.remove(i)
                        self.Save()
            return True

        elif type=='reject':
            for i in self.__ROOT:
                if i.tag == 'rule':
                    tmp = {}
                    for c in i.getchildren():
                        tmp['type'] = None
                        if c.tag == 'reject': tmp['type'] = 'reject'
                        if c.tag == 'source':
                            tmp['address'] = c.attrib['address']
                        if c.tag == 'port':
                            tmp['protocol'] = c.attrib['protocol']
                            tmp['port'] = c.attrib['port']
                        if tmp['type']:
                            if tmp['port'] == port and tmp['address'] == address and tmp['type'] == type and tmp['protocol'] == pool:
                                self.__ROOT.remove(i)

                                self.Save()
            return True

    # 检查IP是否已经屏蔽
    def CheckIpDrop(self, address):
        for ip in self.GetDropAddressList():
            if ip['address'] == address: return True
        return False

    # 取服务状态
    def GetServiceStatus(self):
        result = self.ExecShell("systemctl status firewalld|grep '(running)'")
        if len(result) > 10: return True
        return False

    # 服务控制
    def FirewalldService(self, type):
        os.system('systemctl ' + type + ' firewalld.service')
        return 'SUUESS'

    # 保存配置
    def Save(self):
        self.format(self.__ROOT)
        self.__TREE.write(self.__CONF_FILE, 'utf-8')
        os.system('firewall-cmd --reload')

    # 整理配置文件格式
    def format(self, em, level=0):
        i = "\n" + level * "  "
        if len(em):
            if not em.text or not em.text.strip():
                em.text = i + "  "
            for e in em:
                self.format(e, level + 1)
            if not e.tail or not e.tail.strip():
                e.tail = i
        if level and (not em.tail or not em.tail.strip()):
            em.tail = i


if __name__ == "__main__":
    try:
        import argparse
        p = firewalld()
        import sys
        import json
        data = None
        type = sys.argv[1]
        if type == 'list':
            print(p.Get_All_Info())
        elif type =='addtcp':
            p.AddAcceptPort(sys.argv[2])
        elif type=='deltcp':
            p.DelAcceptPort(sys.argv[2])
        elif type =='addudp':
            p.AddUpdPort(sys.argv[2])
        elif type=='deludp':
            p.DelUdpPort(sys.argv[2])
        elif type=='addip':
            p.AddDropAddress(sys.argv[2])
        elif type=='delip':
            p.DelDropAddress(sys.argv[2])
        elif type=='addtcpport':
            try:
                pool = ['tcp', 'udp']
                type = ['accept', 'reject']
                if sys.argv[2] > 65535 and sys.argv[4] not in pool and sys.argv[5] not in type:
                    print('addtcpport 选项如下:')
                    print('addtcpport port,address,pool,type ')
                    print('port 为端口')
                    print('address 为ip地址 例如: 192.168.10.1 192.168.10.0/24')
                    print('pool 为 tcp and udp')
                    print('type 为(accept,reject) accept为允许  accept为禁止')
                else:
                  p.Add_Port_IP(sys.argv[2],sys.argv[3],sys.argv[4],sys.argv[5])
            except:
                print('addtcpport 选项如下:')
                print('addtcpport port,address,pool,type ')
                print('port 为端口')
                print('address 为ip地址 例如: 192.168.10.1 192.168.10.0/24')
                print('pool 为 tcp and udp')
                print('type 为(accept,reject) accept为允许  accept为禁止')
        elif type=='delportip':
            try:
                pool=['tcp','udp']
                type=['accept','reject']
                if sys.argv[2]>65535 and  sys.argv[4] not in pool and sys.argv[5] not in type:
                    print('addtcpport 选项如下:')
                    print('addtcpport port,address,pool,type ')
                    print('port 为端口')
                    print('address 为ip地址 例如: 192.168.10.1 192.168.10.0/24')
                    print('pool 为 tcp and udp')
                    print('type 为(accept,reject) accept为允许  accept为禁止')
                else:
                    p.Del_Port_IP(sys.argv[2],sys.argv[3],sys.argv[4],sys.argv[5])
            except:
                print('addtcpport 选项如下:')
                print('addtcpport port,address,pool,type ')
                print('port 为端口')
                print('address 为ip地址 例如: 192.168.10.1 192.168.10.0/24')
                print('pool 为 tcp and udp')
                print('type 为(accept,reject) accept为允许  accept为禁止')

        else:
            print('添加IP管理')

            print('python firewalld.py list 查看所有防火墙选项')
            print('list 为查看所有的选项')
    except:
        print('1. list 查看所有')
        print('2. addtcp port  添加端口')
        print('3. deltcp port  删除端口')
        print('4. addudp port 添加udp端口')
        print('5. deludp port 删除udp端口')
        print('6. addip address 添加禁止IP')
        print('7. delip address 删除禁止IP')
        print('8. addtcpport port,address,pool,type  添加IP 端口的允许/禁止')
        print('9. delportip port,address,pool,type  添加IP 端口的允许/禁止')



    # 样例
    #
    # print(p.GetrejectLIST())
    # print(p.Getacceptlist())
    # #print p.Add_Port_IP(port='873',address='192.168.100.0/24',pool='tcp',type='accept')
    # #print p.Del_Port_IP(port='873', address='192.168.100.0/24', pool='tcp', type='reject')
    # #print(p.test('192.168.1.0/24'))
    # #print(len(p.Getacceptlist()))
    #

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注