首页
仓库
文档
nginx手册
Docker手册
workerman
Flask
PHP
python
RabbitMQ
其他
Linux
占位1
占位2
目录
###main.py 主程序 import threading from worker1 import worker1 from worker2 import worker2 from worker3 import worker3 from worker4 import worker4 # 创建并启动线程 def on_start_threads(): threads = [] # 创建第一个线程 t1 = threading.Thread(target=worker1, name="定时器") t1.start() threads.append(t1) # 创建第二个线程 t2 = threading.Thread(target=worker2, name="http",kwargs={'db':123,'count':100}) t2.start() threads.append(t2) # # 创建第三个线程 t3 = threading.Thread(target=worker3, name="TCP_SERVER") t3.start() threads.append(t3) # # 创建第四个线程 t4 = threading.Thread(target=worker4, name="tcp_client") t4.start() threads.append(t4) print("线程数量 = %d" % threading.active_count()) # 等待所有线程完成 for t in threads: t.join() #t.join() 等待子线程执行完 # 主程序入口 if __name__ == "__main__": on_start_threads() ###worker1.py 定时执行 import threading import fun # 定时器线程 interval=5 # 每N秒执行一次 start_count=0 #执行次数 debug_show=False #显示调试国产 log_save=True #保存日志 def worker1(): print("定时器线程开始执行") timer = threading.Timer(interval , hello, kwargs={'name':'定时器','val':'123'}) timer.start() def hello(name,val): global start_count start_count=start_count+1 fun.log_('定时器:%s次'%start_count, 'dingshiqi', debug_show, log_save) timer = threading.Timer(interval , hello, kwargs={'name':'定时器','val':'123'}) timer.start() if(start_count>=200): timer.cancel() fun.log_('取消定时器' % start_count, 'dingshiqi', debug_show, log_save) ###worker2.py HTTP请求 import threading import time import requests import fun debug_show=False #显示调试国产 log_save=True #保存日志 # 定时请求url def worker2(db,count): print("http线程开始执行") while(count>0): fun.log_('work2--->%s'%count, 'http', debug_show, log_save) time.sleep(5) t3 = threading.Thread(target=get_http, name="get_http",kwargs={'name':count}) t3.start() count=count-1 def get_http(name): x = requests.get('https://www.tolog.cn/test-bak/time.php', timeout=1) fun.log_('name:%s,status_code:%s,text:%s' %(name,x.status_code,x.text), 'http', debug_show, log_save) x.close() #关闭与服务器的连接 ###worker3.py TCP服务端 import datetime import queue #创建进程 import socket # 使用的socket接口 import threading import requests import fun debug_show=False #显示调试国产 log_save=True #保存日志 Queue_list =queue.Queue() #先进先出队列 # 启动一个tcp 服务器接受TCP报文 def worker3(): global Queue_list print("TCP_server线程开始执行") HOST = '127.0.0.1' # 本地地址和端口 PORT = 50000 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建socket sock.bind((HOST, PORT)) # 绑定到本地IP和端口 sock.listen(10) # 等待用户请求连接 #队列推送 thread = threading.Thread(target=ts_list,kwargs={ 'queue': Queue_list}) thread.start() while True: # 一直接收用户数据并原封不动地返回 client, address = sock.accept() # 接收用户连接请求 fun.log_("\nTCP客户端" + str(address), 'TCP_SERVER', debug_show, log_save) Queue_list.put(('new_client', str(address[0]) + ':' + str(address[1]))) thread = threading.Thread(target=tcp_server_client,kwargs={'client': client, 'address': address,'queue':Queue_list}) thread.start() fun.log_("tcp_server服务器关闭" + str(address), 'TCP_SERVER', debug_show, log_save) sock.close() # 关闭连接请求socket #客户端处理 def tcp_server_client(client, address,queue): fun.log_("TCP客户端子线程" + str(address), 'TCP_SERVER', debug_show, log_save) try: client.settimeout(10)# 设置超时时间 while True: # 接收数据的大小 data = client.recv(100) if not data: fun.log_("用户主动断开" + str(address), 'TCP_SERVER',debug_show,log_save) if data == b"": break if (len(data) > 0): # print(data) s_data =data.decode('gb2312') # 如果客户端发送 字符串或者JSON {"ABC":123,"bbb":888} #print('收到:'+s_data[16:]) #s_data =data.hex() #转十六进制 #如果客户端 发送16进制数据 用这个 #print("收到"+str(s_data)) fun.log_("connect by"+str(address)+str(s_data),'TCP_SERVER',debug_show,log_save) resdata='res:'+s_data fun.log_("send" + str(resdata) , 'TCP_SERVER',debug_show,log_save) # 将接收到的信息原样的返回到客户端中 client.send(resdata.encode("gb2312")) queue.put(('insert',str(address[0])+':'+str(address[1]),s_data,resdata,str(datetime.datetime.today()))) except socket.timeout: # 超时后显示退出 fun.log_("time out" + str(address) , 'TCP_SERVER',debug_show,log_save) except ConnectionResetError: fun.log_("客户端正常断开连接" + str(address), 'TCP_SERVER',debug_show,log_save) except ConnectionAbortedError: fun.log_("用户主动断开" + str(address), 'TCP_SERVER',debug_show,log_save) finally: client.close() # 关闭与客户端的连接 queue.put(('remove_client', str(address[0]) + ':' + str(address[1]))) #队里处理 def ts_list(queue): while(True): item = queue.get() # print(f'Working on {item}') # print(f'Finished {item}') if item[0]=='insert': thread = threading.Thread(target=send_http, kwargs={'str': str(item)}) thread.start() queue.task_done() def send_http(str): x = requests.get('https://www.tolog.cn/test-bak/time2.php?str='+str, timeout=1) fun.log_('name:%s,status_code:%s,text:%s' %(str,x.status_code,x.text), 'send_http', debug_show, log_save) x.close() #关闭与服务器的连接 ###worker4.py 模拟TCP客户端 import threading import random import string import socket import random import fun # 定置执行tcp客户端向TCP服务器发送请求 debug_show=False #显示调试国产 log_save=True #保存日志 interval=3 # 每N秒执行一次 start_count=0 def worker4(): print("TCP客户端 定时器线程开始执行") timer = threading.Timer(interval , tcp_client) timer.start() def tcp_client(): global start_count start_count=start_count+1 fun.log_('\n发送TCP请求:%s次'%start_count, 'TCP_CLIENT', debug_show, log_save) random_num = random.randrange(1,10) while(random_num>1): random_num=random_num-1 HOST = '127.0.0.1' # 服务器地址 PORT = 50000 # 服务器端口 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((HOST, PORT)) # 连接服务器 data = 'Hello, world'+generate_random_string(4) fun.log_('tcp_client_send: %s' % data, 'TCP_CLIENT', debug_show, log_save) b_data = data.encode("utf-8") sock.send(b_data) # 发送数据 try: sock.settimeout(10) # 设置超时时间 data = sock.recv(50) # 接收回应 fun.log_('tcp_server_Received: %s'%data.decode('gb2312'), 'TCP_CLIENT', debug_show, log_save) except socket.timeout: # 超时后显示退出 fun.log_('time out', 'TCP_CLIENT', debug_show, log_save) except ConnectionResetError: fun.log_('正常断开连接', 'TCP_CLIENT', debug_show, log_save) except ConnectionAbortedError: fun.log_('主动断开', 'TCP_CLIENT', debug_show, log_save) finally: sock.close() # 关闭连接,释放资源 timer = threading.Timer(interval , tcp_client) timer.start() if(start_count>=200000): timer.cancel() print('取消定时器') #随即字符串 def generate_random_string(length): characters = string.ascii_letters + string.digits random_string = ''.join(random.choice(characters) for _ in range(length)) return random_string ###fun.py import datetime #存入内容,#文件名称,#是否打印,#是否保存 def log_( str, file='log_',show=True,save=False): if(show==True): print(file+':'+str) if(save==True): now = datetime.datetime.now() fd = open('./log/'+file + now.strftime("%Y-%m-%d") + ".txt", "a", encoding="utf-8") dt = datetime.datetime.now() fd.write(str + ' | ' + dt.strftime("%Y-%m-%d %H:%M:%S") + "\n") fd.close()