serve logs with sse; update acme pkg

This commit is contained in:
Brad Rydzewski 2017-07-24 15:57:07 -04:00
parent 07c82aa1c6
commit 441a015ca8
8 changed files with 202 additions and 10 deletions

View file

@ -122,6 +122,12 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
sse := e.Group("/stream")
{
sse.GET("/events", server.EventStreamSSE)
sse.GET("/logs/:owner/:name/:build/:number",
session.SetRepo(),
session.SetPerm(),
session.MustPull,
server.LogStreamSSE,
)
}
info := e.Group("/api/info")

View file

@ -206,6 +206,10 @@ func EventStream(c *gin.Context) {
reader(ws)
}
//
// event source streaming for compatibility with quic and http2
//
func EventStreamSSE(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
@ -275,3 +279,127 @@ func EventStreamSSE(c *gin.Context) {
}
}
}
func LogStreamSSE(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Accel-Buffering", "no")
rw := c.Writer
flusher, ok := rw.(http.Flusher)
if !ok {
c.String(500, "Streaming not supported")
return
}
// repo := session.Repo(c)
//
// // parse the build number and job sequence number from
// // the repquest parameter.
// num, _ := strconv.Atoi(c.Params.ByName("number"))
// ppid, _ := strconv.Atoi(c.Params.ByName("ppid"))
// name := c.Params.ByName("proc")
//
// build, err := store.GetBuildNumber(c, repo, num)
// if err != nil {
// c.AbortWithError(404, err)
// return
// }
//
// proc, err := store.FromContext(c).ProcChild(build, ppid, name)
// if err != nil {
// c.AbortWithError(404, err)
// return
// }
repo := session.Repo(c)
buildn, _ := strconv.Atoi(c.Param("build"))
jobn, _ := strconv.Atoi(c.Param("number"))
build, err := store.GetBuildNumber(c, repo, buildn)
if err != nil {
logrus.Debugln("stream cannot get build number.", err)
io.WriteString(rw, "event: error\ndata: build not found\n\n")
return
}
proc, err := store.FromContext(c).ProcFind(build, jobn)
if err != nil {
logrus.Debugln("stream cannot get proc number.", err)
io.WriteString(rw, "event: error\ndata: process not found\n\n")
return
}
if proc.State != model.StatusRunning {
logrus.Debugln("stream not found.")
io.WriteString(rw, "event: error\ndata: stream not found\n\n")
return
}
logc := make(chan []byte, 10)
ctx, cancel := context.WithCancel(
context.Background(),
)
logrus.Debugf("log stream: connection opened")
defer func() {
cancel()
close(logc)
logrus.Debugf("log stream: connection closed")
}()
go func() {
// TODO remove global variable
Config.Services.Logs.Tail(ctx, fmt.Sprint(proc.ID), func(entries ...*logging.Entry) {
for _, entry := range entries {
select {
case <-ctx.Done():
return
default:
logc <- entry.Data
}
}
})
io.WriteString(rw, "event: error\ndata: eof\n\n")
cancel()
}()
id := 1
last, _ := strconv.Atoi(
c.Request.Header.Get("Last-Event-ID"),
)
if last != 0 {
logrus.Debugf("log stream: reconnect: last-event-id: %d", last)
}
// retry: 10000\n
for {
select {
// after 1 hour of idle (no response) end the stream.
// this is more of a safety mechanism than anything,
// and can be removed once the code is more mature.
case <-time.After(time.Hour):
return
case <-rw.CloseNotify():
return
case <-ctx.Done():
return
case buf, ok := <-logc:
if ok {
if id > last {
io.WriteString(rw, "id: "+strconv.Itoa(id))
io.WriteString(rw, "\n")
io.WriteString(rw, "data: ")
rw.Write(buf)
io.WriteString(rw, "\n\n")
flusher.Flush()
}
id++
}
}
}
}

View file

@ -17,6 +17,15 @@
window.USER = {{ json .user }};
{{ end }}
</script>
<script>
// TODO load a polyfill for SSE for the Edge browser. Consider downloading
// with bower instead of loading from cndjs.
if (!window.EventSource) {
var ssePolyfill = document.createElement("script");
ssePolyfill.src = "https://cdnjs.cloudflare.com/ajax/libs/event-source-polyfill/0.0.9/eventsource.min.js";
document.body.appendChild(ssePolyfill);
}
</script>
<script src="/bower_components/webcomponentsjs/webcomponents-loader.js"></script>
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Roboto">

View file

@ -123,6 +123,15 @@ var indexpolymer = `<!DOCTYPE html>
window.USER = {{ json .user }};
{{ end }}
</script>
<script>
// TODO load a polyfill for SSE for the Edge browser. Consider downloading
// with bower instead of loading from cndjs.
if (!window.EventSource) {
var ssePolyfill = document.createElement("script");
ssePolyfill.src = "https://cdnjs.cloudflare.com/ajax/libs/event-source-polyfill/0.0.9/eventsource.min.js";
document.body.appendChild(ssePolyfill);
}
</script>
<script src="/bower_components/webcomponentsjs/webcomponents-loader.js"></script>
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Roboto">

View file

@ -207,7 +207,7 @@ func (c *Client) CreateCert(ctx context.Context, csr []byte, exp time.Duration,
return nil, "", responseError(res)
}
curl := res.Header.Get("location") // cert permanent URL
curl := res.Header.Get("Location") // cert permanent URL
if res.ContentLength == 0 {
// no cert in the body; poll until we get it
cert, err := c.FetchCert(ctx, curl, bundle)
@ -240,7 +240,7 @@ func (c *Client) FetchCert(ctx context.Context, url string, bundle bool) ([][]by
if res.StatusCode > 299 {
return nil, responseError(res)
}
d := retryAfter(res.Header.Get("retry-after"), 3*time.Second)
d := retryAfter(res.Header.Get("Retry-After"), 3*time.Second)
select {
case <-time.After(d):
// retry
@ -444,7 +444,7 @@ func (c *Client) WaitAuthorization(ctx context.Context, url string) (*Authorizat
if err != nil {
return nil, err
}
retry := res.Header.Get("retry-after")
retry := res.Header.Get("Retry-After")
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAccepted {
res.Body.Close()
if err := sleep(retry, 1); err != nil {
@ -703,7 +703,7 @@ func (c *Client) retryPostJWS(ctx context.Context, key crypto.Signer, url string
// clear any nonces that we might've stored that might now be
// considered bad
c.clearNonces()
retry := res.Header.Get("retry-after")
retry := res.Header.Get("Retry-After")
if err := sleep(retry, 1); err != nil {
return nil, err
}

View file

@ -33,6 +33,12 @@ import (
"golang.org/x/crypto/acme"
)
// createCertRetryAfter is how much time to wait before removing a failed state
// entry due to an unsuccessful createCert call.
// This is a variable instead of a const for testing.
// TODO: Consider making it configurable or an exp backoff?
var createCertRetryAfter = time.Minute
// pseudoRand is safe for concurrent use.
var pseudoRand *lockedMathRand
@ -170,6 +176,12 @@ func (m *Manager) GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate,
if name == "" {
return nil, errors.New("acme/autocert: missing server name")
}
if !strings.Contains(strings.Trim(name, "."), ".") {
return nil, errors.New("acme/autocert: server name component count invalid")
}
if strings.ContainsAny(name, `/\`) {
return nil, errors.New("acme/autocert: server name contains invalid character")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
@ -363,6 +375,23 @@ func (m *Manager) createCert(ctx context.Context, domain string) (*tls.Certifica
der, leaf, err := m.authorizedCert(ctx, state.key, domain)
if err != nil {
// Remove the failed state after some time,
// making the manager call createCert again on the following TLS hello.
time.AfterFunc(createCertRetryAfter, func() {
defer testDidRemoveState(domain)
m.stateMu.Lock()
defer m.stateMu.Unlock()
// Verify the state hasn't changed and it's still invalid
// before deleting.
s, ok := m.state[domain]
if !ok {
return
}
if _, err := validCert(domain, s.cert, s.key); err == nil {
return
}
delete(m.state, domain)
})
return nil, err
}
state.cert = der
@ -411,7 +440,6 @@ func (m *Manager) certState(domain string) (*certState, error) {
// authorizedCert starts domain ownership verification process and requests a new cert upon success.
// The key argument is the certificate private key.
func (m *Manager) authorizedCert(ctx context.Context, key crypto.Signer, domain string) (der [][]byte, leaf *x509.Certificate, err error) {
// TODO: make m.verify retry or retry m.verify calls here
if err := m.verify(ctx, domain); err != nil {
return nil, nil, err
}
@ -782,5 +810,10 @@ func (r *lockedMathRand) int63n(max int64) int64 {
return n
}
// for easier testing
var timeNow = time.Now
// For easier testing.
var (
timeNow = time.Now
// Called when a state is removed.
testDidRemoveState = func(domain string) {}
)

View file

@ -36,6 +36,9 @@ import (
// operating system-specific cache or temp directory. This may not
// be suitable for servers spanning multiple machines.
//
// The returned listener uses a *tls.Config that enables HTTP/2, and
// should only be used with servers that support HTTP/2.
//
// The returned Listener also enables TCP keep-alives on the accepted
// connections. The returned *tls.Conn are returned before their TLS
// handshake has completed.
@ -58,6 +61,9 @@ func NewListener(domains ...string) net.Listener {
// Listener listens on the standard TLS port (443) on all interfaces
// and returns a net.Listener returning *tls.Conn connections.
//
// The returned listener uses a *tls.Config that enables HTTP/2, and
// should only be used with servers that support HTTP/2.
//
// The returned Listener also enables TCP keep-alives on the accepted
// connections. The returned *tls.Conn are returned before their TLS
// handshake has completed.
@ -68,7 +74,8 @@ func (m *Manager) Listener() net.Listener {
ln := &listener{
m: m,
conf: &tls.Config{
GetCertificate: m.GetCertificate, // bonus: panic on nil m
GetCertificate: m.GetCertificate, // bonus: panic on nil m
NextProtos: []string{"h2", "http/1.1"}, // Enable HTTP/2
},
}
ln.tcpListener, ln.tcpListenErr = net.Listen("tcp", ":443")

4
vendor/vendor.json vendored
View file

@ -698,13 +698,13 @@
"revisionTime": "2016-11-19T21:37:11Z"
},
{
"checksumSHA1": "W0j4I7QpxXlChjyhAojZmFjU6Bg=",
"checksumSHA1": "Ag672Laei0E45NrvTO4LV9B3Jfc=",
"path": "golang.org/x/crypto/acme",
"revision": "c7af5bf2638a1164f2eb5467c39c6cffbd13a02e",
"revisionTime": "2017-04-25T18:31:00Z"
},
{
"checksumSHA1": "/FdEtM7DVyyunjtRcW1zX4Nvj8o=",
"checksumSHA1": "TrKJW+flz7JulXU3sqnBJjGzgQc=",
"path": "golang.org/x/crypto/acme/autocert",
"revision": "c7af5bf2638a1164f2eb5467c39c6cffbd13a02e",
"revisionTime": "2017-04-25T18:31:00Z"