use chunks list and store

This commit is contained in:
celogeek 2022-03-01 18:20:54 +01:00
parent fbc1c37f9b
commit 7557e74862
Signed by: celogeek
GPG Key ID: E6B7BDCFC446233A
5 changed files with 178 additions and 134 deletions

View File

@ -23,7 +23,9 @@ var (
) )
func main() { func main() {
config := &api.ServiceConfig{DB: mysql.NewConfig()} config := &api.ServiceConfig{
DB: mysql.NewConfig(),
}
flag.StringVar(&config.Listen, "listen", listen, "Listen address") flag.StringVar(&config.Listen, "listen", listen, "Listen address")
flag.StringVar(&config.DB.Addr, "mysql-addr", mysqlAddr, "Mysql addr") flag.StringVar(&config.DB.Addr, "mysql-addr", mysqlAddr, "Mysql addr")

View File

@ -30,8 +30,7 @@ var (
ErrStorePathNotADirectory = errors.New("store path is not a directory") ErrStorePathNotADirectory = errors.New("store path is not a directory")
ErrStoreBadChecksum = errors.New("checksum should be sha1 in hex format") ErrStoreBadChecksum = errors.New("checksum should be sha1 in hex format")
ErrStoreBadChunkSize = errors.New("part file size should be 1MB max") ErrStoreBadChunkSize = errors.New("part file size should be 1MB max")
ErrStoreWrongPartChecksum = errors.New("part file wrong checksum") ErrStoreMissingChunks = errors.New("part checksum missing")
ErrStoreBadPartNumber = errors.New("part should be a positive number")
ErrStoreMismatchChecksum = errors.New("part files doesn't match the original checksum") ErrStoreMismatchChecksum = errors.New("part files doesn't match the original checksum")
) )

View File

@ -2,140 +2,74 @@ package api
import ( import (
"bytes" "bytes"
"crypto/sha1"
"encoding/hex"
"fmt" "fmt"
"io" "io"
"io/fs"
"net/http" "net/http"
"os"
"path/filepath"
"sort"
"strconv"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
var CHUNK_SIZE int64 = 1 << 20 var CHUNK_SIZE int64 = 1 << 20
func (s *Service) PrepareStore() { type File struct {
d, err := os.Stat(s.Config.StorePath) Sum string `json:"sum"`
if err != nil { Chunks []string `json:"chunks"`
s.LogErr.Fatal("Store", err)
}
if !d.IsDir() {
s.LogErr.Fatal("Store", ErrStorePathNotADirectory)
}
}
func (s *Service) StoreDir(checksum string) (string, error) {
dir := filepath.Join(s.Config.StorePath, "original", checksum[0:1], checksum[1:2], checksum)
err := os.MkdirAll(dir, 0755)
return dir, err
}
func (s *Service) TempDir(checksum string) (string, error) {
dir := filepath.Join(s.Config.StorePath, "tmp", checksum)
err := os.MkdirAll(dir, 0755)
return dir, err
}
type FileChunks struct {
N uint64
Path fs.FS
Name string
}
func (s *Service) FileChunks(checksum string) ([]FileChunks, error) {
base := filepath.Join(s.Config.StorePath, "tmp", checksum)
baseDir := os.DirFS(base)
dir, err := os.Open(base)
if err != nil {
return nil, err
}
defer dir.Close()
files, err := dir.Readdirnames(-1)
if err != nil {
return nil, err
}
parts := []FileChunks{}
for _, f := range files {
n, err := strconv.ParseUint(f, 10, 64)
if err != nil {
continue
}
parts = append(parts, FileChunks{n, baseDir, f})
}
sort.Slice(parts, func(i, j int) bool {
return parts[i].N < parts[j].N
})
return parts, nil
} }
func (s *Service) FileCreate(c *gin.Context) { func (s *Service) FileCreate(c *gin.Context) {
var originalChecksum = c.Param("original_checksum") file := &File{}
files, err := s.FileChunks(originalChecksum) if err := c.ShouldBindJSON(file); err != nil {
if err != nil {
s.Error(c, http.StatusInternalServerError, err) s.Error(c, http.StatusInternalServerError, err)
return return
} }
dir, err := s.StoreDir(originalChecksum)
if len(file.Sum) != 40 {
s.Error(c, http.StatusBadRequest, ErrStoreBadChecksum)
return
}
if len(file.Chunks) == 0 {
s.Error(c, http.StatusBadRequest, ErrStoreMissingChunks)
return
}
for _, chunk := range file.Chunks {
if len(chunk) != 40 {
s.Error(c, http.StatusBadRequest, ErrStoreBadChecksum)
return
}
}
r, rs, err := s.Store.CombineTemp(file.Sum, file.Chunks)
if err != nil { if err != nil {
s.Error(c, http.StatusInternalServerError, err) s.Error(c, http.StatusInternalServerError, err)
return return
} }
data, err := os.Create(filepath.Join(dir, ".tmp")) if r != file.Sum {
if err != nil { fmt.Printf("R=%s, O=%s\n", r, file.Sum)
s.Error(c, http.StatusInternalServerError, err)
return
}
defer data.Close()
sum := sha1.New()
size := uint64(0)
for _, f := range files {
b, err := fs.ReadFile(f.Path, f.Name)
if err != nil {
s.Error(c, http.StatusInternalServerError, err)
return
}
_, err = sum.Write(b)
if err != nil {
s.Error(c, http.StatusInternalServerError, err)
return
}
_, err = data.Write(b)
if err != nil {
s.Error(c, http.StatusInternalServerError, err)
return
}
size += uint64(len(b))
}
r := hex.EncodeToString(sum.Sum(nil))
if r != originalChecksum {
fmt.Printf("R=%s, O=%s\n", r, originalChecksum)
s.Error(c, http.StatusExpectationFailed, ErrStoreMismatchChecksum) s.Error(c, http.StatusExpectationFailed, ErrStoreMismatchChecksum)
return return
} }
if err = s.Store.CommitTemp(file.Sum, file.Chunks); err != nil {
s.Error(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, gin.H{ c.JSON(http.StatusOK, gin.H{
"status": "success", "status": "success",
"checksum": originalChecksum, "sum": file.Sum,
"nbParts": len(files), "nbChunks": len(file.Chunks),
"size": size, "size": rs,
}) })
} }
func (s *Service) FileChunk(c *gin.Context) { func (s *Service) FileCreateTemp(c *gin.Context) {
var ( var (
originalChecksum = c.Param("original_checksum") origsum = c.Param("origsum")
part = c.Param("part") sumb = c.Param("sum")
partChecksum = c.Param("part_checksum")
) )
if len(originalChecksum) != 40 || len(partChecksum) != 40 { if len(origsum) != 40 || len(sumb) != 40 {
s.Error(c, http.StatusBadRequest, ErrStoreBadChecksum) s.Error(c, http.StatusBadRequest, ErrStoreBadChecksum)
return return
} }
@ -145,39 +79,15 @@ func (s *Service) FileChunk(c *gin.Context) {
return return
} }
p, err := strconv.ParseUint(part, 10, 64)
if err != nil || p < 1 {
s.Error(c, http.StatusBadRequest, ErrStoreBadPartNumber)
return
}
b := bytes.NewBuffer([]byte{}) b := bytes.NewBuffer([]byte{})
io.Copy(b, c.Request.Body) io.Copy(b, c.Request.Body)
c.Request.Body.Close() c.Request.Body.Close()
sum := sha1.New() if err := s.Store.SaveTemp(origsum, sumb, b.Bytes()); err != nil {
sum.Write(b.Bytes()) s.Error(c, http.StatusBadRequest, err)
r := hex.EncodeToString(sum.Sum(nil))
if partChecksum != r {
s.Error(c, http.StatusBadRequest, ErrStoreWrongPartChecksum)
return return
} }
dir, err := s.TempDir(originalChecksum)
if err != nil {
s.Error(c, http.StatusInternalServerError, err)
return
}
f, err := os.Create(filepath.Join(dir, part))
if err != nil {
s.Error(c, http.StatusInternalServerError, err)
return
}
f.Write(b.Bytes())
f.Close()
c.JSON(http.StatusOK, gin.H{ c.JSON(http.StatusOK, gin.H{
"status": "success", "status": "success",
}) })

View File

@ -10,6 +10,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/go-sql-driver/mysql" "github.com/go-sql-driver/mysql"
"gitlab.celogeek.com/photos/api/internal/store"
"gorm.io/gorm" "gorm.io/gorm"
) )
@ -17,6 +18,7 @@ type Service struct {
Gin *gin.Engine Gin *gin.Engine
DB *gorm.DB DB *gorm.DB
Config *ServiceConfig Config *ServiceConfig
Store *store.Store
LogOk *Logger LogOk *Logger
LogErr *Logger LogErr *Logger
} }
@ -31,6 +33,7 @@ func New(config *ServiceConfig) *Service {
return &Service{ return &Service{
Gin: gin.New(), Gin: gin.New(),
Config: config, Config: config,
Store: &store.Store{Path: config.StorePath},
LogOk: &Logger{os.Stdout, "Photos"}, LogOk: &Logger{os.Stdout, "Photos"},
LogErr: &Logger{os.Stderr, "Photos"}, LogErr: &Logger{os.Stderr, "Photos"},
} }
@ -55,14 +58,26 @@ func (s *Service) SetupRoutes() {
album := s.Gin.Group("/file") album := s.Gin.Group("/file")
album.Use(s.RequireSession) album.Use(s.RequireSession)
album.POST("/:original_checksum", s.FileCreate) album.POST("/", s.FileCreate)
album.POST("/:original_checksum/:part/:part_checksum", s.FileChunk) album.POST("/tmp/:origsum/:sum", s.FileCreateTemp)
s.Gin.NoRoute(func(c *gin.Context) { s.Gin.NoRoute(func(c *gin.Context) {
s.Error(c, http.StatusNotFound, ErrReqNotFound) s.Error(c, http.StatusNotFound, ErrReqNotFound)
}) })
} }
func (s *Service) PrepareStore() {
d, err := os.Stat(s.Store.Path)
if err != nil {
s.LogErr.Fatal("Store", err)
}
if !d.IsDir() {
s.LogErr.Fatal("Store", ErrStorePathNotADirectory)
}
if err := s.Store.MkDirs([]string{"tmp", "original"}); err != nil {
s.LogErr.Fatal("Store", err)
}
}
func (s *Service) Run() error { func (s *Service) Run() error {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
s.PrepareStore() s.PrepareStore()

118
internal/store/core.go Normal file
View File

@ -0,0 +1,118 @@
package store
import (
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"os"
"path/filepath"
)
type Store struct {
Path string
}
func (s *Store) Dir(path, sum string) string {
return filepath.Join(s.Path, path, sum[0:1], sum[1:2])
}
func (s *Store) MkDirs(dirs []string) error {
for _, dir := range dirs {
if err := os.MkdirAll(filepath.Join(s.Path, dir), 0755); err != nil {
return err
}
}
return nil
}
func (s *Store) FileExists(filename string) bool {
fs, err := os.Stat(filename)
if errors.Is(err, os.ErrNotExist) {
return false
}
return !fs.IsDir()
}
func (s *Store) SaveTemp(path string, sumb string, b []byte) error {
sum := sha1.New()
sum.Write(b)
sumString := hex.EncodeToString(sum.Sum(nil))
if sumb != sumString {
return errors.New("wrong checksum")
}
dir := s.Dir(path, sumString)
filename := filepath.Join(dir, sumString)
if s.FileExists(filename) {
return nil
}
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
fs, err := os.Create(filename)
if err != nil {
return err
}
defer fs.Close()
_, err = fs.Write(b)
return err
}
func (s *Store) CombineTemp(path string, sumb []string) (string, uint64, error) {
tmpdir := filepath.Join("tmp", path)
sum := sha1.New()
size := uint64(0)
for _, sb := range sumb {
dir := s.Dir(tmpdir, sb)
filename := filepath.Join(dir, sb)
if _, err := os.Stat(filename); errors.Is(err, os.ErrNotExist) {
return "", 0, fmt.Errorf("%s: chunk %s doesn't exists", path, sb)
}
b, err := os.ReadFile(filename)
if err != nil {
return "", 0, err
}
sum.Write(b)
size += uint64(len(b))
}
return hex.EncodeToString(sum.Sum(nil)), size, nil
}
func (s *Store) CommitTemp(path string, sumb []string) error {
tmpdir := filepath.Join("tmp", path)
originaldir := s.Dir("original", path)
originalname := filepath.Join(originaldir, path)
if s.FileExists(originalname) {
return fmt.Errorf("original file already exists")
}
os.MkdirAll(originaldir, 0755)
fs, err := os.Create(originalname)
if err != nil {
return err
}
defer fs.Close()
for _, sb := range sumb {
dir := s.Dir(tmpdir, sb)
filename := filepath.Join(dir, sb)
if _, err := os.Stat(filename); errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("%s: chunk %s doesn't exists", path, sb)
}
b, err := os.ReadFile(filename)
if err != nil {
return err
}
if _, err := fs.Write(b); err != nil {
return err
}
}
return nil
}