python多任务的一些bug整理

bug记录

Posted by UlyC on June 10, 2018

世界上只有两句真理:1、人一定会死。2、程序一定有Bug。.——《程序员励志名言》

python进程中可能会遇到的bug

1. 子进程中不能有input()函数

import multiprocessing


def test():
    input()


def main():
    multiprocessing.Process(target=test).start()    # subprocess


if __name__ == '__main__':
    main()

会报错:EOFError: EOF when reading a line 经试验,子进程中只有input()是不能用的,而其他阻塞式函数可以正常使用。

2. 进程池要在需要加入进程池的函数定义之后创建

import multiprocessing
import time
q = multiprocessing.Manager().Queue()
p = multiprocessing.Pool()


def write(q):
    for i in range(100):
        q.put(i)
        print("writing~")
        time.sleep(2)


def read(q):
    while 1:
        print("reading~~",q.get())
        time.sleep(2)


def main():
    p.apply_async(write, args=(q,))
    p.apply_async(read, args=(q,))

    p.close()
    p.join()


if __name__ == '__main__':
    main()

创建一个往队列里写入数据的函数,一个读取数据的函数,放入进程池中。 当进程池q创建在函数被定义之前时,程序就会报错, AttributeError: Can't get attribute 'write' on <module '__main__' from

按常理来说,函数被定义时并不会执行,在程序中被定义的位置也不影响程序。然而将 p = multiprocessing.Pool()
此句进程池的创建移动到 write 函数之后,read函数之前:

import multiprocessing
import time
q = multiprocessing.Manager().Queue()


def write(q):
    for i in range(100):
        q.put(i)
        print("writing~")
        time.sleep(2)


p = multiprocessing.Pool() # 移动到write函数之后,就不会报获取不到属性write


def read(q):
    while 1:
        print("reading~~",q.get())
        time.sleep(2)


def main():
    p.apply_async(write, args=(q,))
    p.apply_async(read, args=(q,))

    p.close()
    p.join()


if __name__ == '__main__':
    main()

此时会报错无法获取属性read。

而当移动到write和read函数之后main函数之前或之中就不会报错。应该也是一个语言本身的bug。

建议以后使用进程池时,都在主函数中创建进程池。

3.将函数加入进程池,传给函数的参数是进程队列时,会直接结束程序

来看例子:

import multiprocessing
import time
q = multiprocessing.Queue()


def write(q):
    for i in range(100):
        q.put(i)
        print("writing~")
        time.sleep(2)


def read(q):
    while 1:
        print("reading~~",q.get())
        time.sleep(2)


p = multiprocessing.Pool()  # 这里是为了演示此时不会报错,建议以后此步放在main函数中


def main():
    p.apply_async(write, args=(q,))  # 传入的是进程队列
    p.apply_async(read, args=(q,))

    p.close()
    p.join()


if __name__ == '__main__':
    main()

运行此程序,会直接结束程序,而传入进程池队列就会正常执行。

建议使用进程池时,都要使用进程池队列。

4.队列中的put_nowait与get_nowait

import  multiprocessing


def main():
    q = multiprocessing.Queue(3)
    q.put_nowait(1234)
    q.put_nowait(1234)
    q.put_nowait(1234)

    print(q.get_nowait())


if __name__ == '__main__':
    main()

此时运行会报错 queue.Empty,队列为空。 是因为放的速度比较慢,有可能取的时候,还未将数据放入队列。

那我们来看下运行完最后一个put_nowait之后队列是空还是满:

import  multiprocessing


def main():
    q = multiprocessing.Queue(3)
    q.put_nowait(1234)
    q.put_nowait(1234)
    q.put_nowait(1234)
    print(q.full())
    print(q.empty()


if __name__ == '__main__':
    main()

神奇的是结果为两个True,即:

  1. 队列是满的 == True
  2. 队列是空的 == True

这就又是一个bug了,

将代码修改如下:

import  multiprocessing
import time


def main():
    q = multiprocessing.Queue(3)
    q.put_nowait(1234)
    q.put_nowait(1234)
    q.put_nowait(1234)
	time.sleep(1)  # 取和判定之前等待1s

	print(q.full())
    print(q.empty()

    print(q.get_nowait())


if __name__ == '__main__':
    main()

结果为:

True 
False 
1234

可以看到程序正常了。 只需取之前等待一下,即可正常取出,但是等待的时长并不是一个确定值,不同的数据等待的时长不一定,所以以后尽量避免使用put_nowait和get_nowait。

5.程序单任务正常运行,使用gevent开协程会无法进入调用的函数

# code = utf-8
import re
from socket import*
import gevent
from gevent import monkey


monkey.patch_all()


def http_exe(soc_c):
    print("http_exe is running")

    req = soc_c.recv(1024).decode("utf-8")
    print(req)

    if not req:
        soc_c.close()
        return

    res = re.match("[^ ]+ (/.*) ", req)
    if res:
        url = res.group(1)

    else:
        url = "/"

    try:
        with open("./html%s" % url, "rb") as bilibili:
            content = bilibili.read()

    except Exception as eb:
        with open("./html/index.html", "rb") as bilibili:
            content = bilibili.read()

    head = "HTTP/1.1 200 OK\r\n"
    soc_c.send((head + "\r\n").encode("utf-8"))
    soc_c.send(content)
    soc_c.close()


def main():
    http_serv = socket(AF_INET, SOCK_STREAM)
    addrs = ("127.0.0.1", 1234)
    http_serv.bind(addrs)
    http_serv.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    http_serv.listen(128)   # TCPServer init

    try:
        while True:
            soc_c, addrc = http_serv.accept()
            print("Accept success!")
            gevent.spawn(http_exe, soc_c)

    except Exception as erro:
        print(erro)
        http_serv.close()


if __name__ == '__main__':
    main()

问题出在from socket import*,使用import socket导入则问题解决。


知识共享许可协议
本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。