diff --git a/router/router.go b/router/router.go index 73a4a54a3..17dd0e384 100644 --- a/router/router.go +++ b/router/router.go @@ -122,6 +122,12 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { sse := e.Group("/stream") { sse.GET("/events", server.EventStreamSSE) + sse.GET("/logs/:owner/:name/:build/:number", + session.SetRepo(), + session.SetPerm(), + session.MustPull, + server.LogStreamSSE, + ) } info := e.Group("/api/info") diff --git a/server/stream.go b/server/stream.go index 0d0761b0a..33bef477c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -206,6 +206,10 @@ func EventStream(c *gin.Context) { reader(ws) } +// +// event source streaming for compatibility with quic and http2 +// + func EventStreamSSE(c *gin.Context) { c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache") @@ -275,3 +279,127 @@ func EventStreamSSE(c *gin.Context) { } } } + +func LogStreamSSE(c *gin.Context) { + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + c.Header("X-Accel-Buffering", "no") + + rw := c.Writer + + flusher, ok := rw.(http.Flusher) + if !ok { + c.String(500, "Streaming not supported") + return + } + + // repo := session.Repo(c) + // + // // parse the build number and job sequence number from + // // the repquest parameter. + // num, _ := strconv.Atoi(c.Params.ByName("number")) + // ppid, _ := strconv.Atoi(c.Params.ByName("ppid")) + // name := c.Params.ByName("proc") + // + // build, err := store.GetBuildNumber(c, repo, num) + // if err != nil { + // c.AbortWithError(404, err) + // return + // } + // + // proc, err := store.FromContext(c).ProcChild(build, ppid, name) + // if err != nil { + // c.AbortWithError(404, err) + // return + // } + + repo := session.Repo(c) + buildn, _ := strconv.Atoi(c.Param("build")) + jobn, _ := strconv.Atoi(c.Param("number")) + + build, err := store.GetBuildNumber(c, repo, buildn) + if err != nil { + logrus.Debugln("stream cannot get build number.", err) + io.WriteString(rw, "event: error\ndata: build not found\n\n") + return + } + proc, err := store.FromContext(c).ProcFind(build, jobn) + if err != nil { + logrus.Debugln("stream cannot get proc number.", err) + io.WriteString(rw, "event: error\ndata: process not found\n\n") + return + } + if proc.State != model.StatusRunning { + logrus.Debugln("stream not found.") + io.WriteString(rw, "event: error\ndata: stream not found\n\n") + return + } + + logc := make(chan []byte, 10) + ctx, cancel := context.WithCancel( + context.Background(), + ) + + logrus.Debugf("log stream: connection opened") + + defer func() { + cancel() + close(logc) + logrus.Debugf("log stream: connection closed") + }() + + go func() { + // TODO remove global variable + Config.Services.Logs.Tail(ctx, fmt.Sprint(proc.ID), func(entries ...*logging.Entry) { + for _, entry := range entries { + select { + case <-ctx.Done(): + return + default: + logc <- entry.Data + } + } + }) + + io.WriteString(rw, "event: error\ndata: eof\n\n") + + cancel() + }() + + id := 1 + last, _ := strconv.Atoi( + c.Request.Header.Get("Last-Event-ID"), + ) + if last != 0 { + logrus.Debugf("log stream: reconnect: last-event-id: %d", last) + } + + // retry: 10000\n + + for { + select { + // after 1 hour of idle (no response) end the stream. + // this is more of a safety mechanism than anything, + // and can be removed once the code is more mature. + case <-time.After(time.Hour): + return + case <-rw.CloseNotify(): + return + case <-ctx.Done(): + return + case buf, ok := <-logc: + if ok { + if id > last { + io.WriteString(rw, "id: "+strconv.Itoa(id)) + io.WriteString(rw, "\n") + io.WriteString(rw, "data: ") + rw.Write(buf) + io.WriteString(rw, "\n\n") + flusher.Flush() + } + id++ + } + } + } +} diff --git a/server/template/files/index_polymer.html b/server/template/files/index_polymer.html index 05d4e6489..7793322e3 100644 --- a/server/template/files/index_polymer.html +++ b/server/template/files/index_polymer.html @@ -17,6 +17,15 @@ window.USER = {{ json .user }}; {{ end }} + diff --git a/server/template/template_gen.go b/server/template/template_gen.go index 33e00672f..daef8569e 100644 --- a/server/template/template_gen.go +++ b/server/template/template_gen.go @@ -123,6 +123,15 @@ var indexpolymer = ` window.USER = {{ json .user }}; {{ end }} + diff --git a/vendor/golang.org/x/crypto/acme/acme.go b/vendor/golang.org/x/crypto/acme/acme.go index a7b6ce4e9..4e409be6d 100644 --- a/vendor/golang.org/x/crypto/acme/acme.go +++ b/vendor/golang.org/x/crypto/acme/acme.go @@ -207,7 +207,7 @@ func (c *Client) CreateCert(ctx context.Context, csr []byte, exp time.Duration, return nil, "", responseError(res) } - curl := res.Header.Get("location") // cert permanent URL + curl := res.Header.Get("Location") // cert permanent URL if res.ContentLength == 0 { // no cert in the body; poll until we get it cert, err := c.FetchCert(ctx, curl, bundle) @@ -240,7 +240,7 @@ func (c *Client) FetchCert(ctx context.Context, url string, bundle bool) ([][]by if res.StatusCode > 299 { return nil, responseError(res) } - d := retryAfter(res.Header.Get("retry-after"), 3*time.Second) + d := retryAfter(res.Header.Get("Retry-After"), 3*time.Second) select { case <-time.After(d): // retry @@ -444,7 +444,7 @@ func (c *Client) WaitAuthorization(ctx context.Context, url string) (*Authorizat if err != nil { return nil, err } - retry := res.Header.Get("retry-after") + retry := res.Header.Get("Retry-After") if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAccepted { res.Body.Close() if err := sleep(retry, 1); err != nil { @@ -703,7 +703,7 @@ func (c *Client) retryPostJWS(ctx context.Context, key crypto.Signer, url string // clear any nonces that we might've stored that might now be // considered bad c.clearNonces() - retry := res.Header.Get("retry-after") + retry := res.Header.Get("Retry-After") if err := sleep(retry, 1); err != nil { return nil, err } diff --git a/vendor/golang.org/x/crypto/acme/autocert/autocert.go b/vendor/golang.org/x/crypto/acme/autocert/autocert.go index 679ff4dc1..a478eff54 100644 --- a/vendor/golang.org/x/crypto/acme/autocert/autocert.go +++ b/vendor/golang.org/x/crypto/acme/autocert/autocert.go @@ -33,6 +33,12 @@ import ( "golang.org/x/crypto/acme" ) +// createCertRetryAfter is how much time to wait before removing a failed state +// entry due to an unsuccessful createCert call. +// This is a variable instead of a const for testing. +// TODO: Consider making it configurable or an exp backoff? +var createCertRetryAfter = time.Minute + // pseudoRand is safe for concurrent use. var pseudoRand *lockedMathRand @@ -170,6 +176,12 @@ func (m *Manager) GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate, if name == "" { return nil, errors.New("acme/autocert: missing server name") } + if !strings.Contains(strings.Trim(name, "."), ".") { + return nil, errors.New("acme/autocert: server name component count invalid") + } + if strings.ContainsAny(name, `/\`) { + return nil, errors.New("acme/autocert: server name contains invalid character") + } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() @@ -363,6 +375,23 @@ func (m *Manager) createCert(ctx context.Context, domain string) (*tls.Certifica der, leaf, err := m.authorizedCert(ctx, state.key, domain) if err != nil { + // Remove the failed state after some time, + // making the manager call createCert again on the following TLS hello. + time.AfterFunc(createCertRetryAfter, func() { + defer testDidRemoveState(domain) + m.stateMu.Lock() + defer m.stateMu.Unlock() + // Verify the state hasn't changed and it's still invalid + // before deleting. + s, ok := m.state[domain] + if !ok { + return + } + if _, err := validCert(domain, s.cert, s.key); err == nil { + return + } + delete(m.state, domain) + }) return nil, err } state.cert = der @@ -411,7 +440,6 @@ func (m *Manager) certState(domain string) (*certState, error) { // authorizedCert starts domain ownership verification process and requests a new cert upon success. // The key argument is the certificate private key. func (m *Manager) authorizedCert(ctx context.Context, key crypto.Signer, domain string) (der [][]byte, leaf *x509.Certificate, err error) { - // TODO: make m.verify retry or retry m.verify calls here if err := m.verify(ctx, domain); err != nil { return nil, nil, err } @@ -782,5 +810,10 @@ func (r *lockedMathRand) int63n(max int64) int64 { return n } -// for easier testing -var timeNow = time.Now +// For easier testing. +var ( + timeNow = time.Now + + // Called when a state is removed. + testDidRemoveState = func(domain string) {} +) diff --git a/vendor/golang.org/x/crypto/acme/autocert/listener.go b/vendor/golang.org/x/crypto/acme/autocert/listener.go index d4c93d2f1..d744df0ed 100644 --- a/vendor/golang.org/x/crypto/acme/autocert/listener.go +++ b/vendor/golang.org/x/crypto/acme/autocert/listener.go @@ -36,6 +36,9 @@ import ( // operating system-specific cache or temp directory. This may not // be suitable for servers spanning multiple machines. // +// The returned listener uses a *tls.Config that enables HTTP/2, and +// should only be used with servers that support HTTP/2. +// // The returned Listener also enables TCP keep-alives on the accepted // connections. The returned *tls.Conn are returned before their TLS // handshake has completed. @@ -58,6 +61,9 @@ func NewListener(domains ...string) net.Listener { // Listener listens on the standard TLS port (443) on all interfaces // and returns a net.Listener returning *tls.Conn connections. // +// The returned listener uses a *tls.Config that enables HTTP/2, and +// should only be used with servers that support HTTP/2. +// // The returned Listener also enables TCP keep-alives on the accepted // connections. The returned *tls.Conn are returned before their TLS // handshake has completed. @@ -68,7 +74,8 @@ func (m *Manager) Listener() net.Listener { ln := &listener{ m: m, conf: &tls.Config{ - GetCertificate: m.GetCertificate, // bonus: panic on nil m + GetCertificate: m.GetCertificate, // bonus: panic on nil m + NextProtos: []string{"h2", "http/1.1"}, // Enable HTTP/2 }, } ln.tcpListener, ln.tcpListenErr = net.Listen("tcp", ":443") diff --git a/vendor/vendor.json b/vendor/vendor.json index 8bda8e4c9..bf64ab63a 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -698,13 +698,13 @@ "revisionTime": "2016-11-19T21:37:11Z" }, { - "checksumSHA1": "W0j4I7QpxXlChjyhAojZmFjU6Bg=", + "checksumSHA1": "Ag672Laei0E45NrvTO4LV9B3Jfc=", "path": "golang.org/x/crypto/acme", "revision": "c7af5bf2638a1164f2eb5467c39c6cffbd13a02e", "revisionTime": "2017-04-25T18:31:00Z" }, { - "checksumSHA1": "/FdEtM7DVyyunjtRcW1zX4Nvj8o=", + "checksumSHA1": "TrKJW+flz7JulXU3sqnBJjGzgQc=", "path": "golang.org/x/crypto/acme/autocert", "revision": "c7af5bf2638a1164f2eb5467c39c6cffbd13a02e", "revisionTime": "2017-04-25T18:31:00Z"