// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package rpc import ( "bufio" "encoding/gob" "errors" "io" "log" "net" "net/http" "sync" ) // ServerError represents an error that has been returned from // the remote side of the RPC connection. type ServerError string func (e ServerError) Error() string { return string(e) } var ErrShutdown = errors.New("connection is shut down") // Call represents an active RPC. type Call struct { ServiceMethod string // The name of the service and method to call. Args interface{} // The argument to the function (*struct). Reply interface{} // The reply from the function (*struct). Error error // After completion, the error status. Done chan *Call // Strobes when call is complete. } // Client represents an RPC Client. // There may be multiple outstanding Calls associated // with a single Client, and a Client may be used by // multiple goroutines simultaneously. type Client struct { codec ClientCodec reqMutex sync.Mutex // protects following request Request mutex sync.Mutex // protects following seq uint64 pending map[uint64]*Call closing bool // user has called Close shutdown bool // server has told us to stop } // A ClientCodec implements writing of RPC requests and // reading of RPC responses for the client side of an RPC session. // The client calls WriteRequest to write a request to the connection // and calls ReadResponseHeader and ReadResponseBody in pairs // to read responses. The client calls Close when finished with the // connection. ReadResponseBody may be called with a nil // argument to force the body of the response to be read and then // discarded. type ClientCodec interface { // WriteRequest must be safe for concurrent use by multiple goroutines. WriteRequest(*Request, interface{}) error ReadResponseHeader(*Response) error ReadResponseBody(interface{}) error Close() error } func (client *Client) send(call *Call) { client.reqMutex.Lock() defer client.reqMutex.Unlock() // Register this call. client.mutex.Lock() if client.shutdown || client.closing { call.Error = ErrShutdown client.mutex.Unlock() call.done() return } seq := client.seq client.seq++ client.pending[seq] = call client.mutex.Unlock() // Encode and send the request. client.request.Seq = seq client.request.ServiceMethod = call.ServiceMethod err := client.codec.WriteRequest(&client.request, call.Args) if err != nil { client.mutex.Lock() call = client.pending[seq] delete(client.pending, seq) client.mutex.Unlock() if call != nil { call.Error = err call.done() } } } func (client *Client) input() { var err error var response Response for err == nil { response = Response{} err = client.codec.ReadResponseHeader(&response) if err != nil { break } seq := response.Seq client.mutex.Lock() call := client.pending[seq] delete(client.pending, seq) client.mutex.Unlock() switch { case call == nil: // We've got no pending call. That usually means that // WriteRequest partially failed, and call was already // removed; response is a server telling us about an // error reading request body. We should still attempt // to read error body, but there's no one to give it to. err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error body: " + err.Error()) } case response.Error != "": // We've got an error response. Give this to the request; // any subsequent requests will get the ReadResponseBody // error if there is one. call.Error = ServerError(response.Error) err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error body: " + err.Error()) } call.done() default: err = client.codec.ReadResponseBody(call.Reply) if err != nil { call.Error = errors.New("reading body " + err.Error()) } call.done() } } // Terminate pending calls. client.reqMutex.Lock() client.mutex.Lock() client.shutdown = true closing := client.closing if err == io.EOF { if closing { err = ErrShutdown } else { err = io.ErrUnexpectedEOF } } for _, call := range client.pending { call.Error = err call.done() } client.mutex.Unlock() client.reqMutex.Unlock() if debugLog && err != io.EOF && !closing { log.Println("rpc: client protocol error:", err) } } func (call *Call) done() { select { case call.Done <- call: // ok default: // We don't want to block here. It is the caller's responsibility to make // sure the channel has enough buffer space. See comment in Go(). if debugLog { log.Println("rpc: discarding Call reply due to insufficient Done chan capacity") } } } // NewClient returns a new Client to handle requests to the // set of services at the other end of the connection. // It adds a buffer to the write side of the connection so // the header and payload are sent as a unit. func NewClient(conn io.ReadWriteCloser) *Client { encBuf := bufio.NewWriter(conn) client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} return NewClientWithCodec(client) } // NewClientWithCodec is like NewClient but uses the specified // codec to encode requests and decode responses. func NewClientWithCodec(codec ClientCodec) *Client { client := &Client{ codec: codec, pending: make(map[uint64]*Call), } go client.input() return client } type gobClientCodec struct { rwc io.ReadWriteCloser dec *gob.Decoder enc *gob.Encoder encBuf *bufio.Writer } func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { return } if err = c.enc.Encode(body); err != nil { return } return c.encBuf.Flush() } func (c *gobClientCodec) ReadResponseHeader(r *Response) error { return c.dec.Decode(r) } func (c *gobClientCodec) ReadResponseBody(body interface{}) error { return c.dec.Decode(body) } func (c *gobClientCodec) Close() error { return c.rwc.Close() } // DialHTTP connects to an HTTP RPC server at the specified network address // listening on the default HTTP RPC path. func DialHTTP(network, address string) (*Client, error) { return DialHTTPPath(network, address, DefaultRPCPath) } // DialHTTPPath connects to an HTTP RPC server // at the specified network address and path. func DialHTTPPath(network, address, path string) (*Client, error) { var err error conn, err := net.Dial(network, address) if err != nil { return nil, err } io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") // Require successful HTTP response // before switching to RPC protocol. resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) if err == nil && resp.Status == connected { return NewClient(conn), nil } if err == nil { err = errors.New("unexpected HTTP response: " + resp.Status) } conn.Close() return nil, &net.OpError{ Op: "dial-http", Net: network + " " + address, Addr: nil, Err: err, } } // Dial connects to an RPC server at the specified network address. func Dial(network, address string) (*Client, error) { conn, err := net.Dial(network, address) if err != nil { return nil, err } return NewClient(conn), nil } // Close calls the underlying codec's Close method. If the connection is already // shutting down, ErrShutdown is returned. func (client *Client) Close() error { client.mutex.Lock() if client.closing { client.mutex.Unlock() return ErrShutdown } client.closing = true client.mutex.Unlock() return client.codec.Close() } // Go invokes the function asynchronously. It returns the Call structure representing // the invocation. The done channel will signal when the call is complete by returning // the same Call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { call := new(Call) call.ServiceMethod = serviceMethod call.Args = args call.Reply = reply if done == nil { done = make(chan *Call, 10) // buffered. } else { // If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. if cap(done) == 0 { log.Panic("rpc: done channel is unbuffered") } } call.Done = done client.send(call) return call } // Call invokes the named function, waits for it to complete, and returns its error status. func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done return call.Error }