check and parallel upload

This commit is contained in:
celogeek 2022-03-05 18:19:19 +01:00
parent 65355f0ea0
commit 76e71ca6ae
Signed by: celogeek
GPG Key ID: E6B7BDCFC446233A
4 changed files with 137 additions and 34 deletions

View File

@ -8,25 +8,23 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"github.com/schollz/progressbar/v3" "github.com/schollz/progressbar/v3"
) )
type UploadCommand struct { type UploadCommand struct {
Url string `short:"u" long:"url" description:"Url of the instance" required:"true"` Url string `short:"u" long:"url" description:"Url of the instance" required:"true"`
Token string `short:"t" long:"token" description:"Token of the instance" required:"true"` Token string `short:"t" long:"token" description:"Token of the instance" required:"true"`
File string `short:"f" long:"file" description:"File to upload" required:"true"` File string `short:"f" long:"file" description:"File to upload" required:"true"`
Workers uint32 `short:"w" long:"workers" description:"Number of workers for uploading chunks" default:"4"`
} }
type UploadError struct { type UploadError struct {
Error string Error string
} }
type UploadChunkSuccess struct {
Checksum string
}
type UploadFileRequest struct { type UploadFileRequest struct {
Name string Name string
Checksum string Checksum string
@ -39,7 +37,45 @@ type UploadFileResponse struct {
Size uint64 Size uint64
} }
func (c *UploadCommand) Execute(args []string) error { func (c *UploadCommand) Cli() *resty.Client {
return resty.New().SetBaseURL(c.Url).SetAuthScheme("Private").SetAuthToken(c.Token)
}
func (c *UploadCommand) FileExists() (string, error) {
f, err := os.Open(c.File)
if err != nil {
return "", err
}
defer f.Close()
st, err := f.Stat()
if err != nil {
return "", err
}
chunkSize := int64(1 << 20)
nbChunks := st.Size() / chunkSize
if st.Size()%chunkSize > 0 {
nbChunks++
}
progress := progressbar.DefaultBytes(st.Size(), fmt.Sprintf("Checking %s", filepath.Base(c.File)))
defer progress.Close()
tee := io.TeeReader(f, progress)
checksum := sha1.New()
io.Copy(checksum, tee)
sum := hex.EncodeToString(checksum.Sum(nil))
resp, err := c.Cli().R().Head(fmt.Sprintf("/file/%s", sum))
if err != nil {
return "", err
}
if resp.IsSuccess() {
return "", errors.New("file already exists")
}
return sum, nil
}
func (c *UploadCommand) FileUpload(sum string) error {
f, err := os.Open(c.File) f, err := os.Open(c.File)
if err != nil { if err != nil {
return err return err
@ -56,40 +92,78 @@ func (c *UploadCommand) Execute(args []string) error {
nbChunks++ nbChunks++
} }
uploadFile := &UploadFileRequest{Name: filepath.Base(c.File), Chunks: make([]string, nbChunks)} uploadFile := &UploadFileRequest{
Name: filepath.Base(c.File),
Chunks: make([]string, nbChunks),
Checksum: sum,
}
cli := c.Cli()
progress := progressbar.DefaultBytes(st.Size(), fmt.Sprintf("Uploading %s", uploadFile.Name)) progress := progressbar.DefaultBytes(st.Size(), fmt.Sprintf("Uploading %s", uploadFile.Name))
defer progress.Close() defer progress.Close()
cli := resty.New().SetBaseURL(c.Url).SetAuthScheme("Private").SetAuthToken(c.Token) wg := sync.WaitGroup{}
b := make([]byte, chunkSize) mu := sync.Mutex{}
checksum := sha1.New() wg.Add(4)
for i := 0; ; i++ { wgErrors := make([]error, c.Workers)
n, err := f.Read(b) i := int64(0)
if n == 0 { for w := uint32(0); w < c.Workers; w++ {
if err == io.EOF { go func(w uint32) {
break defer wg.Done()
} b := make([]byte, chunkSize)
return err for {
} mu.Lock()
part := i
i++
n, err := f.Read(b)
mu.Unlock()
if n == 0 {
if err == io.EOF {
break
}
wgErrors[w] = err
return
}
resp, err := cli.R().SetError(&UploadError{}).SetResult(&UploadChunkSuccess{}).SetBody(b[0:n]).Post("/file/chunk") checksum := sha1.New()
checksum.Write(b[0:n])
sum := hex.EncodeToString(checksum.Sum(nil))
resp, err := cli.R().Head(fmt.Sprintf("/file/chunk/%s", sum))
if err != nil {
wgErrors[w] = err
return
}
if resp.IsSuccess() {
uploadFile.Chunks[part] = sum
progress.Add(n)
continue
}
resp, err = cli.R().SetError(&UploadError{}).SetBody(b[0:n]).Post("/file/chunk")
if err != nil {
wgErrors[w] = err
return
}
if err, ok := resp.Error().(*UploadError); ok {
wgErrors[w] = errors.New(err.Error)
return
}
uploadFile.Chunks[part] = sum
progress.Add(n)
}
}(w)
}
wg.Wait()
for _, err := range wgErrors {
if err != nil { if err != nil {
return err return err
} }
if err, ok := resp.Error().(*UploadError); ok {
return errors.New(err.Error)
}
if result, ok := resp.Result().(*UploadChunkSuccess); ok {
uploadFile.Chunks[i] = result.Checksum
checksum.Write(b[0:n])
progress.Add(n)
}
} }
uploadFile.Checksum = hex.EncodeToString(checksum.Sum(nil))
resp, err := cli.R().SetBody(uploadFile).SetError(&UploadError{}).SetResult(&UploadFileResponse{}).Post("/file") resp, err := cli.R().SetBody(uploadFile).SetError(&UploadError{}).SetResult(&UploadFileResponse{}).Post("/file")
if err != nil { if err != nil {
return err return err
@ -107,6 +181,14 @@ func (c *UploadCommand) Execute(args []string) error {
return nil return nil
} }
func (c *UploadCommand) Execute(args []string) error {
sum, err := c.FileExists()
if err != nil {
return err
}
return c.FileUpload(sum)
}
func init() { func init() {
parser.AddCommand("upload", "Upload a file", "", &UploadCommand{}) parser.AddCommand("upload", "Upload a file", "", &UploadCommand{})
} }

View File

@ -144,6 +144,20 @@ func (s *Service) FileCreateChunk(c *gin.Context) {
}) })
} }
func (s *Service) FileChunkExists(c *gin.Context) {
checksum := c.Param("checksum")
if len(checksum) != 40 {
s.Error(c, http.StatusBadRequest, photoserrors.ErrStoreBadChecksum)
return
}
if s.Store.ChunkExists(checksum) {
c.Status(http.StatusOK)
} else {
c.Status(http.StatusNotFound)
}
}
func (s *Service) FileExists(c *gin.Context) { func (s *Service) FileExists(c *gin.Context) {
checksum := c.Param("checksum") checksum := c.Param("checksum")
if len(checksum) != 40 { if len(checksum) != 40 {

View File

@ -60,9 +60,11 @@ func (s *Service) SetupRoutes() {
album := s.Gin.Group("/file") album := s.Gin.Group("/file")
album.Use(s.RequireSession) album.Use(s.RequireSession)
album.POST("", s.FileCreate) album.POST("", s.FileCreate)
album.POST("/chunk", s.FileCreateChunk)
album.HEAD("/:checksum", s.FileExists) album.HEAD("/:checksum", s.FileExists)
album.POST("/chunk", s.FileCreateChunk)
album.HEAD("/chunk/:checksum", s.FileChunkExists)
s.Gin.NoRoute(func(c *gin.Context) { s.Gin.NoRoute(func(c *gin.Context) {
s.Error(c, http.StatusNotFound, photoserrors.ErrReqNotFound) s.Error(c, http.StatusNotFound, photoserrors.ErrReqNotFound)
}) })

View File

@ -46,6 +46,11 @@ func (s *Store) LoadChunk(sum string) (*Chunk, error) {
return c, nil return c, nil
} }
func (s *Store) ChunkExists(sum string) bool {
c := &Chunk{s, sum, nil}
return c.FileExists()
}
func (c *Chunk) Dir() string { func (c *Chunk) Dir() string {
return filepath.Join(c.Path, "storage", c.Sum[0:1], c.Sum[1:2]) return filepath.Join(c.Path, "storage", c.Sum[0:1], c.Sum[1:2])
} }