亲宝软件园·资讯

展开

go语言 http模型reactor示例详解

不爱学习的王小小 人气:0

前面说了go自带的原生netpoll模型,大致的流程就是每一个新的连接都会开启一个goroutine去处理,这样的处理的过程简单,高效,充分利用了go的底层的能力。
但是这里有几个问题:在accept的时候,是否可以由多个线程同时去accept?这样的话就不用每来一个连接就开启一个goroutine。
同时看过accept的源码都知道,只会有一个线程去accept连接;因为这个套接字在创建的时候就被设置成了非阻塞,所以在没有新连接时,执行accept的goroutine会被Go runtime调用gopark挂起。
解决办法是开启端口复用,也就是SO_REUSEPORT功能:多个线程各自创建监听socket并绑定到同一个端口,由内核把新连接分发给其中一个。这样一方面可以避免惊群效应,另一方面也可以让多个线程并行地accept连接。
接下来看一个demo,这里使用的是gnet框架(GitHub地址:https://github.com/panjf2000/gnet)。

示例

接下来看一段基于reactor的示例。示例默认监听8888端口,这里通过 go run main.go --port 8080 运行,
然后执行 curl -i 127.0.0.1:8080,效果如下,也是返回了我们期望的结果

在这里插入图片描述

package main

import (
	"flag"
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"
	"unsafe"

	"learn/http/gnet"
)

// res is the response body sent for every successfully parsed request.
// It is assigned once in main before the server starts.
var res string

// request holds the pieces of one HTTP request as extracted by parseReq.
type request struct {
	// Request line.
	proto  string
	method string
	path   string
	query  string

	// Header block and message body.
	head string
	body string

	// remoteAddr is not populated by parseReq.
	remoteAddr string
}

// httpServer is the demo's event handler. It embeds *gnet.EventServer and
// defines only OnInitComplete and React itself.
type httpServer struct {
	*gnet.EventServer
}

// errMsg is the text sent to a client whose request could not be parsed.
var errMsg = "Internal Server Error"

// errMsgBytes is errMsg as a byte slice, ready to be returned from React.
var errMsgBytes = []byte(errMsg)

// httpCodec is the codec handed to gnet.WithCodec in main. Its Decode method
// parses HTTP requests and its Encode method passes responses through.
type httpCodec struct {
	// req is the scratch request that Decode fills via parseReq.
	// NOTE(review): main registers a single httpCodec, so req appears to be
	// shared by every connection — confirm this is safe with several event loops.
	req request
}

// Encode produces the bytes to write for an outgoing frame.
//
// If the connection carries a context (Decode stores the parse error there),
// a "500 Error" response is generated; otherwise buf is returned unchanged.
func (hc *httpCodec) Encode(c gnet.Conn, buf []byte) (out []byte, err error) {
	if c.Context() != nil {
		return appendResp(out, "500 Error", "", errMsg+"\n"), nil
	}
	return buf, nil
}

// Decode reads everything currently buffered on the connection and answers
// each complete (possibly pipelined) HTTP request found in it.
//
// It returns the concatenated responses in out. When parseReq reports an
// error, the error is stored as the connection context and returned.
func (hc *httpCodec) Decode(c gnet.Conn) (out []byte, err error) {
	pending := c.Read()
	c.ResetBuffer()

	// Keep parsing until the remaining bytes no longer contain a full request.
	for {
		var rest []byte
		rest, err = parseReq(pending, &hc.req)
		if err != nil {
			// bad thing happened
			c.SetContext(err)
			return nil, err
		}
		if len(rest) == len(pending) {
			// request not ready, yet
			return out, err
		}
		out = appendHandle(out, res)
		pending = rest
	}
}

// OnInitComplete is the server-ready callback. This demo does nothing here and
// returns the zero Action.
func (hs *httpServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
	// Uncomment to log the listening address and event-loop configuration:
	//log.Printf("HTTP server is listening on %s (multi-cores: %t, loops: %d)\n",
	//	srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
	return action
}

// React handles one decoded frame.
//
// When the connection has no context, the frame (the response built by
// Decode) is sent back as is. Otherwise the error message is returned and
// the connection is closed.
func (hs *httpServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
	if c.Context() == nil {
		// handle the request
		return frame, action
	}
	// bad thing happened
	return errMsgBytes, gnet.Close
}

// main parses the command-line flags and runs the gnet HTTP demo server.
//
// Example command: go run http.go --port 8080 --multicore=true
func main() {
	var (
		port      int
		multicore bool
	)
	flag.IntVar(&port, "port", 8888, "server port")
	flag.BoolVar(&multicore, "multicore", true, "multicore")
	flag.Parse()

	// Body returned for every successfully parsed request.
	res = "Hello World!\r\n"

	handler := new(httpServer)
	codec := new(httpCodec)
	addr := fmt.Sprintf("tcp://:%d", port)

	// Start serving!
	err := gnet.Serve(
		handler,
		addr,
		gnet.WithMulticore(multicore),
		gnet.WithCodec(codec),
		gnet.WithNumEventLoop(3),
		gnet.WithReusePort(true),
	)
	log.Fatal(err)
}

// appendHandle appends a "200 OK" response carrying res as its body to b and
// returns the extended slice. No extra headers are added.
func appendHandle(b []byte, res string) []byte {
	const status = "200 OK"
	return appendResp(b, status, "", res)
}

// appendResp appends a complete HTTP/1.1 response to b and returns the
// extended slice.
//
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
// A Content-Length header is written only when body is non-empty.
func appendResp(b []byte, status, head, body string) []byte {
	b = append(b, "HTTP/1.1"...)
	b = append(b, ' ')
	b = append(b, status...)
	b = append(b, '\r', '\n')
	b = append(b, "Server: gnet\r\n"...)
	b = append(b, "Date: "...)
	// The layout hard-codes the "GMT" suffix, so the instant must be converted
	// to UTC first; formatting local time would label it with the wrong zone.
	b = time.Now().UTC().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, "Content-Length: "...)
		b = strconv.AppendInt(b, int64(len(body)), 10)
		b = append(b, '\r', '\n')
	}
	b = append(b, head...)
	// Blank line terminating the header block.
	b = append(b, '\r', '\n')
	if len(body) > 0 {
		b = append(b, body...)
	}
	return b
}

// b2s reinterprets b as a string without copying. The returned string shares
// b's backing memory, so b must not be modified while the string is in use.
func b2s(b []byte) string {
	header := unsafe.Pointer(&b)
	return *(*string)(header)
}

// parseReq tries to parse one HTTP request from the front of data.
//
// Results:
//   - a complete request was parsed: req is filled in and leftover is the
//     part of data that follows the request (len(leftover) < len(data));
//   - more bytes are needed: leftover is data itself, err is nil and req is
//     left untouched;
//   - the request line is complete but is not "METHOD SP TARGET SP PROTO":
//     leftover is data and err is a "malformed request" error.
//
// The string fields stored in req are built with b2s and therefore alias
// data's memory; they are only valid while data is not modified.
func parseReq(data []byte, req *request) (leftover []byte, err error) {
	sdata := b2s(data)

	// Request line. Until its CRLF has arrived there is nothing to judge.
	eol := strings.Index(sdata, "\r\n")
	if eol < 0 {
		return data, nil
	}
	line := sdata[:eol]
	sp1 := strings.IndexByte(line, ' ')
	if sp1 < 0 {
		return data, fmt.Errorf("malformed request")
	}
	sp2 := strings.IndexByte(line[sp1+1:], ' ')
	if sp2 < 0 {
		return data, fmt.Errorf("malformed request")
	}
	sp2 += sp1 + 1
	method, target, proto := line[:sp1], line[sp1+1:sp2], line[sp2+1:]
	if proto == "" {
		return data, fmt.Errorf("malformed request")
	}

	// Split the target at the first '?'. All indices are relative to target,
	// which avoids mixing absolute and relative offsets.
	path, query := target, ""
	if q := strings.IndexByte(target, '?'); q >= 0 {
		path, query = target[:q], target[q+1:]
	}

	// Header lines, terminated by an empty line.
	const clHeader = "Content-Length:"
	headStart := eol + 2
	clen := 0
	for pos := headStart; ; {
		rel := strings.Index(sdata[pos:], "\r\n")
		if rel < 0 {
			// not enough data
			return data, nil
		}
		hline := sdata[pos : pos+rel]
		next := pos + rel + 2

		if hline == "" {
			body := ""
			end := next
			if clen > 0 {
				// Compare against the remaining length first so that a huge
				// Content-Length cannot overflow next+clen.
				if len(sdata)-next < clen {
					// not enough data
					return data, nil
				}
				end = next + clen
				body = sdata[next:end]
			}
			req.method, req.path, req.query, req.proto = method, path, query, proto
			// The header lines, each ending in CRLF, without the blank line.
			req.head = sdata[headStart:pos]
			req.body = body
			return data[end:], nil
		}

		// Header names are case-insensitive.
		if len(hline) >= len(clHeader) && strings.EqualFold(hline[:len(clHeader)], clHeader) {
			n, perr := strconv.ParseInt(strings.TrimSpace(hline[len(clHeader):]), 10, 64)
			if perr == nil {
				clen = int(n)
			}
		}
		pos = next
	}
}

下面看一下源码解析,还是先从gnet.Serve看起

gnet.Serve

// Serve starts handling events for the specified address.
//
// Address should use a scheme prefix and be formatted
// like `tcp://192.168.0.10:9851` or `unix://socket`.
// Valid network schemes:
//  tcp   - bind to both IPv4 and IPv6
//  tcp4  - IPv4
//  tcp6  - IPv6
//  udp   - bind to both IPv4 and IPv6
//  udp4  - IPv4
//  udp6  - IPv6
//  unix  - Unix Domain Socket
//
// The "tcp" network scheme is assumed when one is not specified.
//
// Serve loads the options, sets up logging, validates the event-loop count,
// normalizes the read buffer capacity, creates the listener and then hands
// over to serve. The listener is closed when Serve returns.
func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err error) {
	// Load the user-specified options.
	options := loadOptions(opts...)

	logging.Debugf("default logging level is %s", logging.LogLevel())

	var (
		logger logging.Logger
		flush  func() error
	)
	// Log to a local file when LogPath is set, otherwise use the default logger.
	if options.LogPath != "" {
		if logger, flush, err = logging.CreateLoggerAsLocalFile(options.LogPath, options.LogLevel); err != nil {
			return
		}
	} else {
		logger = logging.GetDefaultLogger()
	}
	// A logger supplied through the options takes precedence.
	if options.Logger == nil {
		options.Logger = logger
	}
	// Flush the file logger (if one was created) and clean up logging on exit.
	defer func() {
		if flush != nil {
			_ = flush()
		}
		logging.Cleanup()
	}()

	// The maximum number of operating system threads that the Go program can use is initially set to 10000,
	// which should also be the maximum amount of I/O event-loops locked to OS threads that users can start up.
	// This check guards against creating too many OS-locked threads.
	if options.LockOSThread && options.NumEventLoop > 10000 {
		logging.Errorf("too many event-loops under LockOSThread mode, should be less than 10,000 "+
			"while you are trying to set up %d\n", options.NumEventLoop)
		return errors.ErrTooManyEventLoopThreads
	}

	// Default the read buffer capacity to 0x10000 bytes; otherwise pass the
	// requested capacity through internal.CeilToPowerOfTwo.
	if rbc := options.ReadBufferCap; rbc <= 0 {
		options.ReadBufferCap = 0x10000
	} else {
		options.ReadBufferCap = internal.CeilToPowerOfTwo(rbc)
	}
	// Split protoAddr into network and address.
	network, addr := parseProtoAddr(protoAddr)

	// Create the listener.
	var ln *listener
	if ln, err = initListener(network, addr, options); err != nil {
		return
	}
	defer ln.close()

	return serve(eventHandler, ln, options, protoAddr)
}

可以看出来参数是EventHandler 这样的interface

// Callback interface accepted by Serve, plus an embeddable default
// implementation.
type (
	// EventHandler represents the server events' callbacks for the Serve call.
	// Each event has an Action return value that is used to manage the state
	// of the connection and server.
	EventHandler interface {
		// OnInitComplete fires when the server is ready for accepting connections.
		// The parameter:server has information and various utilities.
		OnInitComplete(server Server) (action Action)

		// OnShutdown fires when the server is being shut down, it is called right after
		// all event-loops and connections are closed.
		OnShutdown(server Server)

		// OnOpened fires when a new connection has been opened.
		// The parameter:c has information about the connection such as its local and remote address.
		// Parameter:out is the return value which is going to be sent back to the client.
		// It is generally not recommended to send large amounts of data back to the client in OnOpened.
		//
		// Note that the bytes returned by OnOpened will be sent back to client without being encoded.
		OnOpened(c Conn) (out []byte, action Action)

		// OnClosed fires when a connection has been closed.
		// The parameter:err is the last known connection error.
		OnClosed(c Conn, err error) (action Action)

		// PreWrite fires just before any data is written to any client socket, this event function is usually used to
		// put some code of logging/counting/reporting or any preparatory operations before writing data to client.
		PreWrite()

		// React fires when a connection sends the server data.
		// Call c.Read() or c.ReadN(n) within the parameter:c to read incoming data from client.
		// Parameter:out is the return value which is going to be sent back to the client.
		React(frame []byte, c Conn) (out []byte, action Action)

		// Tick fires immediately after the server starts and will fire again
		// following the duration specified by the delay return value.
		Tick() (delay time.Duration, action Action)
	}

	// EventServer is a built-in implementation of EventHandler which sets up each method with a default implementation,
	// you can compose it with your own implementation of EventHandler when you don't want to implement all methods
	// in EventHandler.
	EventServer struct{}
)

initListener

然后看一下初始化监听

// initListener builds the socket options implied by options, creates a
// listener for network/addr carrying them, and normalizes it.
//
// The listener is returned together with whatever error normalize reports.
func initListener(network, addr string, options *Options) (l *listener, err error) {
	isUDP := strings.HasPrefix(network, "udp")
	isTCP := strings.HasPrefix(network, "tcp")

	var sockopts []socket.Option

	// Port reuse: on request, and always for UDP networks.
	if options.ReusePort || isUDP {
		sockopts = append(sockopts, socket.Option{SetSockopt: socket.SetReuseport, Opt: 1})
	}

	// No-delay option, only for TCP networks and only when requested.
	if isTCP && options.TCPNoDelay == TCPNoDelay {
		sockopts = append(sockopts, socket.Option{SetSockopt: socket.SetNoDelay, Opt: 1})
	}

	// Socket receive buffer size.
	if size := options.SocketRecvBuffer; size > 0 {
		sockopts = append(sockopts, socket.Option{SetSockopt: socket.SetRecvBuffer, Opt: size})
	}

	// Socket send buffer size.
	if size := options.SocketSendBuffer; size > 0 {
		sockopts = append(sockopts, socket.Option{SetSockopt: socket.SetSendBuffer, Opt: size})
	}

	l = &listener{network: network, addr: addr, sockopts: sockopts}
	err = l.normalize()
	return l, err
}

normalize最后调用的是tcpSocket方法。

// tcpSocket creates an endpoint for communication and returns a file descriptor that refers to that endpoint.
// Every entry of `sockopts` is applied to the new socket before it is bound.
// On any failure after the socket has been created, the descriptor is closed
// before returning.
func tcpSocket(proto, addr string, sockopts ...Option) (fd int, netAddr net.Addr, err error) {
	var (
		family   int
		ipv6only bool
		sockaddr unix.Sockaddr
	)
	// Resolve the socket address, address family and IPv6-only flag.
	if sockaddr, family, netAddr, ipv6only, err = getTCPSockaddr(proto, addr); err != nil {
		return
	}

	// Create the stream socket through sysSocket.
	// NOTE(review): the article states sysSocket calls
	// unix.Socket(family, sotype|unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC, proto);
	// sysSocket itself is not shown here — confirm.
	if fd, err = sysSocket(family, unix.SOCK_STREAM, unix.IPPROTO_TCP); err != nil {
		err = os.NewSyscallError("socket", err)
		return
	}
	// Relies on the named result err: close fd if any later step fails.
	defer func() {
		if err != nil {
			_ = unix.Close(fd)
		}
	}()

	if family == unix.AF_INET6 && ipv6only {
		if err = SetIPv6Only(fd, 1); err != nil {
			return
		}
	}

	// Apply the caller-supplied socket options.
	for _, sockopt := range sockopts {
		if err = sockopt.SetSockopt(fd, sockopt.Opt); err != nil {
			return
		}
	}

	// bind
	if err = os.NewSyscallError("bind", unix.Bind(fd, sockaddr)); err != nil {
		return
	}

	// Start listening.
	// Set backlog size to the maximum.
	err = os.NewSyscallError("listen", unix.Listen(fd, listenerBacklogMaxSize))

	return
}

serve

// serve wires up a server around the given listener and runs it.
//
// It picks the event-loop count, load balancer and codec from options, fires
// OnInitComplete, starts the event loops and registers the server in
// allServers under protoAddr. svr.stop is deferred once start has succeeded.
func serve(eventHandler EventHandler, listener *listener, options *Options, protoAddr string) error {
	// Figure out the proper number of event-loops/goroutines to run: an
	// explicit NumEventLoop wins, then Multicore, then a single loop.
	numEventLoop := options.NumEventLoop
	if numEventLoop <= 0 {
		numEventLoop = 1
		if options.Multicore {
			numEventLoop = runtime.NumCPU()
		}
	}

	// Instantiate the server.
	svr := &server{
		opts:         options,
		eventHandler: eventHandler,
		ln:           listener,
	}

	// Select the load-balancing strategy named by options.LB.
	switch options.LB {
	case RoundRobin:
		svr.lb = new(roundRobinLoadBalancer)
	case LeastConnections:
		svr.lb = new(leastConnectionsLoadBalancer)
	case SourceAddrHash:
		svr.lb = new(sourceAddrHashLoadBalancer)
	}

	svr.cond = sync.NewCond(&sync.Mutex{})
	if svr.opts.Ticker {
		svr.tickerCtx, svr.cancelTicker = context.WithCancel(context.Background())
	}

	// Fall back to the built-in frame codec when none was configured.
	if options.Codec != nil {
		svr.codec = options.Codec
	} else {
		svr.codec = new(BuiltInFrameCodec)
	}

	info := Server{
		svr:          svr,
		Multicore:    options.Multicore,
		Addr:         listener.lnaddr,
		NumEventLoop: numEventLoop,
		ReusePort:    options.ReusePort,
		TCPKeepAlive: options.TCPKeepAlive,
	}
	// The handler may ask for an immediate shutdown.
	if svr.eventHandler.OnInitComplete(info) == Shutdown {
		return nil
	}

	// Start the event loops.
	if err := svr.start(numEventLoop); err != nil {
		svr.closeEventLoops()
		svr.opts.Logger.Errorf("gnet server is stopping with error: %v", err)
		return err
	}
	defer svr.stop(info)

	allServers.Store(protoAddr, svr)

	return nil
}

// start launches numEventLoop loops. With port reuse enabled or a "udp"
// listener it uses activateEventLoops; otherwise it uses activateReactors.
func (svr *server) start(numEventLoop int) error {
	if !svr.opts.ReusePort && svr.ln.network != "udp" {
		return svr.activateReactors(numEventLoop)
	}
	// Each event loop gets its own listener and poller.
	return svr.activateEventLoops(numEventLoop)
}

然后看一下activateEventLoops方法。

activateEventLoops

// activateEventLoops creates numEventLoop event loops, each with its own
// poller, registers them with the load balancer and starts them in the
// background. It returns the first listener or poller creation error.
func (svr *server) activateEventLoops(numEventLoop int) (err error) {
	var striker *eventloop

	// Create loops locally and bind the listeners.
	for i := 0; i < numEventLoop; i++ {
		// The first loop uses the server's listener; with port reuse (or
		// UDP) every further loop gets a fresh listener from initListener.
		ln := svr.ln
		if i > 0 && (svr.opts.ReusePort || ln.network == "udp") {
			if ln, err = initListener(svr.ln.network, svr.ln.addr, svr.opts); err != nil {
				return err
			}
		}

		var p *netpoll.Poller
		if p, err = netpoll.OpenPoller(); err != nil {
			return err
		}

		el := &eventloop{
			ln:           ln,
			svr:          svr,
			poller:       p,
			buffer:       make([]byte, svr.opts.ReadBufferCap),
			connections:  make(map[int]*conn),
			eventHandler: svr.eventHandler,
		}
		// Watch the listening socket; el.loopAccept is the callback packed
		// into the poll attachment.
		_ = el.poller.AddRead(el.ln.packPollAttachment(el.loopAccept))
		svr.lb.register(el)

		// The loop with index 0 drives the ticker, when the ticker is enabled.
		if el.idx == 0 && svr.opts.Ticker {
			striker = el
		}
	}

	// Start event-loops in background.
	svr.startEventLoops()

	// NOTE(review): striker is nil when the ticker is disabled; presumably
	// loopTicker tolerates a nil receiver — confirm.
	go striker.loopTicker(svr.tickerCtx)

	return nil
}

然后 看一下 OpenPoller方法

// OpenPoller instantiates a poller.
//
// It creates the epoll instance and an eventfd, registers the eventfd for
// read events and sets up the two task queues. On failure it returns a nil
// poller and the error.
func OpenPoller() (poller *Poller, err error) {
	p := new(Poller)

	// epoll instance.
	if p.fd, err = unix.EpollCreate1(unix.EPOLL_CLOEXEC); err != nil {
		return nil, os.NewSyscallError("epoll_create1", err)
	}

	// eventfd that Polling treats as the wake-up descriptor.
	if p.wfd, err = unix.Eventfd(0, unix.EFD_NONBLOCK|unix.EFD_CLOEXEC); err != nil {
		_ = p.Close()
		return nil, os.NewSyscallError("eventfd", err)
	}
	p.wfdBuf = make([]byte, 8)

	// Watch the eventfd for readability.
	if err = p.AddRead(&PollAttachment{FD: p.wfd}); err != nil {
		_ = p.Close()
		return nil, err
	}

	// Queues for asynchronous tasks (normal and prioritized).
	p.asyncTaskQueue = queue.NewLockFreeQueue()
	p.priorAsyncTaskQueue = queue.NewLockFreeQueue()
	return p, nil
}

然后看一下loopAccept 这个方法

// loopAccept is the callback for the listening socket.
//
// For a "udp" listener it delegates to loopReadUDP. Otherwise it accepts one
// connection, makes it non-blocking, optionally enables keep-alive, registers
// it with this loop's poller and opens it. If the accepted descriptor cannot
// be made non-blocking or registered, it is closed so it does not leak.
func (el *eventloop) loopAccept(_ netpoll.IOEvent) error {
	if el.ln.network == "udp" {
		return el.loopReadUDP(el.ln.fd)
	}

	// Accept a pending connection from the listening socket.
	nfd, sa, err := unix.Accept(el.ln.fd)
	if err != nil {
		// Nothing to accept right now.
		if err == unix.EAGAIN {
			return nil
		}
		el.getLogger().Errorf("Accept() fails due to error: %v", err)
		return os.NewSyscallError("accept", err)
	}
	// Switch the accepted socket to non-blocking mode.
	if err = os.NewSyscallError("fcntl nonblock", unix.SetNonblock(nfd, true)); err != nil {
		_ = unix.Close(nfd)
		return err
	}

	netAddr := socket.SockaddrToTCPOrUnixAddr(sa)
	if el.svr.opts.TCPKeepAlive > 0 && el.svr.ln.network == "tcp" {
		// Best effort: a keep-alive failure is logged, not returned.
		err = socket.SetKeepAlive(nfd, int(el.svr.opts.TCPKeepAlive/time.Second))
		logging.LogErr(err)
	}

	// Wrap the descriptor in a connection and watch it for read events.
	c := newTCPConn(nfd, el, sa, netAddr)
	if err = el.poller.AddRead(c.pollAttachment); err != nil {
		_ = unix.Close(nfd)
		return err
	}
	el.connections[c.fd] = c
	return el.loopOpen(c)
}

然后看一下 startEventLoops 这个方法

// startEventLoops runs loopRun for every event loop visited by the load
// balancer's iterate, each in its own goroutine tracked by svr.wg.
func (svr *server) startEventLoops() {
	launch := func(_ int, el *eventloop) bool {
		svr.wg.Add(1)
		go func() {
			el.loopRun(svr.opts.LockOSThread)
			svr.wg.Done()
		}()
		// Returning true on every call.
		return true
	}
	svr.lb.iterate(launch)
}

// loopRun runs the event loop until its poller's Polling call returns.
// When lockOSThread is true the goroutine is locked to its OS thread for the
// duration. On exit all connections and the listener are closed and the
// server is signalled to shut down.
func (el *eventloop) loopRun(lockOSThread bool) {
	if lockOSThread {
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()
	}

	defer func() {
		el.closeAllConns()
		el.ln.close()
		el.svr.signalShutdown()
	}()
	// Polling receives a callback that is invoked with the fd and event mask
	// of each ready descriptor.
	err := el.poller.Polling(func(fd int, ev uint32) (err error) {
		// The fd belongs to a connection tracked by this loop.
		if c, ok := el.connections[fd]; ok {
			// Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN unless you're 100%
			// sure what you're doing!
			// Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past.

			// We should always check for the EPOLLOUT event first, as we must try to send the leftover data back to
			// client when any error occurs on a connection.
			//
			// Either an EPOLLOUT or EPOLLERR event may be fired when a connection is refused.
			// In either case loopWrite() should take care of it properly:
			// 1) writing data back,
			// 2) closing the connection.
			if ev&netpoll.OutEvents != 0 && !c.outboundBuffer.IsEmpty() {
				// Write event with pending outbound data.
				if err := el.loopWrite(c); err != nil {
					return err
				}
			}
			// If there is pending data in outbound buffer, then we should omit this readable event
			// and prioritize the writable events to achieve a higher performance.
			//
			// Note that the client may send massive amounts of data to server by write() under blocking mode,
			// resulting in that it won't receive any responses before the server read all data from client,
			// in which case if the socket send buffer is full, we need to let it go and continue reading the data
			// to prevent blocking forever.
			// Read event.
			if ev&netpoll.InEvents != 0 && (ev&netpoll.OutEvents == 0 || c.outboundBuffer.IsEmpty()) {
				return el.loopRead(c)
			}
			return nil
		}
		// Not a tracked connection: treat the event as a new connection to accept.
		return el.loopAccept(ev)
	})
	el.getLogger().Debugf("event-loop(%d) is exiting due to error: %v", el.idx, err)
}

polling

这个方法是比较重要的,也是阻塞在epoll上面,去监听fd的事件

// Polling blocks the current goroutine, waiting for network-events.
//
// Events on any descriptor other than p.wfd are passed to callback. An event
// on p.wfd marks a wake-up, after which the queued asynchronous tasks are
// run. Polling returns when EpollWait fails with an error other than EINTR,
// when callback returns ErrAcceptSocket or ErrServerShutdown, or when a task
// returns ErrServerShutdown.
func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
	el := newEventList(InitPollEventsCap)
	var wakenUp bool

	// -1 is passed to EpollWait as the timeout for the first wait.
	msec := -1
	for {
		// Wait for events with epoll_wait.
		n, err := unix.EpollWait(p.fd, el.events, msec)
		if n == 0 || (n < 0 && err == unix.EINTR) {
			// No events or interrupted: reset the timeout to -1, yield and retry.
			msec = -1
			runtime.Gosched()
			continue
		} else if err != nil {
			logging.Errorf("error occurs in epoll: %v", os.NewSyscallError("epoll_wait", err))
			return err
		}
		// Events were returned: use a zero timeout for the next EpollWait call.
		msec = 0
		// Dispatch each ready descriptor.
		for i := 0; i < n; i++ {
			ev := &el.events[i]
			// Anything other than the wake-up descriptor goes to the callback.
			if fd := int(ev.Fd); fd != p.wfd {
				switch err = callback(fd, ev.Events); err {
				case nil:
				case errors.ErrAcceptSocket, errors.ErrServerShutdown:
					return err
				default:
					logging.Warnf("error occurs in event-loop: %v", err)
				}
			} else { // poller is awaken to run tasks in queues.
				wakenUp = true
				_, _ = unix.Read(p.wfd, p.wfdBuf)
			}
		}
		// Run the queued tasks after a wake-up.
		if wakenUp {
			wakenUp = false
			task := p.priorAsyncTaskQueue.Dequeue()
			// Drain the priority queue completely.
			for ; task != nil; task = p.priorAsyncTaskQueue.Dequeue() {
				switch err = task.Run(task.Arg); err {
				case nil:
				case errors.ErrServerShutdown:
					return err
				default:
					logging.Warnf("error occurs in user-defined function, %v", err)
				}
				// Hand the finished task back via queue.PutTask
				// (presumably for reuse — confirm against the queue package).
				queue.PutTask(task)
			}
			// Run at most MaxAsyncTasksAtOneTime tasks from the normal queue.
			for i := 0; i < MaxAsyncTasksAtOneTime; i++ {
				if task = p.asyncTaskQueue.Dequeue(); task == nil {
					break
				}
				switch err = task.Run(task.Arg); err {
				case nil:
				case errors.ErrServerShutdown:
					return err
				default:
					logging.Warnf("error occurs in user-defined function, %v", err)
				}
				queue.PutTask(task)
			}
			// Clear the wake signal; if tasks remain in either queue, set it
			// again and write to the wake-up descriptor, retrying on EINTR/EAGAIN.
			// b is not defined in this function; it comes from elsewhere in the package.
			atomic.StoreInt32(&p.netpollWakeSig, 0)
			if (!p.asyncTaskQueue.Empty() || !p.priorAsyncTaskQueue.Empty()) && atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) {
				for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) {
				}
			}
		}

		// Expand the event list when it was filled completely; shrink it when
		// fewer than half of its slots were used.
		if n == el.size {
			el.expand()
		} else if n < el.size>>1 {
			el.shrink()
		}
	}
}

这里主要分析的是在reuse port的情况下的流程:开启多少个event loop,就会创建多少个poller(epoll实例),每个event loop运行在一个固定的goroutine中(开启LockOSThread时还会绑定到一个系统线程),这样goroutine的数量就是固定的,不会随着连接数的增加而出现goroutine暴增的情况。同时,每次accept到连接后都会把它设置成非阻塞的,因此不会阻塞在read和write这样的io事件上,通过这些行为保证了整个流程的高效。

加载全部内容

相关教程
猜你喜欢
用户评论