通过 Wireshark 抓包分析 QQ 语音通话可知,其采用的是 UDP 协议。
检测 UDP 数据包载荷开头的标志 020048 即可识别。
需要 pypcap 这个库,但在 Python 3 环境下推荐使用 pcap-ct。
此外还依赖 psutil、netaddr、dpkt,缺少哪个库就安装哪个。

import psutil,netaddr,pcap,dpkt
import time
import platform
import binascii
 
if 'Windows' in platform.platform():
    import winreg as wr

# Traffic signature per process name: (transport protocol, hex prefix that the
# payload must start with). Both clients share the same signature.
FINGERPRINT = dict.fromkeys(('qq.exe', 'tim.exe'), ('udp', '020048'))

# Registry path, opened under HKEY_LOCAL_MACHINE, whose subkeys are scanned
# for a matching 'Connection' -> 'Name' value.
IF_REG = r'SYSTEM\CurrentControlSet\Control\Network\{4d36e972-e325-11ce-bfc1-08002be10318}'

def get_interface_name(name):
    r"""Return the capture-device path for the Windows connection called *name*.

    Enumerates the subkeys of ``IF_REG`` under ``HKEY_LOCAL_MACHINE`` and reads
    each subkey's ``Connection\Name`` value.

    Args:
        name: Connection name to look for (exact, case-sensitive comparison).

    Returns:
        ``'\Device\NPF_'`` followed by the name of the first subkey whose
        connection name equals *name*, or ``None`` when nothing matches.
    """
    # Registry handles are context managers; ``with`` guarantees they are
    # closed on every path, including the early return and lookup failures.
    with wr.ConnectRegistry(None, wr.HKEY_LOCAL_MACHINE) as reg, \
            wr.OpenKey(reg, IF_REG) as reg_key:
        subkey_count = wr.QueryInfoKey(reg_key)[0]
        for index in range(subkey_count):
            subkey_name = wr.EnumKey(reg_key, index)
            try:
                with wr.OpenKey(reg_key, subkey_name + r'\Connection') as reg_subkey:
                    connection_name = wr.QueryValueEx(reg_subkey, 'Name')[0]
            except FileNotFoundError:
                # This subkey has no 'Connection' key or no 'Name' value.
                continue
            if connection_name == name:
                return r'\Device\NPF_' + subkey_name

    return None

# Capture interface: on Windows, look up the connection named 'Router' in the
# registry; on any other platform use the fixed interface name 'enp2s0'.
iface = (
    get_interface_name('Router')  # original note: WLAN
    if 'Windows' in platform.platform()
    else 'enp2s0'
)


def get_pid_sessions(is_show=True):
    """Collect live sessions to public remote addresses, grouped by process.

    Only connections that have both a local and a remote address are
    considered. A session is recorded when its remote address is a
    non-private, non-link-local unicast address and its local address is not
    loopback. Connections whose owning process is unknown, has already
    exited, or cannot be inspected are skipped.

    Args:
        is_show: When true, print one line per recorded session.

    Returns:
        A nested dict of the form
        ``{process_name: {pid: {'src_ip:sport': {'dst_ip:dport':
        {'create_time': str, 'status': str}}}}}``. ``process_name`` is
        lower-cased and ``create_time`` is the owning process's creation time
        formatted as ``%Y-%m-%d %H:%M:%S``.
    """
    sessions = {}
    # pid -> (lower-cased name, formatted creation time), or None when the
    # process could not be inspected. Avoids repeated lookups per connection.
    process_info = {}

    # "inet" limits the listing to IPv4/IPv6 sockets; UNIX-socket entries
    # carry plain path strings that have no .ip/.port attributes.
    for session in psutil.net_connections(kind="inet"):
        if not (session.laddr and session.raddr):
            continue

        pid = session.pid
        if pid is None:
            # The owning process is not always reported.
            continue

        if pid not in process_info:
            try:
                process = psutil.Process(pid)
                process_info[pid] = (
                    process.name().lower(),
                    time.strftime("%Y-%m-%d %H:%M:%S",
                                  time.localtime(process.create_time())),
                )
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process exited after the listing or is off-limits.
                process_info[pid] = None
        if process_info[pid] is None:
            continue
        pid_name, pid_create_time = process_info[pid]

        src_ip, sport = session.laddr.ip, session.laddr.port
        des_ip, dport = session.raddr.ip, session.raddr.port
        status = session.status

        # Parsed addresses, used only for the public-destination filter.
        # NOTE(review): IPAddress.is_private() appears to be deprecated or
        # removed in recent netaddr releases - TODO confirm installed version.
        filter_des_ip = netaddr.IPAddress(des_ip)
        filter_src_ip = netaddr.IPAddress(src_ip)
        is_public_session = (
            not filter_des_ip.is_private()
            and filter_des_ip.is_unicast()
            and not filter_des_ip.is_link_local()
            and not filter_src_ip.is_loopback()
        )
        if not is_public_session:
            continue

        by_source = sessions.setdefault(pid_name, {}).setdefault(pid, {})
        by_source.setdefault(f'{src_ip}:{sport}', {})[f'{des_ip}:{dport}'] = {
            'create_time': pid_create_time,
            'status': status,
        }
        if is_show:
            print(f'[{pid_create_time}]  [{pid}]    [{pid_name}]   [{src_ip}:{sport}] ---> [{des_ip}:{dport}] ---- [{status}]')
    return sessions

def sniffer(pname):
    """Capture packets on ``iface`` and print those matching *pname*'s fingerprint.

    Opens the module-level ``iface`` in promiscuous, immediate mode, filters
    on the transport named in ``FINGERPRINT[pname]`` and inspects every IPv4
    packet carrying a non-empty payload of that transport. A packet matches
    when the hex encoding of its payload starts with the fingerprint; its
    details are then printed one field per line. For every non-matching
    packet a single status line is rewritten in place instead.

    Args:
        pname: Key into ``FINGERPRINT`` selecting the transport and the hex
            prefix to look for; also used as the label in the output.

    Raises:
        KeyError: If *pname* has no entry in ``FINGERPRINT``.
    """
    transport, fingerprint = FINGERPRINT[pname][:2]
    pc = pcap.pcap(iface, promisc=True, immediate=True)
    pc.setfilter(transport)
    check_transport = dpkt.tcp.TCP if transport.lower() == 'tcp' else dpkt.udp.UDP

    for timestamp, raw_buf in pc:
        try:
            eth = dpkt.ethernet.Ethernet(raw_buf)
        except dpkt.dpkt.UnpackError:
            # A truncated or malformed frame must not end the capture loop.
            continue

        ip = eth.data
        if not isinstance(ip, dpkt.ip.IP):
            continue
        traffic = ip.data
        if not isinstance(traffic, check_transport):
            continue
        if not traffic.data:
            continue

        payload_hex = binascii.b2a_hex(traffic.data).decode()
        if not payload_hex.startswith(fingerprint):
            # '\r' with no newline keeps rewriting the same status line.
            print('\r[%s]  No capture!' % time.strftime('%Y-%m-%d %H:%M:%S'), end='')
            continue

        # Only matching packets are reported, so the record is built here
        # rather than for every captured packet.
        packet = {
            'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp)),
            'src': '%d.%d.%d.%d' % tuple(ip.src),
            'dst': '%d.%d.%d.%d' % tuple(ip.dst),
            'protocol': ip.p,
            'len': ip.len,
            'ttl': ip.ttl,
            'df': bool(ip.off & dpkt.ip.IP_DF),
            'mf': bool(ip.off & dpkt.ip.IP_MF),
            'offset': ip.off & dpkt.ip.IP_OFFMASK,
            'checksum': ip.sum,
            'traffic': payload_hex,
        }
        print()
        for key, value in packet.items():
            # Centre the label in 8 columns; with an odd amount of padding
            # the extra space goes on the right.
            print(f'[ {pname} ] [ {key.center(8)} ] ----> {value}')

if __name__ == "__main__":
    # Snapshot the current sessions quietly, then sniff for the first
    # fingerprinted program that has one. Do nothing when none is present.
    sessions = get_pid_sessions(False)
    target = next((name for name in FINGERPRINT if name in sessions), None)
    if target is not None:
        sniffer(pname=target)

我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=rmvtlnssray0