Source file src/net/rpc/server.go
1
2
3
4
5
127 package rpc
128
129 import (
130 "bufio"
131 "encoding/gob"
132 "errors"
133 "io"
134 "log"
135 "net"
136 "net/http"
137 "reflect"
138 "strings"
139 "sync"
140 "unicode"
141 "unicode/utf8"
142 )
143
const (
	// DefaultRPCPath is the HTTP path on which the package-level HandleHTTP
	// installs the RPC handler.
	DefaultRPCPath = "/_goRPC_"
	// DefaultDebugPath is the HTTP path on which the package-level HandleHTTP
	// installs the debugging handler.
	DefaultDebugPath = "/debug/rpc"
)
149
150
151
// typeOfError is the reflect.Type of the error interface, obtained by taking
// the element type of a nil *error. suitableMethods compares each candidate
// method's return type against it.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
153
// methodType describes one registered RPC method and counts how often it has
// been called.
type methodType struct {
	sync.Mutex // guards numCalls
	method reflect.Method
	ArgType reflect.Type // type of the method's argument parameter
	ReplyType reflect.Type // type of the method's reply parameter; suitableMethods only accepts pointers
	numCalls uint // incremented by service.call before each invocation
}
161
// service is one registered receiver together with the methods that may be
// invoked on it.
type service struct {
	name string // name under which the service is stored in Server.serviceMap
	rcvr reflect.Value // the receiver passed to Register/RegisterName
	typ reflect.Type // type of the receiver
	method map[string]*methodType // callable methods, keyed by method name
}
168
169
170
171
// Request is the header of an RPC request. The server obtains one from its
// free list and hands it to ServerCodec.ReadRequestHeader to be filled in.
type Request struct {
	ServiceMethod string // "Service.Method"; the server splits it at the last '.'
	Seq uint64 // copied unchanged into the corresponding Response
	next *Request // link in Server's free list of requests
}
177
178
179
180
// Response is the header of an RPC response. The server fills one in and
// passes it to ServerCodec.WriteResponse together with the reply body.
type Response struct {
	ServiceMethod string // copied from the Request being answered
	Seq uint64 // copied from the Request being answered
	Error string // error message; left empty when the call succeeded
	next *Response // link in Server's free list of responses
}
187
188
// Server holds the registered services plus free lists of Request and
// Response headers that are recycled between calls.
type Server struct {
	mu sync.RWMutex // guards serviceMap
	serviceMap map[string]*service
	reqLock sync.Mutex // guards freeReq
	freeReq *Request
	respLock sync.Mutex // guards freeResp
	freeResp *Response
}
197
198
// NewServer returns a new Server with an empty service map.
func NewServer() *Server {
	return &Server{serviceMap: make(map[string]*service)}
}
202
203
// DefaultServer is the Server used by the package-level functions Register,
// RegisterName, ServeConn, ServeCodec, ServeRequest, Accept and HandleHTTP.
var DefaultServer = NewServer()
205
206
// isExported reports whether name starts with an upper-case letter. An empty
// name, or one that does not start with valid UTF-8, is not exported.
func isExported(name string) bool {
	first, _ := utf8.DecodeRuneInString(name)
	return unicode.IsUpper(first)
}
211
212
213 func isExportedOrBuiltinType(t reflect.Type) bool {
214 for t.Kind() == reflect.Ptr {
215 t = t.Elem()
216 }
217
218
219 return isExported(t.Name()) || t.PkgPath() == ""
220 }
221
222
223
224
225
226
227
228
229
230
231
// Register publishes rcvr's suitable methods on server, using the name of
// rcvr's type as the service name. A method is suitable when it is exported,
// takes exactly two parameters whose types are exported or builtin, the
// second of which is a pointer, and returns a single value of type error.
// It returns an error if the type is not exported, has no suitable methods,
// or a service of the same name is already registered.
func (server *Server) Register(rcvr interface{}) error {
	return server.register(rcvr, "", false)
}
235
236
237
// RegisterName is like Register but uses the supplied name for the service
// instead of the receiver's type name. The supplied name is not required to
// be exported, but it must not be empty.
func (server *Server) RegisterName(name string, rcvr interface{}) error {
	return server.register(rcvr, name, true)
}
241
242 func (server *Server) register(rcvr interface{}, name string, useName bool) error {
243 server.mu.Lock()
244 defer server.mu.Unlock()
245 if server.serviceMap == nil {
246 server.serviceMap = make(map[string]*service)
247 }
248 s := new(service)
249 s.typ = reflect.TypeOf(rcvr)
250 s.rcvr = reflect.ValueOf(rcvr)
251 sname := reflect.Indirect(s.rcvr).Type().Name()
252 if useName {
253 sname = name
254 }
255 if sname == "" {
256 s := "rpc.Register: no service name for type " + s.typ.String()
257 log.Print(s)
258 return errors.New(s)
259 }
260 if !isExported(sname) && !useName {
261 s := "rpc.Register: type " + sname + " is not exported"
262 log.Print(s)
263 return errors.New(s)
264 }
265 if _, present := server.serviceMap[sname]; present {
266 return errors.New("rpc: service already defined: " + sname)
267 }
268 s.name = sname
269
270
271 s.method = suitableMethods(s.typ, true)
272
273 if len(s.method) == 0 {
274 str := ""
275
276
277 method := suitableMethods(reflect.PtrTo(s.typ), false)
278 if len(method) != 0 {
279 str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
280 } else {
281 str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
282 }
283 log.Print(str)
284 return errors.New(str)
285 }
286 server.serviceMap[s.name] = s
287 return nil
288 }
289
290
291
292 func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
293 methods := make(map[string]*methodType)
294 for m := 0; m < typ.NumMethod(); m++ {
295 method := typ.Method(m)
296 mtype := method.Type
297 mname := method.Name
298
299 if method.PkgPath != "" {
300 continue
301 }
302
303 if mtype.NumIn() != 3 {
304 if reportErr {
305 log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
306 }
307 continue
308 }
309
310 argType := mtype.In(1)
311 if !isExportedOrBuiltinType(argType) {
312 if reportErr {
313 log.Println(mname, "argument type not exported:", argType)
314 }
315 continue
316 }
317
318 replyType := mtype.In(2)
319 if replyType.Kind() != reflect.Ptr {
320 if reportErr {
321 log.Println("method", mname, "reply type not a pointer:", replyType)
322 }
323 continue
324 }
325
326 if !isExportedOrBuiltinType(replyType) {
327 if reportErr {
328 log.Println("method", mname, "reply type not exported:", replyType)
329 }
330 continue
331 }
332
333 if mtype.NumOut() != 1 {
334 if reportErr {
335 log.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
336 }
337 continue
338 }
339
340 if returnType := mtype.Out(0); returnType != typeOfError {
341 if reportErr {
342 log.Println("method", mname, "returns", returnType.String(), "not error")
343 }
344 continue
345 }
346 methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
347 }
348 return methods
349 }
350
351
352
353
// invalidRequest is the placeholder body sent in place of a real reply
// whenever a response carries an error message.
var invalidRequest = struct{}{}
355
356 func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
357 resp := server.getResponse()
358
359 resp.ServiceMethod = req.ServiceMethod
360 if errmsg != "" {
361 resp.Error = errmsg
362 reply = invalidRequest
363 }
364 resp.Seq = req.Seq
365 sending.Lock()
366 err := codec.WriteResponse(resp, reply)
367 if debugLog && err != nil {
368 log.Println("rpc: writing response:", err)
369 }
370 sending.Unlock()
371 server.freeResponse(resp)
372 }
373
374 func (m *methodType) NumCalls() (n uint) {
375 m.Lock()
376 n = m.numCalls
377 m.Unlock()
378 return n
379 }
380
381 func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
382 mtype.Lock()
383 mtype.numCalls++
384 mtype.Unlock()
385 function := mtype.method.Func
386
387 returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
388
389 errInter := returnValues[0].Interface()
390 errmsg := ""
391 if errInter != nil {
392 errmsg = errInter.(error).Error()
393 }
394 server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
395 server.freeRequest(req)
396 }
397
// gobServerCodec is the ServerCodec that ServeConn uses: it reads and writes
// gob-encoded values over a single connection.
type gobServerCodec struct {
	rwc io.ReadWriteCloser // the underlying connection
	dec *gob.Decoder // decodes request headers and bodies
	enc *gob.Encoder // encodes responses; ServeConn points it at encBuf
	encBuf *bufio.Writer // buffered writer flushed by WriteResponse
	closed bool // set by Close so that rwc is closed only once
}
405
// ReadRequestHeader decodes the next gob value from the connection into r.
func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
	return c.dec.Decode(r)
}
409
// ReadRequestBody decodes the next gob value from the connection into body.
// The server passes a nil body when it wants the value read but not stored.
func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
	return c.dec.Decode(body)
}
413
414 func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
415 if err = c.enc.Encode(r); err != nil {
416 if c.encBuf.Flush() == nil {
417
418
419 log.Println("rpc: gob error encoding response:", err)
420 c.Close()
421 }
422 return
423 }
424 if err = c.enc.Encode(body); err != nil {
425 if c.encBuf.Flush() == nil {
426
427
428 log.Println("rpc: gob error encoding body:", err)
429 c.Close()
430 }
431 return
432 }
433 return c.encBuf.Flush()
434 }
435
436 func (c *gobServerCodec) Close() error {
437 if c.closed {
438
439 return nil
440 }
441 c.closed = true
442 return c.rwc.Close()
443 }
444
445
446
447
448
449
450 func (server *Server) ServeConn(conn io.ReadWriteCloser) {
451 buf := bufio.NewWriter(conn)
452 srv := &gobServerCodec{
453 rwc: conn,
454 dec: gob.NewDecoder(conn),
455 enc: gob.NewEncoder(buf),
456 encBuf: buf,
457 }
458 server.ServeCodec(srv)
459 }
460
461
462
463 func (server *Server) ServeCodec(codec ServerCodec) {
464 sending := new(sync.Mutex)
465 for {
466 service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
467 if err != nil {
468 if debugLog && err != io.EOF {
469 log.Println("rpc:", err)
470 }
471 if !keepReading {
472 break
473 }
474
475 if req != nil {
476 server.sendResponse(sending, req, invalidRequest, codec, err.Error())
477 server.freeRequest(req)
478 }
479 continue
480 }
481 go service.call(server, sending, mtype, req, argv, replyv, codec)
482 }
483 codec.Close()
484 }
485
486
487
488 func (server *Server) ServeRequest(codec ServerCodec) error {
489 sending := new(sync.Mutex)
490 service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
491 if err != nil {
492 if !keepReading {
493 return err
494 }
495
496 if req != nil {
497 server.sendResponse(sending, req, invalidRequest, codec, err.Error())
498 server.freeRequest(req)
499 }
500 return err
501 }
502 service.call(server, sending, mtype, req, argv, replyv, codec)
503 return nil
504 }
505
506 func (server *Server) getRequest() *Request {
507 server.reqLock.Lock()
508 req := server.freeReq
509 if req == nil {
510 req = new(Request)
511 } else {
512 server.freeReq = req.next
513 *req = Request{}
514 }
515 server.reqLock.Unlock()
516 return req
517 }
518
519 func (server *Server) freeRequest(req *Request) {
520 server.reqLock.Lock()
521 req.next = server.freeReq
522 server.freeReq = req
523 server.reqLock.Unlock()
524 }
525
526 func (server *Server) getResponse() *Response {
527 server.respLock.Lock()
528 resp := server.freeResp
529 if resp == nil {
530 resp = new(Response)
531 } else {
532 server.freeResp = resp.next
533 *resp = Response{}
534 }
535 server.respLock.Unlock()
536 return resp
537 }
538
539 func (server *Server) freeResponse(resp *Response) {
540 server.respLock.Lock()
541 resp.next = server.freeResp
542 server.freeResp = resp
543 server.respLock.Unlock()
544 }
545
546 func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
547 service, mtype, req, keepReading, err = server.readRequestHeader(codec)
548 if err != nil {
549 if !keepReading {
550 return
551 }
552
553 codec.ReadRequestBody(nil)
554 return
555 }
556
557
558 argIsValue := false
559 if mtype.ArgType.Kind() == reflect.Ptr {
560 argv = reflect.New(mtype.ArgType.Elem())
561 } else {
562 argv = reflect.New(mtype.ArgType)
563 argIsValue = true
564 }
565
566 if err = codec.ReadRequestBody(argv.Interface()); err != nil {
567 return
568 }
569 if argIsValue {
570 argv = argv.Elem()
571 }
572
573 replyv = reflect.New(mtype.ReplyType.Elem())
574 return
575 }
576
577 func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, keepReading bool, err error) {
578
579 req = server.getRequest()
580 err = codec.ReadRequestHeader(req)
581 if err != nil {
582 req = nil
583 if err == io.EOF || err == io.ErrUnexpectedEOF {
584 return
585 }
586 err = errors.New("rpc: server cannot decode request: " + err.Error())
587 return
588 }
589
590
591
592 keepReading = true
593
594 dot := strings.LastIndex(req.ServiceMethod, ".")
595 if dot < 0 {
596 err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
597 return
598 }
599 serviceName := req.ServiceMethod[:dot]
600 methodName := req.ServiceMethod[dot+1:]
601
602
603 server.mu.RLock()
604 service = server.serviceMap[serviceName]
605 server.mu.RUnlock()
606 if service == nil {
607 err = errors.New("rpc: can't find service " + req.ServiceMethod)
608 return
609 }
610 mtype = service.method[methodName]
611 if mtype == nil {
612 err = errors.New("rpc: can't find method " + req.ServiceMethod)
613 }
614 return
615 }
616
617
618
619
620
621 func (server *Server) Accept(lis net.Listener) {
622 for {
623 conn, err := lis.Accept()
624 if err != nil {
625 log.Print("rpc.Serve: accept:", err.Error())
626 return
627 }
628 go server.ServeConn(conn)
629 }
630 }
631
632
// Register publishes rcvr's suitable methods on DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
634
635
636
// RegisterName is like Register but uses the supplied name for the service
// instead of the receiver's type name. It registers on DefaultServer.
func RegisterName(name string, rcvr interface{}) error {
	return DefaultServer.RegisterName(name, rcvr)
}
640
641
642
643
644
645
646
647
// ServerCodec is what the server needs from a connection in order to read
// requests and write responses. For each request the server calls
// ReadRequestHeader and then ReadRequestBody. When a header was read but the
// request cannot be dispatched, ReadRequestBody is called with a nil argument
// so that the body is consumed and discarded. The server holds a mutex around
// each WriteResponse call made from one ServeCodec or ServeRequest invocation.
// ServeCodec calls Close once it stops reading requests; ServeRequest does not
// call Close.
type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(interface{}) error

	WriteResponse(*Response, interface{}) error

	Close() error
}
656
657
658
659
660
661
// ServeConn runs DefaultServer on a single connection. It blocks until
// DefaultServer.ServeConn returns.
func ServeConn(conn io.ReadWriteCloser) {
	DefaultServer.ServeConn(conn)
}
665
666
667
// ServeCodec is like ServeConn but uses the specified codec to read requests
// and write responses. It serves on DefaultServer.
func ServeCodec(codec ServerCodec) {
	DefaultServer.ServeCodec(codec)
}
671
672
673
// ServeRequest is like ServeCodec but serves a single request on
// DefaultServer and returns the resulting error, if any.
func ServeRequest(codec ServerCodec) error {
	return DefaultServer.ServeRequest(codec)
}
677
678
679
680
// Accept accepts connections on lis and serves them with DefaultServer. It
// blocks until the listener returns an error.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
682
683
// connected is the status text that ServeHTTP writes, after the "HTTP/1.0 "
// prefix, to a hijacked connection before RPC traffic starts.
var connected = "200 Connected to Go RPC"
685
686
687 func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
688 if req.Method != "CONNECT" {
689 w.Header().Set("Content-Type", "text/plain; charset=utf-8")
690 w.WriteHeader(http.StatusMethodNotAllowed)
691 io.WriteString(w, "405 must CONNECT\n")
692 return
693 }
694 conn, _, err := w.(http.Hijacker).Hijack()
695 if err != nil {
696 log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
697 return
698 }
699 io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
700 server.ServeConn(conn)
701 }
702
703
704
705
// HandleHTTP registers server as the HTTP handler for rpcPath and a debugging
// handler for debugPath, both through http.Handle.
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
	http.Handle(rpcPath, server)
	http.Handle(debugPath, debugHTTP{server})
}
710
711
712
713
// HandleHTTP registers DefaultServer's HTTP handlers on DefaultRPCPath and
// DefaultDebugPath.
func HandleHTTP() {
	DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}
717
View as plain text