From 34e7f2a942bc6f0acafa40ba1a9ca8a02ffac3a7 Mon Sep 17 00:00:00 2001
From: Martin Varga
Date: Tue, 5 Aug 2025 10:52:22 +0200
Subject: [PATCH 01/13] Add new v2 push endpoint to create a project version

- refactor common logic for push in v1 and v2 API
- feature flag to use v2 push (for clients only)
- dry run option in v2 endpoint to only validate request
- use new error structure for v2 endpoint
---
 server/application.py                         |   1 +
 server/mergin/app.py                          |  20 +-
 server/mergin/sync/commands.py                |   4 +-
 server/mergin/sync/config.py                  |   5 +
 server/mergin/sync/db_events.py               |  10 +
 server/mergin/sync/errors.py                  |  42 +++
 server/mergin/sync/files.py                   | 200 ++++++++---
 server/mergin/sync/models.py                  | 251 +++++++++----
 server/mergin/sync/public_api_controller.py   | 338 +++++++-----------
 server/mergin/sync/public_api_v2.yaml         | 208 +++++++++++
 .../mergin/sync/public_api_v2_controller.py   | 218 ++++++++++-
 server/mergin/sync/schemas.py                 |   2 +-
 server/mergin/sync/storages/disk.py           |  39 +-
 server/mergin/sync/storages/storage.py        |   4 -
 server/mergin/sync/utils.py                   |  16 +-
 server/mergin/tests/fixtures.py               |   8 +-
 server/mergin/tests/test_config.py            |   1 +
 server/mergin/tests/test_db_hooks.py          |   7 +-
 .../mergin/tests/test_project_controller.py   |  51 +--
 server/mergin/tests/test_public_api_v2.py     | 228 +++++++++++-
 server/mergin/tests/utils.py                  |  40 +--
 21 files changed, 1272 insertions(+), 421 deletions(-)

diff --git a/server/application.py b/server/application.py
index b1ab79ac..74f82213 100644
--- a/server/application.py
+++ b/server/application.py
@@ -47,6 +47,7 @@
         "GLOBAL_WRITE",
         "ENABLE_SUPERADMIN_ASSIGNMENT",
         "DIAGNOSTIC_LOGS_URL",
+        "V2_PUSH_ENABLED",
     ]
 )
 register_stats(application)
diff --git a/server/mergin/app.py b/server/mergin/app.py
index d0fd2f3a..8aa4171c 100644
--- a/server/mergin/app.py
+++ b/server/mergin/app.py
@@ -12,7 +12,17 @@
 from sqlalchemy.schema import MetaData
 from flask_sqlalchemy import SQLAlchemy
 from flask_marshmallow import Marshmallow
-from flask import json, jsonify, request, abort, current_app, Flask, Request, Response
+from flask import (
+    json,
+    jsonify,
+    make_response,
+    request,
+    abort,
+    current_app,
+    Flask,
+    Request,
+    Response,
+)
 from flask_login import current_user, LoginManager
 from flask_wtf.csrf import generate_csrf, CSRFProtect
 from flask_migrate import Migrate
@@ -25,7 +35,7 @@
 import time
 import traceback
 from werkzeug.exceptions import HTTPException
-from typing import List, Dict, Optional
+from typing import List, Dict, Optional, Tuple
 from .sync.utils import get_blacklisted_dirs, get_blacklisted_files
 from .config import Configuration
@@ -485,6 +495,12 @@ class ResponseError:
     def to_dict(self) -> Dict:
         return dict(code=self.code, detail=self.detail + f" ({self.code})")
 
+    def response(self, status_code: int) -> Tuple[Response, int]:
+        """Returns a custom error response with the given code."""
+        response = make_response(jsonify(self.to_dict()), status_code)
+        response.headers["Content-Type"] = "application/problem+json"
+        return response, status_code
+
 
 def whitespace_filter(obj):
     return obj.strip() if isinstance(obj, str) else obj
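A minimal usage sketch (illustrative only, not part of the patch) of the new `ResponseError.response()` helper, assuming a Flask view and the `ProjectLocked` error class that this commit migrates to the helper:

    # hypothetical view code; ProjectLocked comes from server/mergin/sync/errors.py
    from mergin.sync.errors import ProjectLocked

    def some_view(project):
        if project.locked_until:
            # serialized as {"code": "ProjectLocked", "detail": "... (ProjectLocked)"}
            # with Content-Type: application/problem+json and HTTP status 422
            return ProjectLocked().response(422)
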
diff --git a/server/mergin/sync/commands.py b/server/mergin/sync/commands.py
index 97e85981..4ec898cf 100644
--- a/server/mergin/sync/commands.py
+++ b/server/mergin/sync/commands.py
@@ -9,7 +9,6 @@
 from datetime import datetime
 from flask import Flask, current_app
 
-from .files import UploadChanges
 from ..app import db
 from .models import Project, ProjectVersion
 from .utils import split_project_path
@@ -52,8 +51,7 @@ def create(name, namespace, username):  # pylint: disable=W0612
         p = Project(**project_params)
         p.updated = datetime.utcnow()
         db.session.add(p)
-        changes = UploadChanges(added=[], updated=[], removed=[])
-        pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1")
+        pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1")
         pv.project = p
         db.session.commit()
         os.makedirs(p.storage.project_dir, exist_ok=True)
diff --git a/server/mergin/sync/config.py b/server/mergin/sync/config.py
index b182da6d..b8d21466 100644
--- a/server/mergin/sync/config.py
+++ b/server/mergin/sync/config.py
@@ -31,6 +31,9 @@ class Configuration(object):
     MAX_CHUNK_SIZE = config(
         "MAX_CHUNK_SIZE", default=10 * 1024 * 1024, cast=int
     )  # in bytes
+    UPLOAD_CHUNKS_DIR = config(
+        "UPLOAD_CHUNKS_DIR", default=os.path.join(LOCAL_PROJECTS, "chunks")
+    )
     # use nginx (in front of gunicorn) to serve files (https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/)
     USE_X_ACCEL = config("USE_X_ACCEL", default=False, cast=bool)
     PROJECTS_ARCHIVES_X_ACCEL_BUFFERING = config(
@@ -64,3 +67,5 @@ class Configuration(object):
     )  # in seconds, older unfinished zips are moved to temp
     PARTIAL_ZIP_EXPIRATION = config("PARTIAL_ZIP_EXPIRATION", default=600, cast=int)
+    # whether the new v2 push endpoint is enabled
+    V2_PUSH_ENABLED = config("V2_PUSH_ENABLED", default=True, cast=bool)
diff --git a/server/mergin/sync/db_events.py b/server/mergin/sync/db_events.py
index 18d1ce60..48a1756d 100644
--- a/server/mergin/sync/db_events.py
+++ b/server/mergin/sync/db_events.py
@@ -6,6 +6,8 @@
 from flask import current_app, abort
 from sqlalchemy import event
 
+from .models import ProjectVersion
+from .tasks import optimize_storage
 from ..app import db
 
 
@@ -14,9 +16,17 @@ def check(session):
         abort(503, "Service unavailable due to maintenance, please try later")
 
 
+def optimize_gpkg_storage(mapper, connection, project_version):
+    # do not optimize on every version, every 10th is just fine
+    if not project_version.name % 10:
+        optimize_storage.delay(project_version.project_id)
+
+
 def register_events():
     event.listen(db.session, "before_commit", check)
+    event.listen(ProjectVersion, "after_insert", optimize_gpkg_storage)
 
 
 def remove_events():
     event.remove(db.session, "before_commit", check)
+    event.remove(ProjectVersion, "after_insert", optimize_gpkg_storage)
be created" + + def __init__(self, error: str = None): + self.error = error + + def to_dict(self) -> Dict: + data = super().to_dict() + if self.error is not None: + data["detail"] = self.error + f" ({self.code})" + return data diff --git a/server/mergin/sync/files.py b/server/mergin/sync/files.py index 12b30afe..5c8c1427 100644 --- a/server/mergin/sync/files.py +++ b/server/mergin/sync/files.py @@ -2,15 +2,36 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial import datetime +from enum import Enum import os from dataclasses import dataclass from typing import Optional, List -from marshmallow import fields, EXCLUDE, pre_load, post_load, post_dump +import uuid +from flask import current_app +from marshmallow import ValidationError, fields, EXCLUDE, post_dump, validates_schema from pathvalidate import sanitize_filename +from .utils import ( + is_file_name_blacklisted, + is_qgis, + is_supported_extension, + is_valid_path, + is_versioned_file, +) from ..app import DateTimeWithZ, ma +class PushChangeType(Enum): + CREATE = "create" + UPDATE = "update" + DELETE = "delete" + UPDATE_DIFF = "update_diff" + + @classmethod + def values(cls): + return [member.value for member in cls.__members__.values()] + + def mergin_secure_filename(filename: str) -> str: """Generate secure filename for given file""" filename = os.path.normpath(filename) @@ -24,94 +45,181 @@ def mergin_secure_filename(filename: str) -> str: @dataclass class File: - """Base class for every file object""" + """Base class for every file object, either intended to upload or already existing in project""" path: str checksum: str size: int - location: str def is_valid_gpkg(self): """Check if diff file is valid""" return self.size != 0 +@dataclass +class ProjectDiffFile(File): + """Metadata for geodiff diff file (aka. changeset) associated with geopackage""" + + # location where file is actually stored + location: str + + @dataclass class ProjectFile(File): - """Project file metadata including metadata for diff file""" + """Project file metadata including metadata for diff file and location where it is stored""" # metadata for gpkg diff file - diff: Optional[File] + diff: Optional[ProjectDiffFile] # deprecated attribute kept for public API compatibility mtime: Optional[datetime.datetime] + # location where file is actually stored + location: str @dataclass -class UploadFile(File): - """File to be uploaded coming from client push process""" - - # determined by client - chunks: Optional[List[str]] - diff: Optional[File] - +class ProjectFileChange(ProjectFile): + """Metadata of changed file in project version. + + This item is saved into database into file_history. + """ + + change: PushChangeType + + +def files_changes_from_upload( + changes: dict, location_dir: str +) -> List["ProjectFileChange"]: + """Create a list of version file changes from upload changes dictionary used by public API. + + It flattens changes dict and adds change type to each item. Also generates location for each file. 
+ """ + secure_filenames = [] + version_changes = [] + for key in ("added", "updated", "removed"): + for item in changes.get(key, []): + location = os.path.join(location_dir, mergin_secure_filename(item["path"])) + diff = None + + # make sure we have unique location for each file + if location in secure_filenames: + filename, file_extension = os.path.splitext(location) + location = filename + f".{str(uuid.uuid4())}" + file_extension + + secure_filenames.append(location) + + if key == "removed": + change = PushChangeType.DELETE + location = None + elif key == "added": + change = PushChangeType.CREATE + else: + change = PushChangeType.UPDATE + if item.get("diff"): + change = PushChangeType.UPDATE_DIFF + diff_location = os.path.join( + location_dir, mergin_secure_filename(item["diff"]["path"]) + ) + if diff_location in secure_filenames: + filename, file_extension = os.path.splitext(diff_location) + diff_location = ( + filename + f".{str(uuid.uuid4())}" + file_extension + ) + + secure_filenames.append(diff_location) + diff = ProjectDiffFile( + path=item["diff"]["path"], + checksum=item["diff"]["checksum"], + size=item["diff"]["size"], + location=diff_location, + ) + + file_change = ProjectFileChange( + path=item["path"], + checksum=item["checksum"], + size=item["size"], + mtime=None, + change=change, + location=location, + diff=diff, + ) + version_changes.append(file_change) -@dataclass -class UploadChanges: - added: List[UploadFile] - updated: List[UploadFile] - removed: List[UploadFile] + return version_changes class FileSchema(ma.Schema): path = fields.String() size = fields.Integer() checksum = fields.String() - location = fields.String(load_default="", load_only=True) class Meta: unknown = EXCLUDE - @post_load - def create_obj(self, data, **kwargs): - return File(**data) - class UploadFileSchema(FileSchema): chunks = fields.List(fields.String(), load_default=[]) diff = fields.Nested(FileSchema(), many=False, load_default=None) - @pre_load - def pre_load(self, data, **kwargs): - # add future location based on context version - version = f"v{self.context.get('version')}" - if not data.get("location"): - data["location"] = os.path.join( - version, mergin_secure_filename(data["path"]) - ) - if data.get("diff") and not data.get("diff").get("location"): - data["diff"]["location"] = os.path.join( - version, mergin_secure_filename(data["diff"]["path"]) - ) - return data - - @post_load - def create_obj(self, data, **kwargs): - return UploadFile(**data) - class ChangesSchema(ma.Schema): """Schema for upload changes""" - added = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) - updated = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) - removed = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) + added = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) + updated = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) + removed = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) class Meta: unknown = EXCLUDE - @post_load - def create_obj(self, data, **kwargs): - return UploadChanges(**data) + @post_dump + def remove_blacklisted_files(self, data, **kwargs): + """Files which are blacklisted are not allowed to be uploaded and are simple ignored.""" + for key in ("added", "updated", "removed"): + data[key] = [ + f + for f in data[key] + if not is_file_name_blacklisted( + f["path"], current_app.config["BLACKLIST"] + ) + ] + return data + + 
+    @validates_schema
+    def validate(self, data, **kwargs):
+        """Basic consistency validations for upload metadata"""
+        changes_files = [
+            f["path"] for f in data["added"] + data["updated"] + data["removed"]
+        ]
+
+        if len(changes_files) == 0:
+            raise ValidationError("No changes")
+
+        # changes' files must be unique
+        if len(set(changes_files)) != len(changes_files):
+            raise ValidationError("Not unique changes")
+
+        # check if all .gpkg files are valid
+        for file in data["added"] + data["updated"]:
+            file_path = file["path"]
+            if is_versioned_file(file_path) and file["size"] == 0:
+                raise ValidationError("File is not valid")
+
+            if not is_valid_path(file_path):
+                raise ValidationError(
+                    f"Unsupported file name detected: {file_path}. Please remove the invalid characters."
+                )
+
+            if not is_supported_extension(file_path):
+                raise ValidationError(
+                    f"Unsupported file type detected: {file_path}. "
+                    f"Please remove the file or try compressing it into a ZIP file before uploading.",
+                )
 
 
 class ProjectFileSchema(FileSchema):
diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py
index 3854e4d2..70080556 100644
--- a/server/mergin/sync/models.py
+++ b/server/mergin/sync/models.py
@@ -10,6 +10,7 @@
 from enum import Enum
 from typing import Optional, List, Dict, Set, Tuple
 from dataclasses import dataclass, asdict
+import logging
 
 from blinker import signal
 from flask_login import current_user
@@ -23,30 +24,38 @@
 
 from .files import (
     File,
-    UploadChanges,
+    ProjectDiffFile,
+    ProjectFileChange,
     ChangesSchema,
     ProjectFile,
+    files_changes_from_upload,
+    mergin_secure_filename,
+    PushChangeType,
 )
 from .interfaces import WorkspaceRole
 from .storages.disk import move_to_tmp
 from ..app import db
 from .storages import DiskStorage
-from .utils import is_versioned_file, is_qgis
+from .utils import (
+    Toucher,
+    get_chunk_location,
+    get_project_path,
+    is_supported_type,
+    is_versioned_file,
+    is_qgis,
+)
 
 Storages = {"local": DiskStorage}
 project_deleted = signal("project_deleted")
 project_access_granted = signal("project_access_granted")
+push_finished = signal("push_finished")
+project_version_created = signal("project_version_created")
 
 
-class PushChangeType(Enum):
-    CREATE = "create"
-    UPDATE = "update"
-    DELETE = "delete"
-    UPDATE_DIFF = "update_diff"
-
-    @classmethod
-    def values(cls):
-        return [member.value for member in cls.__members__.values()]
+class FileSyncErrorType(Enum):
+    CORRUPTED = "corrupted"
+    UNSUPPORTED = "unsupported"
+    SYNC_ERROR = "sync error"
 
 
 class Project(db.Model):
@@ -181,7 +190,7 @@ def files(self) -> List[ProjectFile]:
                 checksum=row.checksum,
                 location=row.location,
                 mtime=row.mtime,
-                diff=File(**row.diff) if row.diff else None,
+                diff=ProjectDiffFile(**row.diff) if row.diff else None,
             )
             for row in db.session.execute(query, params).fetchall()
         ]
@@ -504,9 +513,9 @@ def path(self) -> str:
         return self.file.path
 
     @property
-    def diff_file(self) -> Optional[File]:
+    def diff_file(self) -> Optional[ProjectDiffFile]:
         if self.diff:
-            return File(**self.diff)
+            return ProjectDiffFile(**self.diff)
 
     @property
     def mtime(self) -> datetime:
@@ -705,7 +714,7 @@ def __init__(
         project: Project,
         name: int,
         author_id: int,
-        changes: UploadChanges,
+        changes: List[ProjectFileChange],
         ip: str,
         user_agent: str = None,
         device_id: str = None,
@@ -725,9 +734,7 @@ def __init__(
             ).all()
         }
 
-        changed_files_paths = [
-            f.path for f in changes.updated + changes.removed + changes.added
-        ]
+        changed_files_paths = set(change.path for change in changes)
         existing_files_map = {
             f.path: f
             for f in ProjectFilePath.query.filter_by(project_id=self.project_id)
@@ -735,46 +742,32 @@ def __init__(
             .all()
         }
 
-        for key in (
-            ("added", PushChangeType.CREATE),
-            ("updated", PushChangeType.UPDATE),
-            ("removed", PushChangeType.DELETE),
-        ):
-            change_attr = key[0]
-            change_type = key[1]
-
-            for upload_file in getattr(changes, change_attr):
-                is_diff_change = (
-                    change_type is PushChangeType.UPDATE
-                    and upload_file.diff is not None
-                )
-
-                file = existing_files_map.get(
-                    upload_file.path, ProjectFilePath(self.project_id, upload_file.path)
-                )
-                fh = FileHistory(
-                    file=file,
-                    size=upload_file.size,
-                    checksum=upload_file.checksum,
-                    location=upload_file.location,
-                    diff=(
-                        asdict(upload_file.diff)
-                        if (is_diff_change and upload_file.diff)
-                        else null()
-                    ),
-                    change=(
-                        PushChangeType.UPDATE_DIFF if is_diff_change else change_type
-                    ),
-                )
-                fh.version = self
-                fh.project_version_name = self.name
-                db.session.add(fh)
-                db.session.flush()
+        for item in changes:
+            # get existing DB file reference or create a new one (for added files)
+            db_file = existing_files_map.get(
+                item.path, ProjectFilePath(self.project_id, item.path)
+            )
+            fh = FileHistory(
+                file=db_file,
+                size=item.size,
+                checksum=item.checksum,
+                location=item.location,
+                diff=(
+                    asdict(item.diff)
+                    if (item.change is PushChangeType.UPDATE_DIFF and item.diff)
+                    else null()
+                ),
+                change=item.change,
+            )
+            fh.version = self
+            fh.project_version_name = self.name
+            db.session.add(fh)
+            db.session.flush()
 
-                if change_type is PushChangeType.DELETE:
-                    latest_files_map.pop(fh.path, None)
-                else:
-                    latest_files_map[fh.path] = fh.id
+            if item.change is PushChangeType.DELETE:
+                latest_files_map.pop(fh.path, None)
+            else:
+                latest_files_map[fh.path] = fh.id
 
         # update cached values in project and push to transaction buffer so that self.files is up-to-date
         self.project.latest_project_files.file_history_ids = latest_files_map.values()
@@ -909,7 +902,7 @@ def files(self) -> List[ProjectFile]:
                 checksum=row.checksum,
                 location=row.location,
                 mtime=row.mtime,
-                diff=File(**row.diff) if row.diff else None,
+                diff=ProjectDiffFile(**row.diff) if row.diff else None,
             )
             for row in result
         ]
@@ -1021,9 +1014,7 @@ class Upload(db.Model):
     )
     __table_args__ = (db.UniqueConstraint("project_id", "version"),)
 
-    def __init__(
-        self, project: Project, version: int, changes: UploadChanges, user_id: int
-    ):
+    def __init__(self, project: Project, version: int, changes: dict, user_id: int):
         self.id = str(uuid.uuid4())
         self.project_id = project.id
         self.version = version
@@ -1053,6 +1044,144 @@ def clear(self):
         db.session.delete(self)
         db.session.commit()
 
+    def process_chunks(
+        self, use_shared_chunk_dir: bool
+    ) -> Tuple[List[ProjectFileChange], Dict]:
+        """Concatenate chunks into single file and apply gpkg updates if needed"""
+        errors = {}
+        project_path = get_project_path(self.project)
+        v_next_version = ProjectVersion.to_v_name(self.project.next_version())
+        chunks_map = {
+            f["path"]: f["chunks"]
+            for f in self.changes["added"] + self.changes["updated"]
+        }
+        file_changes = files_changes_from_upload(self.changes, v_next_version)
+        to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE]
+        current_files = [f for f in self.project.files if f.path not in to_remove]
+
+        with Toucher(self.lockfile, 5):
+            for f in file_changes:
+                if f.change == PushChangeType.DELETE:
+                    continue
+
+                f_location = (
+                    f.diff.location
+                    if f.change == PushChangeType.UPDATE_DIFF
+                    else f.location
+                )
+                temporary_location = os.path.join(self.upload_dir, "files", f_location)
+                os.makedirs(os.path.dirname(temporary_location), exist_ok=True)
+                with open(temporary_location, "wb") as dest:
+                    try:
+                        for chunk_id in chunks_map.get(f.path, []):
+                            # based on API version location for uploaded chunks differs
+                            if use_shared_chunk_dir:
+                                chunk_file = os.path.join(
+                                    current_app.config["UPLOAD_CHUNKS_DIR"],
+                                    *get_chunk_location(chunk_id),
+                                )
+                            else:
+                                chunk_file = os.path.join(
+                                    self.upload_dir, "chunks", chunk_id
+                                )
+
+                            if not os.path.exists(chunk_file):
+                                errors[f.path] = FileSyncErrorType.CORRUPTED.value
+                                continue
+
+                            with open(chunk_file, "rb") as src:
+                                data = src.read(8192)
+                                while data:
+                                    dest.write(data)
+                                    data = src.read(8192)
+
+                            move_to_tmp(chunk_file)
+                    except IOError:
+                        logging.exception(
+                            f"Failed to process chunk: {chunk_id} in project {project_path}"
+                        )
+                        errors[f.path] = FileSyncErrorType.CORRUPTED.value
+                        continue
+
+                if not is_supported_type(temporary_location):
+                    logging.info(f"Rejecting blacklisted file: {temporary_location}")
+                    errors[f.path] = FileSyncErrorType.UNSUPPORTED.value
+                    continue
+
+                # check if .gpkg file is valid
+                if is_versioned_file(temporary_location) and not f.is_valid_gpkg():
+                    errors[f.path] = FileSyncErrorType.CORRUPTED.value
+                    continue
+
+                expected_size = (
+                    f.diff.size if f.change == PushChangeType.UPDATE_DIFF else f.size
+                )
+                if expected_size != os.path.getsize(temporary_location):
+                    logging.error(
+                        f"Data integrity check has failed on file {f.path} in project {project_path}",
+                        exc_info=True,
+                    )
+                    errors[f.path] = FileSyncErrorType.CORRUPTED.value
+                    continue
+
+                # for updates try to apply diff to create a full updated gpkg file or from full .gpkg try to create corresponding diff
+                if f.change in (
+                    PushChangeType.UPDATE,
+                    PushChangeType.UPDATE_DIFF,
+                ) and is_versioned_file(f.path):
+                    current_file = next(
+                        (i for i in current_files if i.path == f.path), None
+                    )
+                    if not current_file:
+                        errors[f.path] = (
+                            f"{FileSyncErrorType.SYNC_ERROR.value}: file not found on server"
+                        )
+                        continue
+
+                    if f.diff:
+                        changeset = temporary_location
+                        patched_file = os.path.join(
+                            self.upload_dir, "files", f.location
+                        )
+
+                        result = self.project.storage.apply_diff(
+                            current_file, changeset, patched_file
+                        )
+                        if result.ok():
+                            checksum, size = result.value
+                            f.checksum = checksum
+                            f.size = size
+                        else:
+                            errors[f.path] = (
+                                f"{FileSyncErrorType.SYNC_ERROR.value}: project {self.project.workspace.name}/{self.project.name}, {result.value}"
+                            )
+                    else:
+                        diff_name = mergin_secure_filename(
+                            f.path + "-diff-" + str(uuid.uuid4())
+                        )
+                        changeset = os.path.join(
+                            self.upload_dir, "files", v_next_version, diff_name
+                        )
+                        patched_file = temporary_location
+                        result = self.project.storage.construct_diff(
+                            current_file, changeset, patched_file
+                        )
+                        if result.ok():
+                            checksum, size = result.value
+                            f.diff = ProjectDiffFile(
+                                checksum=checksum,
+                                size=size,
+                                path=diff_name,
+                                location=os.path.join(v_next_version, diff_name),
+                            )
+                            f.change = PushChangeType.UPDATE_DIFF
+                        else:
+                            # if diff cannot be constructed it would be a force update
+                            logging.warning(
+                                f"Geodiff: create changeset error {result.value}"
+                            )
+        return file_changes, errors
+
 
 class RequestStatus(Enum):
     ACCEPTED = "accepted"
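A hypothetical outcome of `process_chunks()` when one uploaded geopackage is truncated, to show the shape of the returned pair (values invented):

    file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True)
    # errors == {"survey.gpkg": "corrupted"}   # FileSyncErrorType.CORRUPTED.value
    # an empty errors dict means all chunks were assembled and gpkg diffs resolved
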
diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py
index 9fd229a1..ab0908d1 100644
--- a/server/mergin/sync/public_api_controller.py
+++ b/server/mergin/sync/public_api_controller.py
@@ -14,8 +14,8 @@
 from datetime import datetime
 
 import gevent
+from marshmallow import ValidationError
 import psycopg2
-from blinker import signal
 from connexion import NoContent, request
 from flask import (
     abort,
@@ -40,6 +40,7 @@
 from ..auth import auth_required
 from ..auth.models import User
 from .models import (
+    FileSyncErrorType,
     Project,
     ProjectVersion,
     Upload,
@@ -48,13 +49,17 @@
     ProjectFilePath,
     ProjectUser,
     ProjectRole,
+    project_version_created,
+    push_finished,
 )
 from .files import (
-    UploadChanges,
+    ProjectFileChange,
     ChangesSchema,
     UploadFileSchema,
     ProjectFileSchema,
     FileSchema,
+    files_changes_from_upload,
+    mergin_secure_filename,
 )
 from .schemas import (
     ProjectSchema,
@@ -65,7 +70,7 @@
     FileHistorySchema,
     ProjectVersionListSchema,
 )
-from .storages.storage import DataSyncError, InitializationError
+from .storages.storage import InitializationError
 from .storages.disk import save_to_file, move_to_tmp
 from .permissions import (
     require_project,
@@ -96,11 +101,6 @@
 from ..utils import format_time_delta
 
 
-push_finished = signal("push_finished")
-# TODO: Move to database events to handle all commits to project versions
-project_version_created = signal("project_version_created")
-
-
 def parse_project_access_update_request(access: Dict) -> Dict:
     """Parse raw project access update request and filter out invalid entries.
     New access can be specified either by list of usernames or ids -> convert only to ids fur further processing.
@@ -239,15 +239,24 @@ def add_project(namespace):  # noqa: E501
             .first_or_404()
         )
         version_name = 1
-        files = UploadFileSchema(context={"version": 1}, many=True).load(
-            FileSchema(exclude=("location",), many=True).dump(template.files)
-        )
-        changes = UploadChanges(added=files, updated=[], removed=[])
+        file_changes = []
+        for file in template.files:
+            file_changes.append(
+                ProjectFileChange(
+                    file.path,
+                    file.checksum,
+                    file.size,
+                    diff=None,
+                    mtime=None,
+                    location=os.path.join("v1", mergin_secure_filename(file.path)),
+                    change=PushChangeType.CREATE,
+                )
+            )
     else:
         template = None
         version_name = 0
-        changes = UploadChanges(added=[], updated=[], removed=[])
+        file_changes = []
 
     try:
         p.storage.initialize(template_project=template)
@@ -258,7 +267,7 @@ def add_project(namespace):  # noqa: E501
             p,
             version_name,
             current_user.id,
-            changes,
+            file_changes,
             get_ip(request),
             get_user_agent(request),
             get_device_id(request),
@@ -694,7 +703,10 @@ def catch_sync_failure(f):
     @functools.wraps(f)
     def wrapper(*args, **kwargs):
         try:
-            return f(*args, **kwargs)
+            response, status_code = f(*args, **kwargs)
+            if status_code >= 400:
+                raise HTTPException(response=response)
+            return response, status_code
         except (HTTPException, IntegrityError) as e:
             if e.code in [401, 403, 404]:
                 raise  # nothing to do, just propagate downstream
@@ -711,6 +723,11 @@ def wrapper(*args, **kwargs):
                 error_type = "push_finish"
             elif request.endpoint == "chunk_upload":
                 error_type = "chunk_upload"
+            elif (
+                request.endpoint
+                == "/v2.mergin_sync_public_api_v2_controller_create_project_version"
+            ):
+                error_type = "project_push"
             if not e.description:
                 # custom error cases (e.g.
StorageLimitHit)
                 e.description = e.response.json["detail"]
@@ -745,7 +762,7 @@ def project_push(namespace, project_name):
     project_permission = current_app.project_handler.get_push_permission(changes)
     project = require_project(namespace, project_name, project_permission)
     if project.locked_until:
-        abort(make_response(jsonify(ProjectLocked().to_dict()), 422))
+        return ProjectLocked().response(422)
     # pass full project object to request for later use
     request.view_args["project"] = project
     ws = project.workspace
@@ -771,76 +788,36 @@ def project_push(namespace, project_name):
     if pending_upload and pending_upload.is_active():
         abort(400, "Another process is running. Please try later.")
 
-    upload_changes = ChangesSchema(context={"version": version + 1}).load(changes)
+    try:
+        ChangesSchema().validate(changes)
+        upload_changes = ChangesSchema().dump(changes)
+    except ValidationError as err:
+        msg = err.messages[0] if type(err.messages) == list else "Invalid input data"
+        abort(400, msg)
 
-    for item in upload_changes.added:
+    for item in upload_changes["added"]:
         # check if same file is not already uploaded
-        if not all(ele.path != item.path for ele in project.files):
-            abort(400, f"File {item.path} has been already uploaded")
-        if not is_valid_path(item.path):
-            abort(
-                400,
-                f"Unsupported file name detected: {item.path}. Please remove the invalid characters.",
-            )
-        if not is_supported_extension(item.path):
-            abort(
-                400,
-                f"Unsupported file type detected: {item.path}. "
-                f"Please remove the file or try compressing it into a ZIP file before uploading.",
-            )
-
-    # changes' files must be unique
-    changes_files = [
-        f.path
-        for f in upload_changes.added + upload_changes.updated + upload_changes.removed
-    ]
-    if len(set(changes_files)) != len(changes_files):
-        abort(400, "Not unique changes")
-
-    sanitized_files = []
-    blacklisted_files = []
-    for f in upload_changes.added + upload_changes.updated + upload_changes.removed:
-        # check if .gpkg file is valid
-        if is_versioned_file(f.path):
-            if not f.is_valid_gpkg():
-                abort(400, f"File {f.path} is not valid")
-        if is_file_name_blacklisted(f.path, current_app.config["BLACKLIST"]):
-            blacklisted_files.append(f.path)
-        # all file need to be unique after sanitized
-        if f.location in sanitized_files:
-            filename, file_extension = os.path.splitext(f.location)
-            f.location = filename + f".{str(uuid.uuid4())}" + file_extension
-        sanitized_files.append(f.location)
-        if f.diff:
-            if f.diff.location in sanitized_files:
-                filename, file_extension = os.path.splitext(f.diff.location)
-                f.diff.location = filename + f".{str(uuid.uuid4())}" + file_extension
-            sanitized_files.append(f.diff.location)
-
-    # remove blacklisted files from changes
-    for key in upload_changes.__dict__.keys():
-        new_value = [
-            f for f in getattr(upload_changes, key) if f.path not in blacklisted_files
-        ]
-        setattr(upload_changes, key, new_value)
+        if not all(ele.path != item["path"] for ele in project.files):
+            abort(400, f"File {item['path']} has been already uploaded")
 
     # Check user data limit
-    updates = [f.path for f in upload_changes.updated]
-    updated_files = list(filter(lambda i: i.path in updates, project.files))
+    updated_files = list(
+        filter(
+            lambda i: i.path in [f["path"] for f in upload_changes["updated"]],
+            project.files,
+        )
+    )
     additional_disk_usage = (
-        sum(file.size for file in upload_changes.added + upload_changes.updated)
+        sum(
+            file["size"] for file in upload_changes["added"] + upload_changes["updated"]
+        )
         - sum(file.size for file in updated_files)
-
sum(file.size for file in upload_changes.removed)
+        - sum(file["size"] for file in upload_changes["removed"])
     )
-
     current_usage = ws.disk_usage()
     requested_storage = current_usage + additional_disk_usage
     if requested_storage > ws.storage:
-        abort(
-            make_response(
-                jsonify(StorageLimitHit(current_usage, ws.storage).to_dict()), 422
-            )
-        )
+        return StorageLimitHit(current_usage, ws.storage).response(422)
 
     upload = Upload(project, version, upload_changes, current_user.id)
     db.session.add(upload)
@@ -885,6 +862,9 @@ def project_push(namespace, project_name):
     # Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit
     if not (changes["added"] or changes["updated"]):
         next_version = version + 1
+        file_changes = files_changes_from_upload(
+            upload.changes, ProjectVersion.to_v_name(next_version)
+        )
         user_agent = get_user_agent(request)
         device_id = get_device_id(request)
         try:
@@ -892,7 +872,7 @@ def project_push(namespace, project_name):
                 project,
                 next_version,
                 current_user.id,
-                upload_changes,
+                file_changes,
                 get_ip(request),
                 user_agent,
                 device_id,
@@ -919,7 +899,7 @@ def project_push(namespace, project_name):
     finally:
         upload.clear()
 
-    return {"transaction": upload.id}
+    return {"transaction": upload.id}, 200
 
 
 @auth_required
@@ -938,29 +918,27 @@ def chunk_upload(transaction_id, chunk_id):
     """
     upload, upload_dir = get_upload(transaction_id)
     request.view_args["project"] = upload.project
-    upload_changes = ChangesSchema(context={"version": upload.version + 1}).load(
-        upload.changes
-    )
-    for f in upload_changes.added + upload_changes.updated:
-        if chunk_id in f.chunks:
-            dest = os.path.join(upload_dir, "chunks", chunk_id)
-            lockfile = os.path.join(upload_dir, "lockfile")
-            with Toucher(lockfile, 30):
-                try:
-                    # we could have used request.data here, but it could eventually cause OOM issue
-                    save_to_file(
-                        request.stream, dest, current_app.config["MAX_CHUNK_SIZE"]
-                    )
-                except IOError:
-                    move_to_tmp(dest, transaction_id)
-                    abort(400, "Too big chunk")
-                if os.path.exists(dest):
-                    checksum = generate_checksum(dest)
-                    size = os.path.getsize(dest)
-                    return jsonify({"checksum": checksum, "size": size}), 200
-                else:
-                    abort(400, "Upload was probably canceled")
-    abort(404)
+    chunks = []
+    for file in upload.changes["added"] + upload.changes["updated"]:
+        chunks += file.get("chunks", [])
+
+    if chunk_id not in chunks:
+        abort(404)
+
+    dest = os.path.join(upload_dir, "chunks", chunk_id)
+    with Toucher(upload.lockfile, 30):
+        try:
+            # we could have used request.data here, but it could eventually cause OOM issue
+            save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"])
+        except IOError:
+            move_to_tmp(dest, transaction_id)
+            abort(400, "Too big chunk")
+        if os.path.exists(dest):
+            checksum = generate_checksum(dest)
+            size = os.path.getsize(dest)
+            return jsonify({"checksum": checksum, "size": size}), 200
+        else:
+            abort(400, "Upload was probably canceled")
 
 
 @auth_required
@@ -980,73 +958,45 @@ def push_finish(transaction_id):
     :rtype: None
     """
-    from .tasks import optimize_storage
-
     upload, upload_dir = get_upload(transaction_id)
     request.view_args["project"] = upload.project
-    changes = ChangesSchema(context={"version": upload.version + 1}).load(
-        upload.changes
-    )
     project = upload.project
+    next_version = project.next_version()
+    v_next_version = ProjectVersion.to_v_name(next_version)
+    version_dir = os.path.join(project.storage.project_dir, v_next_version)
     if project.locked_until:
-        abort(make_response(jsonify(ProjectLocked().to_dict()),
422))
-    project_path = get_project_path(project)
-    corrupted_files = []
-
-    for f in changes.added + changes.updated:
-        if f.diff is not None:
-            dest_file = os.path.join(upload_dir, "files", f.diff.location)
-            expected_size = f.diff.size
-        else:
-            dest_file = os.path.join(upload_dir, "files", f.location)
-            expected_size = f.size
-
-        # Concatenate chunks into single file
-        # TODO we need to move this elsewhere since it can fail for large files (and slow FS)
-        os.makedirs(os.path.dirname(dest_file), exist_ok=True)
-        with open(dest_file, "wb") as dest:
-            try:
-                for chunk_id in f.chunks:
-                    sleep(0)  # to unblock greenlet
-                    chunk_file = os.path.join(upload_dir, "chunks", chunk_id)
-                    with open(chunk_file, "rb") as src:
-                        data = src.read(8192)
-                        while data:
-                            dest.write(data)
-                            data = src.read(8192)
-            except IOError:
-                logging.exception(
-                    "Failed to process chunk: %s in project %s"
-                    % (chunk_id, project_path)
-                )
-                corrupted_files.append(f.path)
-                continue
-
-        if not is_supported_type(dest_file):
-            logging.info(f"Rejecting blacklisted file: {dest_file}")
+        return ProjectLocked().response(422)
+
+    file_changes, errors = upload.process_chunks(use_shared_chunk_dir=False)
+    if errors:
+        upload.clear()
+
+        unsupported_files = [
+            k for k, v in errors.items() if v == FileSyncErrorType.UNSUPPORTED.value
+        ]
+        if len(unsupported_files):
             abort(
                 400,
-                f"Unsupported file type detected: {f.path}. "
+                f"Unsupported file type detected: {unsupported_files[0]}. "
                 f"Please remove the file or try compressing it into a ZIP file before uploading.",
             )
 
-        if expected_size != os.path.getsize(dest_file):
-            logging.error(
-                "Data integrity check has failed on file %s in project %s"
-                % (f.path, project_path),
-                exc_info=True,
-            )
-            # check if .gpkg file is valid
-            if is_versioned_file(dest_file):
-                if not f.is_valid_gpkg():
-                    corrupted_files.append(f.path)
-            corrupted_files.append(f.path)
+        corrupted_files = [
+            k for k, v in errors.items() if v == FileSyncErrorType.CORRUPTED.value
+        ]
+        if corrupted_files:
+            abort(422, {"corrupted_files": corrupted_files})
 
-    if corrupted_files:
-        move_to_tmp(upload_dir)
-        abort(422, {"corrupted_files": corrupted_files})
+        sync_errors = {
+            k: v for k, v in errors.items() if FileSyncErrorType.SYNC_ERROR.value in v
+        }
+        if sync_errors:
+            msg = ""
+            for key, value in sync_errors.items():
+                msg += key + " error=" + value + "\n"
+
+            abort(422, f"Failed to create new version: {msg}")
 
-    next_version = upload.version + 1
-    v_next_version = ProjectVersion.to_v_name(next_version)
     files_dir = os.path.join(upload_dir, "files", v_next_version)
     target_dir = os.path.join(project.storage.project_dir, v_next_version)
     if os.path.exists(target_dir):
@@ -1065,58 +1015,13 @@ def push_finish(transaction_id):
         move_to_tmp(target_dir)
 
     try:
-        # let's move uploaded files where they are expected to be
-        os.renames(files_dir, target_dir)
-        # apply gpkg updates
-        sync_errors = {}
-        to_remove = [i.path for i in changes.removed]
-        current_files = [f for f in project.files if f.path not in to_remove]
-        for updated_file in changes.updated:
-            # yield to gevent hub since geodiff action can take some time to prevent worker timeout
-            sleep(0)
-            current_file = next(
-                (i for i in current_files if i.path == updated_file.path), None
-            )
-            if not current_file:
-                sync_errors[updated_file.path] = "file not found on server "
-                continue
-
-            if updated_file.diff:
-                result = project.storage.apply_diff(
-                    current_file, updated_file, next_version
-                )
-                if result.ok():
-                    checksum, size = result.value
-                    updated_file.checksum = checksum
-
updated_file.size = size
-                else:
-                    sync_errors[updated_file.path] = (
-                        f"project: {project.workspace.name}/{project.name}, {result.value}"
-                    )
-
-            elif is_versioned_file(updated_file.path):
-                result = project.storage.construct_diff(
-                    current_file, updated_file, next_version
-                )
-                if result.ok():
-                    updated_file.diff = result.value
-                else:
-                    # if diff cannot be constructed it would be force update
-                    logging.warning(f"Geodiff: create changeset error {result.value}")
-
-        if sync_errors:
-            msg = ""
-            for key, value in sync_errors.items():
-                msg += key + " error=" + value + "\n"
-            raise DataSyncError(msg)
-
         user_agent = get_user_agent(request)
         device_id = get_device_id(request)
         pv = ProjectVersion(
             project,
             next_version,
             current_user.id,
-            changes,
+            file_changes,
             get_ip(request),
             user_agent,
             device_id,
@@ -1124,12 +1029,15 @@ def push_finish(transaction_id):
         db.session.add(pv)
         db.session.add(project)
         db.session.commit()
+
+        # let's move uploaded files where they are expected to be
+        os.renames(files_dir, version_dir)
         logging.info(
             f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
         )
         project_version_created.send(pv)
         push_finished.send(pv)
-    except (psycopg2.Error, FileNotFoundError, DataSyncError, IntegrityError) as err:
+    except (psycopg2.Error, FileNotFoundError, IntegrityError) as err:
         db.session.rollback()
         logging.exception(
             f"Failed to finish push for project: {project.id}, project version: {v_next_version}, "
@@ -1144,9 +1052,6 @@ def push_finish(transaction_id):
 
     # remove artifacts
     upload.clear()
-    # do not optimize on every version, every 10th is just fine
-    if not project.latest_version % 10:
-        optimize_storage.delay(project.id)
     return jsonify(ProjectSchema().dump(project)), 200
 
 
@@ -1246,15 +1151,24 @@ def clone_project(namespace, project_name):  # noqa: E501
     user_agent = get_user_agent(request)
     device_id = get_device_id(request)
     # transform source files to new uploaded files
-    files = UploadFileSchema(context={"version": 1}, many=True).load(
-        FileSchema(exclude=("location",), many=True).dump(cloned_project.files)
-    )
-    changes = UploadChanges(added=files, updated=[], removed=[])
+    file_changes = []
+    for file in cloned_project.files:
+        file_changes.append(
+            ProjectFileChange(
+                file.path,
+                file.checksum,
+                file.size,
+                diff=None,
+                mtime=None,
+                location=os.path.join("v1", mergin_secure_filename(file.path)),
+                change=PushChangeType.CREATE,
+            )
+        )
     project_version = ProjectVersion(
         p,
         version,
         current_user.id,
-        changes,
+        file_changes,
         get_ip(request),
         user_agent,
         device_id,
diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml
index 04dbce61..e07a2807 100644
--- a/server/mergin/sync/public_api_v2.yaml
+++ b/server/mergin/sync/public_api_v2.yaml
@@ -219,6 +219,87 @@ paths:
         "404":
           $ref: "#/components/responses/NotFound"
       x-openapi-router-controller: mergin.sync.public_api_v2_controller
+  /projects/{id}/versions:
+    post:
+      tags:
+        - project
+      summary: Create a new project version from pushed data
+      operationId: create_project_version
+      parameters:
+        - $ref: "#/components/parameters/ProjectId"
+      requestBody:
+        description: Project file changes and the current version head.
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              required:
+                - version
+                - changes
+              properties:
+                check_only:
+                  type: boolean
+                  default: false
+                  example: true
+                version:
+                  type: string
+                  pattern: '^$|^v\d+$'
+                  example: v2
+                changes:
+                  type: object
+                  required:
+                    - added
+                    - updated
+                    - removed
+                  properties:
+                    added:
+                      type: array
+                      items:
+                        $ref: "#/components/schemas/UploadFile"
+                    updated:
+                      type: array
+                      items:
+                        $ref: "#/components/schemas/UpdateFile"
+                    removed:
+                      type: array
+                      items:
+                        $ref: "#/components/schemas/File"
+      responses:
+        "201":
+          description: New project version
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/ProjectVersion"
+        "204":
+          $ref: "#/components/responses/NoContent"
+        "400":
+          $ref: "#/components/responses/BadRequest"
+        "401":
+          $ref: "#/components/responses/Unauthorized"
+        "403":
+          $ref: "#/components/responses/Forbidden"
+        "404":
+          $ref: "#/components/responses/NotFound"
+        "409":
+          description: Version already exists or another process is already running
+          content:
+            application/problem+json:
+              schema:
+                $ref: "#/components/schemas/ProjectVersionMismatch"
+        "422":
+          description: Request could not be processed by server
+          content:
+            application/problem+json:
+              schema:
+                anyOf:
+                  - $ref: "#/components/schemas/UploadError"
+                  - $ref: "#/components/schemas/TrialExpired"
+                  - $ref: "#/components/schemas/StorageLimitHit"
+                  - $ref: "#/components/schemas/ProjectLocked"
+                  - $ref: "#/components/schemas/DataSyncError"
+      x-openapi-router-controller: mergin.sync.public_api_v2_controller
 components:
   responses:
     NoContent:
@@ -244,6 +325,74 @@ components:
           format: uuid
          pattern: \b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b
   schemas:
+    # Errors
+    CustomError:
+      type: object
+      properties:
+        code:
+          type: string
+        detail:
+          type: string
+      required:
+        - code
+        - detail
+    TrialExpired:
+      allOf:
+        - $ref: '#/components/schemas/CustomError'
+      example:
+        code: TrialExpired
+        detail: Failed to push changes.
Ask the workspace owner to log in to their Mergin Maps dashboard
+    StorageLimitHit:
+      allOf:
+        - $ref: '#/components/schemas/CustomError'
+      type: object
+      properties:
+        current_usage:
+          type: integer
+        storage_limit:
+          type: integer
+      example:
+        code: StorageLimitHit
+        detail: You have reached a data limit (StorageLimitHit)
+        current_usage: 24865
+        storage_limit: 24865
+    ProjectLocked:
+      allOf:
+        - $ref: '#/components/schemas/CustomError'
+      example:
+        code: ProjectLocked
+        detail: The project is currently locked and you cannot make changes to it (ProjectLocked)
+    ProjectVersionMismatch:
+      allOf:
+        - $ref: '#/components/schemas/CustomError'
+      type: object
+      properties:
+        client_version:
+          type: string
+        server_version:
+          type: string
+      example:
+        code: ProjectVersionMismatch
+        detail: Project version mismatch (ProjectVersionMismatch)
+    DataSyncError:
+      allOf:
+        - $ref: '#/components/schemas/CustomError'
+      type: object
+      properties:
+        failed_files:
+          type: object
+      example:
+        code: DataSyncError
+        detail: "There are either corrupted files or it is not possible to create version with provided geopackage data (DataSyncError)"
+        failed_files:
+          "survey.gpkg": "Corrupted file"
+    UploadError:
+      allOf:
+        - $ref: '#/components/schemas/CustomError'
+      example:
+        code: UploadError
+        detail: "Project version could not be created (UploadError)"
     # Data
     ProjectRole:
       type: string
       nullable: true
@@ -287,3 +436,62 @@ components:
           $ref: "#/components/schemas/ProjectRole"
         role:
           $ref: "#/components/schemas/Role"
+    File:
+      type: object
+      description: Project file metadata
+      required:
+        - path
+        - size
+      properties:
+        path:
+          type: string
+          example: media/favicon.ico
+        size:
+          type: integer
+          format: int64
+          example: 1024
+        checksum:
+          description: sha1 hash of file
+          type: string
+          example: 9adb76bf81a34880209040ffe5ee262a090b62ab
+    UploadFile:
+      description: Metadata of uploaded file with chunks it is composed of
+      allOf:
+        - $ref: "#/components/schemas/File"
+        - type: object
+          properties:
+            chunks:
+              type: array
+              items:
+                type: string
+                example: d17a60eb-6581-431c-adfc-3451231455bb
+    UpdateFile:
+      description: Metadata of updated file with optional metadata about uploaded file diff
+      allOf:
+        - $ref: "#/components/schemas/UploadFile"
+        - type: object
+          properties:
+            diff:
+              nullable: true
+              allOf:
+                - $ref: "#/components/schemas/File"
+    ProjectVersion:
+      type: object
+      properties:
+        name:
+          type: string
+          example: v1
+        author:
+          type: string
+          example: john.doe
+        created:
+          type: string
+          format: date-time
+          example: 2018-11-30T08:47:58.636074Z
+        project_name:
+          type: string
+          example: survey
+        namespace:
+          type: string
+          example: john.doe
+
\ No newline at end of file
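To make the contract tangible, a hypothetical client call against the new endpoint (URL, token, ids and checksums invented; `check_only` triggers the dry run documented above):

    import requests

    resp = requests.post(
        "https://example.com/v2/projects/<project-uuid>/versions",
        json={
            "version": "v2",  # client's current head, per the ^v\d+$ pattern
            "check_only": True,
            "changes": {
                "added": [{"path": "new.txt", "size": 4, "checksum": "9adb76bf...", "chunks": []}],
                "updated": [],
                "removed": [],
            },
        },
        headers={"Authorization": "Bearer <token>"},
    )
    # 204 -> request is valid; repeat without check_only to create the version (201)
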
diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py
index 7f40c54b..f12ae610 100644
--- a/server/mergin/sync/public_api_v2_controller.py
+++ b/server/mergin/sync/public_api_v2_controller.py
@@ -2,20 +2,44 @@
 #
 # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
 
-from datetime import datetime
+import gevent
+import logging
+import os
+import psycopg2
 from connexion import NoContent, request
-from flask import abort, jsonify
+from datetime import datetime
+from flask import abort, current_app, jsonify
 from flask_login import current_user
+from marshmallow import ValidationError
+from psycopg2 import IntegrityError
 
-from mergin.sync.forms import project_name_validation
-
-from .schemas import ProjectMemberSchema
-from .workspace import WorkspaceRole
 from ..app import db
 from ..auth import auth_required
 from ..auth.models import User
-from .models import Project, ProjectRole, ProjectMember
+from .errors import (
+    DataSyncError,
+    ProjectLocked,
+    ProjectVersionMismatch,
+    StorageLimitHit,
+    UploadError,
+)
+from .files import ChangesSchema
+from .forms import project_name_validation
+from .models import (
+    Project,
+    ProjectRole,
+    ProjectMember,
+    ProjectVersion,
+    Upload,
+    project_version_created,
+    push_finished,
+)
 from .permissions import ProjectPermissions, require_project_by_uuid
+from .public_api_controller import catch_sync_failure
+from .schemas import ProjectMemberSchema, ProjectVersionSchema
+from .storages.disk import move_to_tmp
+from .utils import get_device_id, get_ip, get_user_agent
+from .workspace import WorkspaceRole
 
 
 @auth_required
@@ -128,3 +152,183 @@ def remove_project_collaborator(id, user_id):
     project.unset_role(user_id)
     db.session.commit()
     return NoContent, 204
+
+
+@auth_required
+@catch_sync_failure
+def create_project_version(id):
+    """Create a new project version from pushed data"""
+    version: int = ProjectVersion.from_v_name(request.json["version"])
+    changes = request.json["changes"]
+    project_permission: ProjectPermissions = (
+        current_app.project_handler.get_push_permission(changes)
+    )
+    project = require_project_by_uuid(id, project_permission)
+    # pass full project object to request for later use
+    request.view_args["project"] = project
+
+    if project.locked_until:
+        return ProjectLocked().response(422)
+
+    next_version = project.next_version()
+    v_next_version = ProjectVersion.to_v_name(next_version)
+    version_dir = os.path.join(project.storage.project_dir, v_next_version)
+
+    pv = project.get_latest_version()
+    if pv and pv.name != version:
+        return ProjectVersionMismatch(version, pv.name).response(409)
+
+    # reject push if there is another one already running
+    pending_upload = Upload.query.filter_by(project_id=project.id).first()
+    if pending_upload and pending_upload.is_active():
+        return ProjectVersionMismatch(version, next_version).response(409)
+
+    try:
+        ChangesSchema().validate(changes)
+        upload_changes = ChangesSchema().dump(changes)
+    except ValidationError as err:
+        msg = err.messages[0] if type(err.messages) == list else "Invalid input data"
+        return UploadError(error=msg).response(422)
+
+    # check consistency of changes
+    current_files = set(file.path for file in project.files)
+    added_files = set(file["path"] for file in upload_changes["added"])
+    if added_files and added_files & current_files:
+        return UploadError(
+            error="Add changes contain files which already exist"
+        ).response(422)
+
+    modified_files = set(
+        file["path"] for file in upload_changes["updated"] + upload_changes["removed"]
+    )
+    if modified_files and not modified_files.issubset(current_files):
+        return UploadError(
+            error="Update or remove changes contain files that are not in project"
+        ).response(422)
+
+    # Check user data limit
+    updated_files = list(
+        filter(
+            lambda i: i.path in [f["path"] for f in upload_changes["updated"]],
+            project.files,
+        )
+    )
+    additional_disk_usage = (
+        sum(
+            file["size"] for file in upload_changes["added"] + upload_changes["updated"]
+        )
+        - sum(file.size for file in updated_files)
+        - sum(file["size"] for file in upload_changes["removed"])
+    )
+
+    current_usage = project.workspace.disk_usage()
+    requested_storage = current_usage + additional_disk_usage
+    if requested_storage > project.workspace.storage:
+        return StorageLimitHit(current_usage, project.workspace.storage).response(422)
+
+    # we have done all checks but this request is just a dry-run
+    if request.json.get("check_only", False):
+        return NoContent, 204
+
+    # while processing data, block other uploads
+    upload = Upload(project, version, upload_changes, current_user.id)
+    db.session.add(upload)
+    try:
+        # Creating blocking upload can fail, e.g. in case of race condition
+        db.session.commit()
+    except IntegrityError:
+        db.session.rollback()
+        # check and clean dangling blocking uploads or abort
+        for current_upload in project.uploads.all():
+            if current_upload.is_active():
+                return ProjectVersionMismatch(version, next_version).response(409)
+            db.session.delete(current_upload)
+            db.session.commit()
+            # previous push attempt is definitely lost
+            project.sync_failed(
+                "",
+                "push_lost",
+                "Push artefact removed by subsequent push",
+                current_user.id,
+            )
+
+        # Try again after cleanup
+        db.session.add(upload)
+        try:
+            db.session.commit()
+            move_to_tmp(upload.upload_dir)
+        except IntegrityError as err:
+            logging.error(f"Failed to create upload session: {str(err)}")
+            return ProjectVersionMismatch(version, next_version).response(409)
+
+    # Create transaction folder and lockfile
+    os.makedirs(upload.upload_dir)
+    open(upload.lockfile, "w").close()
+
+    file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True)
+    # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted
+    if errors:
+        upload.clear()
+        return DataSyncError(failed_files=errors).response(422)
+
+    try:
+        pv = ProjectVersion(
+            project,
+            next_version,
+            current_user.id,
+            file_changes,
+            get_ip(request),
+            get_user_agent(request),
+            get_device_id(request),
+        )
+        db.session.add(pv)
+        db.session.add(project)
+        db.session.commit()
+        # let's move uploaded files where they are expected to be
+        temp_files_dir = os.path.join(upload.upload_dir, "files")
+        os.renames(temp_files_dir, version_dir)
+
+        logging.info(
+            f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}."
+        )
+        project_version_created.send(pv)
+        push_finished.send(pv)
+    except (psycopg2.Error, FileNotFoundError, IntegrityError) as err:
+        db.session.rollback()
+        logging.exception(
+            f"Failed to finish push for project: {project.id}, project version: {v_next_version}, "
+            f"upload id: {upload.id}.: {str(err)}"
+        )
+        if (
+            os.path.exists(version_dir)
+            and not ProjectVersion.query.filter_by(
+                project_id=project.id, name=next_version
+            ).count()
+        ):
+            move_to_tmp(version_dir)
+        return UploadError().response(422)
+    # catch exception during pg transaction so we can rollback and prevent PendingRollbackError during upload clean up
+    except gevent.timeout.Timeout:
+        db.session.rollback()
+        if (
+            os.path.exists(version_dir)
+            and not ProjectVersion.query.filter_by(
+                project_id=project.id, name=next_version
+            ).count()
+        ):
+            move_to_tmp(version_dir)
+        raise
+    finally:
+        # remove artifacts
+        upload.clear()
+
+    return (
+        ProjectVersionSchema(
+            exclude=(
+                "files",
+                "changes",
+                "changesets",
+            )
+        ).dump(pv),
+        201,
+    )
diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py
index 75b6f09e..617ad531 100644
--- a/server/mergin/sync/schemas.py
+++ b/server/mergin/sync/schemas.py
@@ -75,7 +75,7 @@ def project_user_permissions(project):
 
 class FileHistorySchema(ma.SQLAlchemyAutoSchema):
     mtime = DateTimeWithZ()
-    diff = fields.Nested(FileSchema(), attribute="diff_file", exclude=("location",))
+    diff = fields.Nested(FileSchema(), attribute="diff_file")
     expiration = DateTimeWithZ(attribute="expiration", dump_only=True)
 
     class Meta:
diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py
index 4debb255..b06e51af 100644
--- a/server/mergin/sync/storages/disk.py
+++ b/server/mergin/sync/storages/disk.py
@@ -21,7 +21,7 @@
     generate_checksum,
     is_versioned_file,
 )
-from ..files import mergin_secure_filename, ProjectFile, UploadFile, File
+from ..files import mergin_secure_filename, ProjectFile, File
 
 
 def save_to_file(stream, path, max_size=None):
@@ -245,17 +245,15 @@ def _generator():
         return _generator()
 
     def apply_diff(
-        self, current_file: ProjectFile, upload_file: UploadFile, version: int
+        self, current_file: ProjectFile, changeset: str, patchedfile: str
     ) -> Result:
         """Apply geodiff diff file on current gpkg basefile.
         Creates GeodiffActionHistory record of the action.
         Returns checksum and size of generated file. If action fails it returns geodiff error message.
         """
         from ..models import GeodiffActionHistory, ProjectVersion
 
-        v_name = ProjectVersion.to_v_name(version)
+        v_name = ProjectVersion.to_v_name(self.project.next_version())
         basefile = os.path.join(self.project_dir, current_file.location)
-        changeset = os.path.join(self.project_dir, upload_file.diff.location)
-        patchedfile = os.path.join(self.project_dir, upload_file.location)
         # create local copy of basefile which will be updated in next version and changeset needed
         # TODO this can potentially fail for large files
         logging.info(f"Apply changes: copying {basefile} to {patchedfile}")
@@ -313,28 +311,23 @@ def apply_diff(
         return Err(self.gediff_log.getvalue())
 
     def construct_diff(
-        self, current_file: ProjectFile, upload_file: UploadFile, version: int
+        self,
+        current_file: ProjectFile,
+        changeset: str,
+        uploaded_file: str,
     ) -> Result:
         """Construct geodiff diff file from uploaded gpkg and current basefile.
         Returns diff metadata as a result. If action fails it returns geodiff error message.
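Both storage helpers now take explicit filesystem paths and report results as a plain tuple; a hypothetical caller (mirroring how `Upload.process_chunks` uses them, with `changeset` and `patched_file` inside the upload working directory) would look like:

    result = project.storage.apply_diff(current_file, changeset, patched_file)
    if result.ok():
        checksum, size = result.value   # metadata of the patched gpkg
    else:
        geodiff_error = result.value    # geodiff log message
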
""" - from ..models import ProjectVersion - - v_name = ProjectVersion.to_v_name(version) basefile = os.path.join(self.project_dir, current_file.location) - uploaded_file = os.path.join(self.project_dir, upload_file.location) - diff_name = upload_file.path + "-diff-" + str(uuid.uuid4()) - changeset = os.path.join(self.project_dir, v_name, diff_name) + diff_name = os.path.basename(changeset) with self.geodiff_copy(basefile) as basefile_tmp, self.geodiff_copy( uploaded_file ) as uploaded_file_tmp: try: # create changeset next to uploaded file copy changeset_tmp = os.path.join( - uploaded_file_tmp.replace(upload_file.location, "").rstrip( - os.path.sep - ), - v_name, + os.path.dirname(uploaded_file_tmp), diff_name, ) self.flush_geodiff_logger() @@ -344,15 +337,13 @@ def construct_diff( self.geodiff.create_changeset( basefile_tmp, uploaded_file_tmp, changeset_tmp ) - # create diff metadata as it would be created by other clients - diff_file = File( - path=diff_name, - checksum=generate_checksum(changeset_tmp), - size=os.path.getsize(changeset_tmp), - location=os.path.join(v_name, mergin_secure_filename(diff_name)), - ) copy_file(changeset_tmp, changeset) - return Ok(diff_file) + return Ok( + ( + generate_checksum(changeset_tmp), + os.path.getsize(changeset_tmp), + ) + ) except (GeoDiffLibError, GeoDiffLibConflictError) as e: # diff is not possible to create - file will be overwritten move_to_tmp(changeset) diff --git a/server/mergin/sync/storages/storage.py b/server/mergin/sync/storages/storage.py index fd4c1e81..3b9699a6 100644 --- a/server/mergin/sync/storages/storage.py +++ b/server/mergin/sync/storages/storage.py @@ -11,10 +11,6 @@ class FileNotFound(Exception): pass -class DataSyncError(Exception): - pass - - class InitializationError(Exception): pass diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index c4d5fa16..db961d46 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -13,7 +13,7 @@ from shapely.errors import ShapelyError from gevent import sleep from flask import Request -from typing import Optional +from typing import Optional, Tuple from sqlalchemy import text from pathvalidate import ( validate_filename, @@ -83,6 +83,8 @@ def touch_lockfile(self): os.access(self.lockfile, os.W_OK) with open(self.lockfile, "a"): os.utime(self.lockfile, None) + + sleep(0) # to unblock greenlet if self.running: self.timer = Timer(self.interval, self.touch_lockfile) self.timer.start() @@ -578,3 +580,15 @@ def get_x_accel_uri(*url_parts): url = url.lstrip(os.path.sep) result = os.path.join(download_accell_uri, url) return result + + +def get_chunk_location(id: str) -> Tuple[str, str]: + """ + Splits the given identifier into two parts. + + Returns a tuple where the first element is the first two characters + of the identifier, and the second element is the remaining + characters. + """ + + return id[0:2], id[2:] diff --git a/server/mergin/tests/fixtures.py b/server/mergin/tests/fixtures.py index 7cff688e..e50d3350 100644 --- a/server/mergin/tests/fixtures.py +++ b/server/mergin/tests/fixtures.py @@ -19,7 +19,7 @@ from ..stats.models import MerginInfo from . 
import test_project, test_workspace_id, test_project_dir, TMP_DIR from .utils import login_as_admin, initialize, cleanup, file_info -from ..sync.files import ChangesSchema +from ..sync.files import ChangesSchema, files_changes_from_upload thisdir = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.join(thisdir, os.pardir)) @@ -36,6 +36,7 @@ def flask_app(request): "DOCS_URL", "COLLECT_STATISTICS", "USER_SELF_REGISTRATION", + "V2_PUSH_ENABLED", ] ) register(application) @@ -213,12 +214,13 @@ def diff_project(app): else: # no files uploaded, hence no action needed pass - upload_changes = ChangesSchema(context={"version": i + 2}).load(change) + + file_changes = files_changes_from_upload(change, location_dir=f"v{i + 2}") pv = ProjectVersion( project, i + 2, project.creator.id, - upload_changes, + file_changes, "127.0.0.1", ) assert pv.project_size == sum(file.size for file in pv.files) diff --git a/server/mergin/tests/test_config.py b/server/mergin/tests/test_config.py index 8b745a0a..af677cb0 100644 --- a/server/mergin/tests/test_config.py +++ b/server/mergin/tests/test_config.py @@ -21,6 +21,7 @@ def test_config(client): "minor", "user_self_registration", "build_hash", + "v2_push_enabled", } resp = client.get("/config") assert resp.status_code == 200 diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index e7f9e270..044294c5 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -18,7 +18,6 @@ ProjectRole, ProjectUser, ) -from ..sync.files import UploadChanges from ..auth.models import User from ..app import db from . import DEFAULT_USER @@ -40,8 +39,7 @@ def test_close_user_account(client, diff_project): # user has access to mergin user diff_project diff_project.set_role(user.id, ProjectRole.WRITER) # user contributed to another user project so he is listed in projects history - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(diff_project, 11, user.id, changes, "127.0.0.1") + pv = ProjectVersion(diff_project, 11, user.id, [], "127.0.0.1") diff_project.latest_version = pv.name pv.project = diff_project db.session.add(pv) @@ -116,8 +114,7 @@ def test_remove_project(client, diff_project): # set up mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) - changes = UploadChanges(added=[], removed=[], updated=[]) - upload = Upload(diff_project, 10, changes, mergin_user.id) + upload = Upload(diff_project, 10, [], mergin_user.id) db.session.add(upload) project_id = diff_project.id user = add_user("user", "user") diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index b1f60a8f..b615f195 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -7,6 +7,7 @@ from dataclasses import asdict from unittest.mock import patch from urllib.parse import quote +from psycopg2 import IntegrityError import pysqlite3 import pytest import json @@ -35,7 +36,7 @@ PushChangeType, ProjectFilePath, ) -from ..sync.files import ChangesSchema +from ..sync.files import files_changes_from_upload from ..sync.schemas import ProjectListSchema from ..sync.utils import generate_checksum, is_versioned_file from ..auth.models import User, UserProfile @@ -1277,8 +1278,7 @@ def create_transaction(username, changes, version=1): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - 
upload_changes = ChangesSchema(context={"version": version}).load(changes) - upload = Upload(project, version, upload_changes, user.id) + upload = Upload(project, version, changes, user.id) db.session.add(upload) db.session.commit() upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) @@ -1354,9 +1354,8 @@ def upload_chunks(upload_dir, changes, src_dir=test_project_dir): def test_push_finish(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/finish/{}".format(upload.id) - resp = client.post(url, headers=json_headers) + resp = client.post(f"/v1/project/push/finish/{upload.id}", headers=json_headers) assert resp.status_code == 422 assert "corrupted_files" in resp.json["detail"].keys() assert not os.path.exists(os.path.join(upload_dir, "files", "test.txt")) @@ -1364,6 +1363,7 @@ def test_push_finish(client): assert failure.error_type == "push_finish" assert "corrupted_files" in failure.error_details + upload, upload_dir = create_transaction("mergin", changes) os.mkdir(os.path.join(upload.project.storage.project_dir, "v2")) # mimic chunks were uploaded os.makedirs(os.path.join(upload_dir, "chunks")) @@ -1373,7 +1373,10 @@ def test_push_finish(client): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp2 = client.post(url, headers={**json_headers, "User-Agent": "Werkzeug"}) + resp2 = client.post( + f"/v1/project/push/finish/{upload.id}", + headers={**json_headers, "User-Agent": "Werkzeug"}, + ) assert resp2.status_code == 200 assert not os.path.exists(upload_dir) version = upload.project.get_latest_version() @@ -2274,12 +2277,12 @@ def add_project_version(project, changes, version=None): else User.query.filter_by(username=DEFAULT_USER[0]).first() ) next_version = version or project.next_version() - upload_changes = ChangesSchema(context={"version": next_version}).load(changes) + file_changes = files_changes_from_upload(changes, location_dir=f"v{next_version}") pv = ProjectVersion( project, next_version, author.id, - upload_changes, + file_changes, ip="127.0.0.1", ) db.session.add(pv) @@ -2293,19 +2296,23 @@ def test_project_version_integrity(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - # manually create an identical project version in db - pv = add_project_version(upload.project, changes) - # try to finish the transaction - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) - assert resp.status_code == 422 - assert "Failed to create new version" in resp.json["detail"] - failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first() - assert failure.error_type == "push_finish" - assert "Failed to create new version" in failure.error_details - upload.project.latest_version = pv.name - 1 - db.session.delete(pv) - db.session.delete(failure) - db.session.commit() + + # try to finish the transaction which would fail on version created integrity error, e.g. 
race conditions + with patch.object( + ProjectVersion, + "__init__", + side_effect=IntegrityError("Project version already exists", None, None), + ): + resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + assert resp.status_code == 422 + assert "Failed to create new version" in resp.json["detail"] + failure = SyncFailuresHistory.query.filter_by( + project_id=upload.project.id + ).first() + assert failure.error_type == "push_finish" + assert "Failed to create new version" in failure.error_details + db.session.delete(failure) + db.session.commit() # changes without an upload with patch("mergin.sync.public_api_controller.get_user_agent") as mock: @@ -2320,7 +2327,7 @@ def test_project_version_integrity(client): # to insert an identical project version when no upload (only one endpoint used), # we need to pretend side effect of a function called just before project version insertion def _get_user_agent(): - add_project_version(project, changes) + add_project_version(project, {}) # bypass endpoint checks upload.project.latest_version = ProjectVersion.from_v_name(data["version"]) return "Input" diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 2d88d652..57cb2a2a 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1,13 +1,38 @@ # Copyright (C) Lutra Consulting Limited # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial -from .utils import add_user -from ..app import db -from mergin.sync.models import Project -from tests import test_project, test_workspace_id +import os +from unittest.mock import patch +from flask import current_app +from psycopg2 import IntegrityError +import pytest +from datetime import datetime, timedelta, timezone -from ..config import Configuration -from ..sync.models import ProjectRole +from mergin.app import db +from mergin.config import Configuration +from mergin.sync.errors import ( + ProjectLocked, + ProjectVersionMismatch, + StorageLimitHit, + UploadError, +) +from mergin.sync.models import ( + Project, + ProjectRole, + ProjectVersion, + SyncFailuresHistory, + Upload, +) +from mergin.sync.utils import get_chunk_location +from . 
import TMP_DIR, test_project, test_workspace_id, test_project_dir +from .test_project_controller import ( + CHUNK_SIZE, + _get_changes, + _get_changes_with_diff, + _get_changes_with_diff_0_size, + _get_changes_without_added, +) +from .utils import add_user def test_schedule_delete_project(client): @@ -126,3 +151,194 @@ def test_project_members(client): # access provided by workspace role cannot be removed directly response = client.delete(url + f"/{user.id}") assert response.status_code == 404 + + +push_data = [ + # success + ( + {"version": "v1", "changes": _get_changes_without_added(test_project_dir)}, + 201, + None, + ), + # with diff, success + ({"version": "v1", "changes": _get_changes_with_diff(test_project_dir)}, 201, None), + # just a dry-run + ( + { + "version": "v1", + "changes": _get_changes_with_diff(test_project_dir), + "check_only": True, + }, + 204, + None, + ), + # broken .gpkg file + ( + {"version": "v1", "changes": _get_changes_with_diff_0_size(test_project_dir)}, + 422, + UploadError.code, + ), + # contains already uploaded file + ( + {"version": "v1", "changes": _get_changes(test_project_dir)}, + 422, + UploadError.code, + ), + # version mismatch + ( + {"version": "v0", "changes": _get_changes_without_added(test_project_dir)}, + 409, + ProjectVersionMismatch.code, + ), + # no changes requested + ( + {"version": "v1", "changes": {"added": [], "removed": [], "updated": []}}, + 422, + UploadError.code, + ), + # inconsistent changes, a file cannot be added and updated at the same time + ( + { + "version": "v1", + "changes": { + "added": [ + { + "path": "test.txt", + "size": 1234, + "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab", + "chunks": [], + } + ], + "removed": [], + "updated": [ + { + "path": "test.txt", + "size": 1234, + "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab", + "chunks": [], + } + ], + }, + }, + 422, + UploadError.code, + ), + # inconsistent changes, a file which does not exist cannot be deleted + ( + { + "version": "v1", + "changes": { + "added": [], + "removed": [ + { + "path": "not-existing.txt", + "size": 1234, + "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab", + } + ], + "updated": [], + }, + }, + 422, + UploadError.code, + ), + # missing version (required parameter) + ({"changes": _get_changes_without_added(test_project_dir)}, 400, None), + # incorrect changes format + ({"version": "v1", "changes": {}}, 400, None), +] + + +@pytest.mark.parametrize("data,expected,err_code", push_data) +def test_create_version(client, data, expected, err_code): + """Test project push endpoint with different payloads.""" + + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + assert project.latest_version == 1 + + if expected == 201: + # mimic chunks were uploaded + for f in data["changes"]["added"] + data["changes"]["updated"]: + src_file = ( + os.path.join(TMP_DIR, f["diff"]["path"]) + if f.get("diff") + else os.path.join(test_project_dir, f["path"]) + ) + with open(src_file, "rb") as in_file: + for chunk in f["chunks"]: + chunk_location = os.path.join( + current_app.config["UPLOAD_CHUNKS_DIR"], + *get_chunk_location(chunk), + ) + os.makedirs(os.path.dirname(chunk_location), exist_ok=True) + with open(chunk_location, "wb") as out_file: + out_file.write(in_file.read(CHUNK_SIZE)) + + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == expected + if expected == 201: + assert response.json["name"] == "v2" + assert project.latest_version == 2 + else: + 
assert project.latest_version == 1 + if err_code: + assert response.json["code"] == err_code + failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() + # failures are not created when POST request body is invalid (caught by connexion validators) + if failure: + assert failure.last_version == "v1" + assert failure.error_type == "project_push" + + +def test_create_version_failures(client): + """Test various project push failures beyond invalid payload""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + + data = {"version": "v1", "changes": _get_changes_without_added(test_project_dir)} + + # somebody else is syncing + upload = Upload(project, 1, _get_changes(test_project_dir), 1) + db.session.add(upload) + db.session.commit() + os.makedirs(upload.upload_dir) + open(upload.lockfile, "w").close() + + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 409 + assert response.json["code"] == ProjectVersionMismatch.code + upload.clear() + + # project is locked + project.locked_until = datetime.now(timezone.utc) + timedelta(days=1) + db.session.commit() + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 422 + assert response.json["code"] == ProjectLocked.code + project.locked_until = None + db.session.commit() + + # try to finish the transaction which would fail on version created integrity error, e.g. race conditions + with patch.object( + Configuration, + "GLOBAL_STORAGE", + 0, + ): + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 422 + assert response.json["code"] == StorageLimitHit.code + + # try to finish the transaction which would fail on version created integrity error, e.g. race conditions + with patch.object( + ProjectVersion, + "__init__", + side_effect=IntegrityError("Cannot insert new version", None, None), + ): + # keep just deleted data to avoid messing with chunks + data["changes"]["added"] = data["changes"]["updated"] = [] + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 422 + assert response.json["code"] == UploadError.code diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index 94fc033f..6dcfd157 100644 --- a/server/mergin/tests/utils.py +++ b/server/mergin/tests/utils.py @@ -4,13 +4,11 @@ import json import shutil -from typing import Tuple import pysqlite3 import uuid import math from dataclasses import asdict from datetime import datetime - import pysqlite3 from flask import url_for, current_app import os @@ -20,7 +18,7 @@ from ..auth.models import User, UserProfile from ..sync.utils import generate_location, generate_checksum from ..sync.models import Project, ProjectVersion, FileHistory, ProjectRole -from ..sync.files import UploadChanges, ChangesSchema +from ..sync.files import ProjectFileChange, PushChangeType, files_changes_from_upload from ..sync.workspace import GlobalWorkspace from ..app import db from . 
import json_headers, DEFAULT_USER, test_project, test_project_dir, TMP_DIR @@ -82,8 +80,7 @@ def create_project(name, workspace, user, **kwargs): p.updated = datetime.utcnow() db.session.add(p) db.session.flush() - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") + pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1") db.session.add(pv) db.session.commit() @@ -156,15 +153,17 @@ def initialize(): for f in files: abs_path = os.path.join(root, f) project_files.append( - { - "path": abs_path.replace(test_project_dir, "").lstrip("/"), - "location": os.path.join( + ProjectFileChange( + path=abs_path.replace(test_project_dir, "").lstrip("/"), + checksum=generate_checksum(abs_path), + size=os.path.getsize(abs_path), + mtime=str(datetime.fromtimestamp(os.path.getmtime(abs_path))), + change=PushChangeType.CREATE, + location=os.path.join( "v1", abs_path.replace(test_project_dir, "").lstrip("/") ), - "size": os.path.getsize(abs_path), - "checksum": generate_checksum(abs_path), - "mtime": str(datetime.fromtimestamp(os.path.getmtime(abs_path))), - } + diff=None, + ) ) p.latest_version = 1 p.public = True @@ -173,14 +172,7 @@ def initialize(): db.session.add(p) db.session.commit() - upload_changes = ChangesSchema(context={"version": 1}).load( - { - "added": project_files, - "updated": [], - "removed": [], - } - ) - pv = ProjectVersion(p, 1, user.id, upload_changes, "127.0.0.1") + pv = ProjectVersion(p, 1, user.id, project_files, "127.0.0.1") db.session.add(pv) db.session.commit() @@ -285,7 +277,7 @@ def create_blank_version(project): project, project.next_version(), project.creator.id, - UploadChanges(added=[], updated=[], removed=[]), + [], "127.0.0.1", ) db.session.add(pv) @@ -355,14 +347,14 @@ def push_change(project, action, path, src_dir): else: return - upload_changes = ChangesSchema(context={"version": project.next_version()}).load( - changes + file_changes = files_changes_from_upload( + changes, location_dir=f"v{project.next_version()}" ) pv = ProjectVersion( project, project.next_version(), project.creator.id, - upload_changes, + file_changes, "127.0.0.1", ) db.session.add(pv) From e10729a685013c503c9a03ce51cc04bda8ff8bc0 Mon Sep 17 00:00:00 2001 From: Marcel Kocisek Date: Tue, 5 Aug 2025 10:57:13 +0200 Subject: [PATCH 02/13] Added new endpoint for pushing chunks (#487) * Added new endpoint for pushing chunks - new variable for storing chunks - UPLOAD_CHUNKS_DIR, UPLOAD_CHUNKS_EXPIRATION * cleanup * cleanup api yaml --- .gitignore | 1 + deployment/common/set_permissions.sh | 0 development.md | 2 - server/mergin/app.py | 10 +++ server/mergin/sync/config.py | 7 ++ server/mergin/sync/public_api_v2.yaml | 46 ++++++++++++ .../mergin/sync/public_api_v2_controller.py | 41 ++++++++++- server/mergin/sync/schemas.py | 7 ++ server/mergin/sync/utils.py | 14 ++++ server/mergin/tests/test_public_api_v2.py | 71 ++++++++++++++++++- 10 files changed, 194 insertions(+), 5 deletions(-) mode change 100644 => 100755 deployment/common/set_permissions.sh diff --git a/.gitignore b/.gitignore index 7614b44b..5ca81560 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ projects*/ data/ mergin_db +diagnostic_logs logs *.log diff --git a/deployment/common/set_permissions.sh b/deployment/common/set_permissions.sh old mode 100644 new mode 100755 diff --git a/development.md b/development.md index b2a1be08..b9920e89 100644 --- a/development.md +++ b/development.md @@ -71,8 +71,6 @@ cd deployment/community/ # Create .prod.env file from .env.template cp 
.env.template .prod.env
 
-# Run the docker composition with the current Dockerfiles
-cp .env.template .prod.env
 docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d
 
 # Give ownership of the ./projects folder to user that is running the gunicorn container
diff --git a/server/mergin/app.py b/server/mergin/app.py
index d0fd2f3a..845afd8c 100644
--- a/server/mergin/app.py
+++ b/server/mergin/app.py
@@ -347,6 +347,16 @@ def ping():  # pylint: disable=W0612
         )
         return status, 200
 
+    # reading raw input stream not supported in connexion so far
+    # https://github.com/zalando/connexion/issues/592
+    # and as workaround we use custom Flask endpoint in create_app function
+    @app.route("/v2/projects/<id>/chunks", methods=["POST"])
+    @auth_required
+    def upload_chunk_v2(id: str):
+        from .sync import public_api_v2_controller
+
+        return public_api_v2_controller.upload_chunk(id)
+
     # reading raw input stream not supported in connexion so far
     # https://github.com/zalando/connexion/issues/592
     # and as workaround we use custom Flask endpoint in create_app function
diff --git a/server/mergin/sync/config.py b/server/mergin/sync/config.py
index b182da6d..ff3875ee 100644
--- a/server/mergin/sync/config.py
+++ b/server/mergin/sync/config.py
@@ -64,3 +64,10 @@ class Configuration(object):
     )
     # in seconds, older unfinished zips are moved to temp
     PARTIAL_ZIP_EXPIRATION = config("PARTIAL_ZIP_EXPIRATION", default=600, cast=int)
+    UPLOAD_CHUNKS_DIR = config(
+        "UPLOAD_CHUNKS_DIR",
+        default=os.path.join(LOCAL_PROJECTS, "chunks"),
+    )  # directory for file chunks
+    UPLOAD_CHUNKS_EXPIRATION = config(
+        "UPLOAD_CHUNKS_EXPIRATION", default=86400, cast=int
+    )  # time in seconds after chunks are permanently deleted (1 day)
diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml
index 04dbce61..fd6e4353 100644
--- a/server/mergin/sync/public_api_v2.yaml
+++ b/server/mergin/sync/public_api_v2.yaml
@@ -96,6 +96,39 @@ paths:
         "404":
           $ref: "#/components/responses/NotFound"
       x-openapi-router-controller: mergin.sync.public_api_v2_controller
+  # /projects/{id}/chunks:
+  #   post:
+  #     tags:
+  #       - project
+  #     summary: Upload file chunk.
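+  #     description: >
+  #       Note: this spec is kept commented out because connexion cannot stream
+  #       raw request bodies (zalando/connexion#592); the endpoint is instead
+  #       registered as a plain Flask route in create_app (see server/mergin/app.py).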
+ # operationId: upload_chunk + # parameters: + # - $ref: "#/components/parameters/ProjectId" + # requestBody: + # x-stream-upload: true + # content: + # application/octet-stream: + # schema: + # type: string + # format: binary + # responses: + # "200": + # description: Chunk upload response + # content: + # application/json: + # schema: + # $ref: "#/components/schemas/UploadChunk" + # "400": + # $ref: "#/components/responses/BadRequest" + # "401": + # $ref: "#/components/responses/Unauthorized" + # "403": + # $ref: "#/components/responses/Forbidden" + # "404": + # $ref: "#/components/responses/NotFound" + # "413": + # $ref: "#/components/responses/RequestTooBig" + # x-openapi-router-controller: mergin.sync.public_api_v2_controller /projects/{id}/collaborators: parameters: - $ref: "#/components/parameters/ProjectId" @@ -233,6 +266,8 @@ components: description: Not found Conflict: description: Conflict + RequestTooBig: + description: Request Entity Too Large parameters: ProjectId: name: id @@ -268,6 +303,17 @@ components: - $ref: "#/components/schemas/ProjectRole" nullable: false description: combination of workspace role and project role + UploadChunk: + type: object + properties: + id: + type: string + format: uuid + example: "123e4567-e89b-12d3-a456-426614174000" + valid_until: + type: string + format: date-time + example: "2023-10-01T12:00:00Z" ProjectMember: type: object properties: diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 7f40c54b..55ae48d8 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -3,19 +3,25 @@ # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial from datetime import datetime +import os +import uuid +from datetime import datetime, timedelta, timezone from connexion import NoContent, request -from flask import abort, jsonify +from flask import abort, jsonify, current_app, make_response from flask_login import current_user from mergin.sync.forms import project_name_validation -from .schemas import ProjectMemberSchema +from .schemas import ProjectMemberSchema, UploadChunkSchema from .workspace import WorkspaceRole from ..app import db from ..auth import auth_required from ..auth.models import User from .models import Project, ProjectRole, ProjectMember from .permissions import ProjectPermissions, require_project_by_uuid +from .errors import ProjectLocked +from .utils import get_chunk_location +from .storages.disk import move_to_tmp, save_to_file @auth_required @@ -128,3 +134,34 @@ def remove_project_collaborator(id, user_id): project.unset_role(user_id) db.session.commit() return NoContent, 204 + + +@auth_required +def upload_chunk(id: str): + """ + Push chunk to chunks location. 
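+
+    The raw request body is streamed straight to disk, so large chunks are
+    never buffered in memory. A hypothetical client-side sketch (server_url
+    and project_id are placeholders, not part of this API):
+
+        import requests
+
+        with open("data.part", "rb") as chunk:
+            resp = requests.post(
+                f"{server_url}/v2/projects/{project_id}/chunks",
+                data=chunk,  # file-like body is streamed by requests
+                headers={"Content-Type": "application/octet-stream"},
+            )
+        chunk_id = resp.json()["id"]  # referenced later in push "changes"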
+ """ + project = require_project_by_uuid(id, ProjectPermissions.Edit) + if project.locked_until: + abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) + # generate uuid for chunk + chunk_id = str(uuid.uuid4()) + dest_file = get_chunk_location(chunk_id) + try: + # we could have used request.data here, but it could eventually cause OOM issue + save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"]) + except IOError: + move_to_tmp(dest_file, chunk_id) + abort(413, "Chunk size exceeds maximum allowed size") + except Exception as e: + abort(400, "Error saving chunk") + + # Add valid_until timestamp to the response, remove tzinfo for compatibility with DateTimeWithZ + valid_until = ( + datetime.now(timezone.utc) + + timedelta(seconds=current_app.config["UPLOAD_CHUNKS_EXPIRATION"]) + ).replace(tzinfo=None) + return ( + UploadChunkSchema().dump({"id": chunk_id, "valid_until": valid_until}), + 200, + ) diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 75b6f09e..9d9b1309 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -405,3 +405,10 @@ class ProjectMemberSchema(Schema): project_role = fields.Enum(enum=ProjectRole, by_value=True) workspace_role = fields.Enum(enum=WorkspaceRole, by_value=True) role = fields.Enum(enum=ProjectRole, by_value=True) + + +class UploadChunkSchema(Schema): + """Schema for chunk upload response""" + + id = fields.UUID() + valid_until = DateTimeWithZ() diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index c4d5fa16..7babb231 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -578,3 +578,17 @@ def get_x_accel_uri(*url_parts): url = url.lstrip(os.path.sep) result = os.path.join(download_accell_uri, url) return result + + +def get_chunk_location(id: str): + """ + Get file name for chunk + + Splits the given identifier into two parts. + + Returns a tuple where the first element is the first two characters of the identifier, and the second element is the remaining characters. + """ + chunk_dir = current_app.config.get("UPLOAD_CHUNKS_DIR") + small_hash = id[:2] + file_name = id[2:] + return os.path.join(chunk_dir, small_hash, file_name) diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 2d88d652..5bef6b7d 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1,13 +1,19 @@ # Copyright (C) Lutra Consulting Limited # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import os +from datetime import datetime, timedelta, timezone + +from mergin.sync.utils import get_chunk_location + from .utils import add_user from ..app import db from mergin.sync.models import Project -from tests import test_project, test_workspace_id +from ..tests import test_project, test_workspace_id from ..config import Configuration from ..sync.models import ProjectRole +from . 
import test_project_dir def test_schedule_delete_project(client): @@ -126,3 +132,66 @@ def test_project_members(client): # access provided by workspace role cannot be removed directly response = client.delete(url + f"/{user.id}") assert response.status_code == 404 + + +def test_upload_chunk(client, app): + """Test pushing a chunk to a project""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + url = f"/v2/projects/{project.id}/chunks" + app.config["MAX_CHUNK_SIZE"] = 1024 # Set a small max chunk size for testing + max_chunk_size = app.config["MAX_CHUNK_SIZE"] + + response = client.post( + url, + data=b"a" * (max_chunk_size + 1), # Exceeding max chunk size + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 413 + + # Project is locked, cannot push chunks + project.locked_until = datetime.now(timezone.utc) + timedelta(weeks=26) + db.session.commit() + response = client.post( + url, + data=b"a", + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 422 + assert response.json["code"] == "ProjectLocked" + + project.locked_until = None # Unlock the project + project.removed_at = datetime.now(timezone.utc) - timedelta( + days=(client.application.config["DELETED_PROJECT_EXPIRATION"] + 1) + ) # Ensure project is removed + db.session.commit() + response = client.post( + url, + data=b"a", + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 404 + + # Push a chunk successfully + project.removed_at = None # Ensure project is not removed + db.session.commit() + response = client.post( + url, + data=b"a" * max_chunk_size, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 200 + chunk_id = response.json["id"] + assert chunk_id + valid_until = response.json["valid_until"] + valid_until_dt = datetime.strptime(valid_until, "%Y-%m-%dT%H:%M:%S%z") + assert valid_until_dt > datetime.now(timezone.utc) + assert valid_until_dt < datetime.now(timezone.utc) + timedelta( + seconds=app.config["UPLOAD_CHUNKS_EXPIRATION"] + ) + # Check if the chunk is stored correctly + stored_chunk = get_chunk_location(chunk_id) + assert os.path.exists(stored_chunk) + with open(stored_chunk, "rb") as f: + assert f.read() == b"a" * max_chunk_size From 5e8c5a94cf0fc866c30832f865a1c14abfeae4cf Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Tue, 5 Aug 2025 14:30:05 +0200 Subject: [PATCH 03/13] Use new errors structure in v2 chunks endpoint --- server/mergin/sync/errors.py | 9 +++++++++ server/mergin/sync/public_api_v2_controller.py | 7 ++++--- server/mergin/tests/test_public_api_v2.py | 14 +++++++++----- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/server/mergin/sync/errors.py b/server/mergin/sync/errors.py index deca4dc3..c5444c3c 100644 --- a/server/mergin/sync/errors.py +++ b/server/mergin/sync/errors.py @@ -3,8 +3,12 @@ # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial from typing import List, Dict + +from .config import Configuration from ..app import ResponseError +MAX_CHUNK_SIZE = Configuration.MAX_CHUNK_SIZE / 1024 / 1024 + class UpdateProjectAccessError(ResponseError): code = "UpdateProjectAccessError" @@ -81,3 +85,8 @@ def to_dict(self) -> Dict: if self.error is not None: data["detail"] = self.error + f" ({self.code})" return data + + +class BigChunkError(ResponseError): + code = "BigChunkError" + detail = f"Chunk size exceeds maximum allowed size {MAX_CHUNK_SIZE} MB" 
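
For context between the two hunks: the ResponseError.response() helper added earlier in this series wraps these errors in an application/problem+json body. A minimal sketch of what a client receives for an oversized chunk, assuming the default MAX_CHUNK_SIZE of 10 MiB (illustrative only, not a test from this patch):

    # run inside a Flask app/request context, since jsonify is used internally
    resp, status = BigChunkError().response(413)
    assert status == 413
    assert resp.headers["Content-Type"] == "application/problem+json"
    assert resp.get_json() == {
        "code": "BigChunkError",
        "detail": "Chunk size exceeds maximum allowed size 10.0 MB (BigChunkError)",
    }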
diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index f41cbb0b..af00cecf 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -18,6 +18,7 @@ from ..auth import auth_required from ..auth.models import User from .errors import ( + BigChunkError, DataSyncError, ProjectLocked, ProjectVersionMismatch, @@ -342,7 +343,7 @@ def upload_chunk(id: str): """ project = require_project_by_uuid(id, ProjectPermissions.Edit) if project.locked_until: - abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) + return ProjectLocked().response(422) # generate uuid for chunk chunk_id = str(uuid.uuid4()) dest_file = get_chunk_location(chunk_id) @@ -351,9 +352,9 @@ def upload_chunk(id: str): save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"]) except IOError: move_to_tmp(dest_file, chunk_id) - abort(413, "Chunk size exceeds maximum allowed size") + return BigChunkError().response(413) except Exception as e: - abort(400, "Error saving chunk") + return UploadError(error="Error saving chunk").response(400) # Add valid_until timestamp to the response, remove tzinfo for compatibility with DateTimeWithZ valid_until = ( diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 1d88422a..acc275c6 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -11,6 +11,7 @@ from mergin.app import db from mergin.config import Configuration from mergin.sync.errors import ( + BigChunkError, ProjectLocked, ProjectVersionMismatch, StorageLimitHit, @@ -341,14 +342,16 @@ def test_create_version_failures(client): assert response.json["code"] == UploadError.code -def test_upload_chunk(client, app): +def test_upload_chunk(client): """Test pushing a chunk to a project""" project = Project.query.filter_by( workspace_id=test_workspace_id, name=test_project ).first() url = f"/v2/projects/{project.id}/chunks" - app.config["MAX_CHUNK_SIZE"] = 1024 # Set a small max chunk size for testing - max_chunk_size = app.config["MAX_CHUNK_SIZE"] + client.application.config["MAX_CHUNK_SIZE"] = ( + 1024 # Set a small max chunk size for testing + ) + max_chunk_size = client.application.config["MAX_CHUNK_SIZE"] response = client.post( url, @@ -356,6 +359,7 @@ def test_upload_chunk(client, app): headers={"Content-Type": "application/octet-stream"}, ) assert response.status_code == 413 + assert response.json["code"] == BigChunkError.code # Project is locked, cannot push chunks project.locked_until = datetime.now(timezone.utc) + timedelta(weeks=26) @@ -366,7 +370,7 @@ def test_upload_chunk(client, app): headers={"Content-Type": "application/octet-stream"}, ) assert response.status_code == 422 - assert response.json["code"] == "ProjectLocked" + assert response.json["code"] == ProjectLocked.code project.locked_until = None # Unlock the project project.removed_at = datetime.now(timezone.utc) - timedelta( @@ -395,7 +399,7 @@ def test_upload_chunk(client, app): valid_until_dt = datetime.strptime(valid_until, "%Y-%m-%dT%H:%M:%S%z") assert valid_until_dt > datetime.now(timezone.utc) assert valid_until_dt < datetime.now(timezone.utc) + timedelta( - seconds=app.config["UPLOAD_CHUNKS_EXPIRATION"] + seconds=client.application.config["UPLOAD_CHUNKS_EXPIRATION"] ) # Check if the chunk is stored correctly stored_chunk = get_chunk_location(chunk_id) From fb378ad65b217bb9e93d64ca9af1fd305ce61606 Mon Sep 17 00:00:00 2001 From: 
Martin Varga Date: Tue, 5 Aug 2025 15:06:54 +0200 Subject: [PATCH 04/13] Add integration test for full v2 push --- .../mergin/sync/public_api_v2_controller.py | 2 +- server/mergin/tests/test_public_api_v2.py | 53 ++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index af00cecf..c4f8ed31 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -287,7 +287,7 @@ def create_project_version(id): db.session.add(project) db.session.commit() # let's move uploaded files where they are expected to be - temp_files_dir = os.path.join(upload.upload_dir, "files") + temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) os.renames(temp_files_dir, version_dir) logging.info( diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index acc275c6..75794eff 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial import os +import shutil from unittest.mock import patch from flask import current_app from psycopg2 import IntegrityError @@ -33,7 +34,7 @@ _get_changes_with_diff_0_size, _get_changes_without_added, ) -from .utils import add_user +from .utils import add_user, file_info def test_schedule_delete_project(client): @@ -406,3 +407,53 @@ def test_upload_chunk(client): assert os.path.exists(stored_chunk) with open(stored_chunk, "rb") as f: assert f.read() == b"a" * max_chunk_size + + +def test_full_push(client): + """Test full project push with upload of chunks and project version creation""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + + # prepare data to push + project_dir = os.path.join(TMP_DIR, test_project) + if os.path.exists(project_dir): + shutil.rmtree(project_dir) + shutil.copytree(test_project_dir, project_dir) + os.rename( + os.path.join(project_dir, "base.gpkg"), + os.path.join(project_dir, "new_base.gpkg"), + ) + + test_file = file_info(project_dir, "new_base.gpkg", chunk_size=CHUNK_SIZE) + uploaded_chunks = [] + + with open(os.path.join(project_dir, test_file["path"]), "rb") as in_file: + for _ in test_file["chunks"]: + data = in_file.read(CHUNK_SIZE) + response = client.post( + f"/v2/projects/{project.id}/chunks", + data=data, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 200 + uploaded_chunks.append(response.json["id"]) + chunk_location = get_chunk_location(response.json["id"]) + assert os.path.exists(chunk_location) + + test_file["chunks"] = uploaded_chunks + + response = client.post( + f"v2/projects/{project.id}/versions", + json={ + "version": "v1", + "changes": {"added": [test_file], "updated": [], "removed": []}, + }, + ) + assert response.status_code == 201 + assert response.json["name"] == "v2" + assert project.latest_version == 2 + assert os.path.exists( + os.path.join(project.storage.project_dir, "v2", test_file["path"]) + ) + assert not Upload.query.filter_by(project_id=project.id).first() From b8afe5fdd0abf29e7e5bf29252859846051bece4 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Wed, 6 Aug 2025 10:20:17 +0200 Subject: [PATCH 05/13] Address comments --- server/mergin/sync/errors.py | 9 +++-- server/mergin/sync/files.py | 2 +- server/mergin/sync/public_api_v2.yaml | 30 ++++++++++++++--- 
.../mergin/sync/public_api_v2_controller.py | 33 ++++++++++--------- server/mergin/sync/storages/disk.py | 26 +++++++-------- server/mergin/tests/test_public_api_v2.py | 12 +++---- 6 files changed, 70 insertions(+), 42 deletions(-) diff --git a/server/mergin/sync/errors.py b/server/mergin/sync/errors.py index c5444c3c..35985ab9 100644 --- a/server/mergin/sync/errors.py +++ b/server/mergin/sync/errors.py @@ -58,8 +58,8 @@ def to_dict(self) -> Dict: return data -class ProjectVersionMismatch(ResponseError): - code = "ProjectVersionMismatch" +class ProjectVersionExists(ResponseError): + code = "ProjectVersionExists" detail = "Project version mismatch" def __init__(self, client_version: int, server_version: int): @@ -73,6 +73,11 @@ def to_dict(self) -> Dict: return data +class AnotherUploadRunning(ResponseError): + code = "AnotherUploadRunning" + detail = "Another process is running" + + class UploadError(ResponseError): code = "UploadError" detail = "Project version could not be created" diff --git a/server/mergin/sync/files.py b/server/mergin/sync/files.py index 5c8c1427..fd77c597 100644 --- a/server/mergin/sync/files.py +++ b/server/mergin/sync/files.py @@ -204,7 +204,7 @@ def validate(self, data, **kwargs): if len(set(changes_files)) != len(changes_files): raise ValidationError("Not unique changes") - # check if all .gpkg file are valid + # check if all files are valid for file in data["added"] + data["updated"]: file_path = file["path"] if is_versioned_file(file_path) and file["size"] == 0: diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml index a71020a7..4a4f7904 100644 --- a/server/mergin/sync/public_api_v2.yaml +++ b/server/mergin/sync/public_api_v2.yaml @@ -320,7 +320,9 @@ paths: content: application/problem+json: schema: - $ref: "#/components/schemas/ProjectVersionMismatch" + anyOf: + - $ref: "#/components/schemas/ProjectVersionExists" + - $ref: "#/components/schemas/AnotherUploadRunning" "422": description: Request could not be processed by server content: @@ -330,8 +332,14 @@ paths: - $ref: "#/components/schemas/UploadError" - $ref: "#/components/schemas/TrialExpired" - $ref: "#/components/schemas/StorageLimitHit" - - $ref: "#/components/schemas/ProjectLocked" - $ref: "#/components/schemas/DataSyncError" + "423": + description: Project is locked for any upload + content: + application/problem+json: + schema: + $ref: "#/components/schemas/ProjectLocked" + x-openapi-router-controller: mergin.sync.public_api_v2_controller components: responses: @@ -397,7 +405,19 @@ components: example: code: ProjectLocked detail: The project is currently locked and you cannot make changes to it (ProjectLocked) - ProjectVersionMismatch: + ProjectVersionExists: + allOf: + - $ref: '#/components/schemas/CustomError' + type: object + properties: + client_version: + type: string + server_version: + type: string + example: + code: ProjectVersionExists + detail: Project version mismatch (ProjectVersionExists) + AnotherUploadRunning: allOf: - $ref: '#/components/schemas/CustomError' type: object @@ -407,8 +427,8 @@ components: server_version: type: string example: - code: ProjectVersionMismatch - detail: Project version mismatch (ProjectVersionMismatch) + code: AnotherUploadRunning + detail: Another process is running (AnotherUploadRunning) DataSyncError: allOf: - $ref: '#/components/schemas/CustomError' diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index c4f8ed31..ec3fd260 100644 --- 
a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -9,7 +9,7 @@ import psycopg2 from connexion import NoContent, request from datetime import datetime, timedelta, timezone -from flask import abort, jsonify, current_app, make_response +from flask import abort, jsonify, current_app from flask_login import current_user from marshmallow import ValidationError from psycopg2 import IntegrityError @@ -18,10 +18,11 @@ from ..auth import auth_required from ..auth.models import User from .errors import ( + AnotherUploadRunning, BigChunkError, DataSyncError, ProjectLocked, - ProjectVersionMismatch, + ProjectVersionExists, StorageLimitHit, UploadError, ) @@ -170,7 +171,7 @@ def create_project_version(id): request.view_args["project"] = project if project.locked_until: - return ProjectLocked().response(422) + return ProjectLocked().response(423) next_version = project.next_version() v_next_version = ProjectVersion.to_v_name(next_version) @@ -178,12 +179,12 @@ def create_project_version(id): pv = project.get_latest_version() if pv and pv.name != version: - return ProjectVersionMismatch(version, pv.name).response(409) + return ProjectVersionExists(version, pv.name).response(409) # reject push if there is another one already running pending_upload = Upload.query.filter_by(project_id=project.id).first() if pending_upload and pending_upload.is_active(): - return ProjectVersionMismatch(version, next_version).response(409) + return AnotherUploadRunning().response(409) try: ChangesSchema().validate(changes) @@ -192,16 +193,20 @@ def create_project_version(id): msg = err.messages[0] if type(err.messages) == list else "Invalid input data" return UploadError(error=msg).response(422) + to_be_added_files = upload_changes["added"] + to_be_updated_files = upload_changes["updated"] + to_be_removed_files = upload_changes["removed"] + # check consistency of changes current_files = set(file.path for file in project.files) - added_files = set(file["path"] for file in upload_changes["added"]) + added_files = set(file["path"] for file in to_be_added_files) if added_files and added_files.issubset(current_files): return UploadError( error=f"Add changes contain files which already exist" ).response(422) modified_files = set( - file["path"] for file in upload_changes["updated"] + upload_changes["removed"] + file["path"] for file in to_be_updated_files + to_be_removed_files ) if modified_files and not modified_files.issubset(current_files): return UploadError( @@ -211,16 +216,14 @@ def create_project_version(id): # Check user data limit updated_files = list( filter( - lambda i: i.path in [f["path"] for f in upload_changes["updated"]], + lambda i: i.path in [f["path"] for f in to_be_updated_files], project.files, ) ) additional_disk_usage = ( - sum( - file["size"] for file in upload_changes["added"] + upload_changes["updated"] - ) + sum(file["size"] for file in to_be_added_files + to_be_updated_files) - sum(file.size for file in updated_files) - - sum(file["size"] for file in upload_changes["removed"]) + - sum(file["size"] for file in to_be_removed_files) ) current_usage = project.workspace.disk_usage() @@ -243,7 +246,7 @@ def create_project_version(id): # check and clean dangling blocking uploads or abort for current_upload in project.uploads.all(): if current_upload.is_active(): - return ProjectVersionMismatch(version, next_version).response(409) + return AnotherUploadRunning().response(409) db.session.delete(current_upload) db.session.commit() # previous push attempt is 
definitely lost @@ -261,7 +264,7 @@ def create_project_version(id): move_to_tmp(upload.upload_dir) except IntegrityError as err: logging.error(f"Failed to create upload session: {str(err)}") - return ProjectVersionMismatch(version, next_version).response(409) + return AnotherUploadRunning().response(409) # Create transaction folder and lockfile os.makedirs(upload.upload_dir) @@ -343,7 +346,7 @@ def upload_chunk(id: str): """ project = require_project_by_uuid(id, ProjectPermissions.Edit) if project.locked_until: - return ProjectLocked().response(422) + return ProjectLocked().response(423) # generate uuid for chunk chunk_id = str(uuid.uuid4()) dest_file = get_chunk_location(chunk_id) diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py index b06e51af..4491ad98 100644 --- a/server/mergin/sync/storages/disk.py +++ b/server/mergin/sync/storages/disk.py @@ -245,7 +245,7 @@ def _generator(): return _generator() def apply_diff( - self, current_file: ProjectFile, changeset: str, patchedfile: str + self, current_file: ProjectFile, diff_file: str, patched_file: str ) -> Result: """Apply geodiff diff file on current gpkg basefile. Creates GeodiffActionHistory record of the action. Returns checksum and size of generated file. If action fails it returns geodiff error message. @@ -256,9 +256,9 @@ def apply_diff( basefile = os.path.join(self.project_dir, current_file.location) # create local copy of basefile which will be updated in next version and changeset needed # TODO this can potentially fail for large files - logging.info(f"Apply changes: copying {basefile} to {patchedfile}") + logging.info(f"Apply changes: copying {basefile} to {patched_file}") start = time.time() - with self.geodiff_copy(changeset) as changeset_tmp, self.geodiff_copy( + with self.geodiff_copy(diff_file) as changeset_tmp, self.geodiff_copy( basefile ) as patchedfile_tmp: copy_time = time.time() - start @@ -267,7 +267,7 @@ def apply_diff( # clean geodiff logger self.flush_geodiff_logger() logging.info( - f"Geodiff: apply changeset {changeset} of size {os.path.getsize(changeset)} with changes to {patchedfile}" + f"Geodiff: apply changeset {diff_file} of size {os.path.getsize(diff_file)} with changes to {patched_file}" ) start = time.time() self.geodiff.apply_changeset(patchedfile_tmp, changeset_tmp) @@ -281,7 +281,7 @@ def apply_diff( current_file.size, v_name, "apply_changes", - changeset, + diff_file, ) gh.copy_time = copy_time gh.geodiff_time = geodiff_apply_time @@ -289,11 +289,11 @@ def apply_diff( # move constructed file where is belongs logging.info(f"Apply changes: moving patchfile {patchedfile_tmp}") start = time.time() - copy_file(patchedfile_tmp, patchedfile) + copy_file(patchedfile_tmp, patched_file) gh.copy_time = copy_time + (time.time() - start) # TODO this can potentially fail for large files - logging.info(f"Apply changes: calculating checksum of {patchedfile}") + logging.info(f"Apply changes: calculating checksum of {patched_file}") start = time.time() checksum = generate_checksum(patchedfile_tmp) checksumming_time = time.time() - start @@ -307,20 +307,20 @@ def apply_diff( ) ) except (GeoDiffLibError, GeoDiffLibConflictError): - move_to_tmp(changeset) + move_to_tmp(diff_file) return Err(self.gediff_log.getvalue()) def construct_diff( self, current_file: ProjectFile, - changeset: str, + diff_file: str, uploaded_file: str, ) -> Result: """Construct geodiff diff file from uploaded gpkg and current basefile. Returns diff metadata as a result. 
If action fails it returns geodiff error message. """ basefile = os.path.join(self.project_dir, current_file.location) - diff_name = os.path.basename(changeset) + diff_name = os.path.basename(diff_file) with self.geodiff_copy(basefile) as basefile_tmp, self.geodiff_copy( uploaded_file ) as uploaded_file_tmp: @@ -332,12 +332,12 @@ def construct_diff( ) self.flush_geodiff_logger() logging.info( - f"Geodiff: create changeset {changeset} from {uploaded_file}" + f"Geodiff: create changeset {diff_file} from {uploaded_file}" ) self.geodiff.create_changeset( basefile_tmp, uploaded_file_tmp, changeset_tmp ) - copy_file(changeset_tmp, changeset) + copy_file(changeset_tmp, diff_file) return Ok( ( generate_checksum(changeset_tmp), @@ -346,7 +346,7 @@ def construct_diff( ) except (GeoDiffLibError, GeoDiffLibConflictError) as e: # diff is not possible to create - file will be overwritten - move_to_tmp(changeset) + move_to_tmp(diff_file) return Err(self.gediff_log.getvalue()) finally: move_to_tmp(changeset_tmp) diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 75794eff..68df0949 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -4,7 +4,6 @@ import os import shutil from unittest.mock import patch -from flask import current_app from psycopg2 import IntegrityError import pytest from datetime import datetime, timedelta, timezone @@ -14,7 +13,8 @@ from mergin.sync.errors import ( BigChunkError, ProjectLocked, - ProjectVersionMismatch, + ProjectVersionExists, + AnotherUploadRunning, StorageLimitHit, UploadError, ) @@ -190,7 +190,7 @@ def test_project_members(client): ( {"version": "v0", "changes": _get_changes_without_added(test_project_dir)}, 409, - ProjectVersionMismatch.code, + ProjectVersionExists.code, ), # no changes requested ( @@ -308,14 +308,14 @@ def test_create_version_failures(client): response = client.post(f"v2/projects/{project.id}/versions", json=data) assert response.status_code == 409 - assert response.json["code"] == ProjectVersionMismatch.code + assert response.json["code"] == AnotherUploadRunning.code upload.clear() # project is locked project.locked_until = datetime.now(timezone.utc) + timedelta(days=1) db.session.commit() response = client.post(f"v2/projects/{project.id}/versions", json=data) - assert response.status_code == 422 + assert response.status_code == 423 assert response.json["code"] == ProjectLocked.code project.locked_until = None db.session.commit() @@ -370,7 +370,7 @@ def test_upload_chunk(client): data=b"a", headers={"Content-Type": "application/octet-stream"}, ) - assert response.status_code == 422 + assert response.status_code == 423 assert response.json["code"] == ProjectLocked.code project.locked_until = None # Unlock the project From 27647c8887cead3a41f5220f762afe1d344c255c Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Wed, 6 Aug 2025 14:45:50 +0200 Subject: [PATCH 06/13] Cron job to remove outdated uploaded chunks (#489) * Cron job to remove outdated uploaded chunks * Run cleanup more frequently --- server/application.py | 6 ++++ server/mergin/sync/tasks.py | 30 +++++++++------- server/mergin/sync/utils.py | 19 +++++++++++ server/mergin/tests/test_celery.py | 55 +++++++++++++++++++++++++++--- 4 files changed, 94 insertions(+), 16 deletions(-) diff --git a/server/application.py b/server/application.py index b1ab79ac..fc3dc195 100644 --- a/server/application.py +++ b/server/application.py @@ -27,6 +27,7 @@ remove_projects_archives, remove_temp_files, 
remove_projects_backups, + remove_unused_chunks, ) from mergin.celery import celery, configure_celery from mergin.stats.config import Configuration @@ -85,4 +86,9 @@ def setup_periodic_tasks(sender, **kwargs): crontab(hour=3, minute=0), remove_projects_archives, name="remove old project archives", + ), + sender.add_periodic_task( + crontab(hour="*/4", minute=0), + remove_unused_chunks, + name="clean up of outdated chunks", ) diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index f56fb273..310fefbb 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -6,13 +6,14 @@ import shutil import os import time -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from zipfile import ZIP_DEFLATED, ZipFile from flask import current_app from .models import Project, ProjectVersion, FileHistory from .storages.disk import move_to_tmp from .config import Configuration +from .utils import remove_outdated_files from ..celery import celery from ..app import db @@ -144,14 +145,19 @@ def create_project_version_zip(version_id: int): @celery.task def remove_projects_archives(): """Remove created zip files for project versions if they were not accessed for certain time""" - for file in os.listdir(current_app.config["PROJECTS_ARCHIVES_DIR"]): - path = os.path.join(current_app.config["PROJECTS_ARCHIVES_DIR"], file) - if datetime.fromtimestamp( - os.path.getatime(path), tz=timezone.utc - ) < datetime.now(timezone.utc) - timedelta( - days=current_app.config["PROJECTS_ARCHIVES_EXPIRATION"] - ): - try: - os.remove(path) - except OSError as e: - logging.error(f"Unable to remove {path}: {str(e)}") + remove_outdated_files( + Configuration.PROJECTS_ARCHIVES_DIR, + timedelta(days=Configuration.PROJECTS_ARCHIVES_EXPIRATION), + ) + + +@celery.task +def remove_unused_chunks(): + """Remove old chunks in shared directory. 
These are basically just residual from failed uploads.""" + small_hash_dirs = os.listdir(Configuration.UPLOAD_CHUNKS_DIR) + time_delta = timedelta(seconds=Configuration.UPLOAD_CHUNKS_EXPIRATION) + for _dir in small_hash_dirs: + dir = os.path.join(Configuration.UPLOAD_CHUNKS_DIR, _dir) + if not os.path.isdir(dir): + continue + remove_outdated_files(dir, time_delta) diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index 7babb231..acd93b26 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -2,11 +2,13 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import logging import math import os import hashlib import re import secrets +from datetime import datetime, timedelta, timezone from threading import Timer from uuid import UUID from shapely import wkb @@ -592,3 +594,20 @@ def get_chunk_location(id: str): small_hash = id[:2] file_name = id[2:] return os.path.join(chunk_dir, small_hash, file_name) + + +def remove_outdated_files(dir: str, time_delta: timedelta): + """Remove all files within directory where last access time passed expiration date""" + for file in os.listdir(dir): + path = os.path.join(dir, file) + if not os.path.isfile(path): + continue + + if ( + datetime.fromtimestamp(os.path.getatime(path), tz=timezone.utc) + < datetime.now(timezone.utc) - time_delta + ): + try: + os.remove(path) + except OSError as e: + logging.error(f"Unable to remove {path}: {str(e)}") diff --git a/server/mergin/tests/test_celery.py b/server/mergin/tests/test_celery.py index a5d07f47..b236d4ba 100644 --- a/server/mergin/tests/test_celery.py +++ b/server/mergin/tests/test_celery.py @@ -2,8 +2,10 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import math import os -from datetime import datetime, timedelta +import uuid +from datetime import datetime, timedelta, timezone from flask import current_app from flask_mail import Mail from unittest.mock import patch @@ -12,17 +14,32 @@ from ..config import Configuration from ..sync.models import Project, AccessRequest, ProjectRole, ProjectVersion from ..celery import send_email_async +from ..sync.config import Configuration as SyncConfiguration from ..sync.tasks import ( remove_temp_files, remove_projects_backups, create_project_version_zip, remove_projects_archives, + remove_unused_chunks, ) from ..sync.storages.disk import move_to_tmp -from . import test_project, test_workspace_name, test_workspace_id -from .utils import add_user, create_workspace, create_project, login, modify_file_times +from ..sync.utils import get_chunk_location +from . import ( + test_project, + test_workspace_name, + test_workspace_id, + test_project_dir, + json_headers, +) +from .utils import ( + CHUNK_SIZE, + add_user, + create_workspace, + create_project, + login, + modify_file_times, +) from ..auth.models import User -from . 
import json_headers def test_send_email(app): @@ -157,3 +174,33 @@ def test_create_project_version_zip(diff_project): modify_file_times(latest_version.zip_path, new_time) remove_projects_archives() assert not os.path.exists(latest_version.zip_path) + + +def test_remove_chunks(app): + """Test cleanup of outdated chunks""" + # pretend chunks were uploaded + chunks = [] + src_file = os.path.join(test_project_dir, "base.gpkg") + with open(src_file, "rb") as in_file: + f_size = os.path.getsize(src_file) + for i in range(math.ceil(f_size / CHUNK_SIZE)): + chunk_id = str(uuid.uuid4()) + chunk_location = get_chunk_location(chunk_id) + os.makedirs(os.path.dirname(chunk_location), exist_ok=True) + with open(chunk_location, "wb") as out_file: + out_file.write(in_file.read(CHUNK_SIZE)) + chunks.append(chunk_location) + + remove_unused_chunks() + assert all(os.path.exists(chunk) for chunk in chunks) + + def _atime_mock(path: str) -> float: + """Mock file stats to be already expired""" + return ( + datetime.now(timezone.utc) + - timedelta(seconds=SyncConfiguration.UPLOAD_CHUNKS_EXPIRATION) + ).timestamp() - 1 + + with patch("os.path.getatime", _atime_mock): + remove_unused_chunks() + assert not any(os.path.exists(chunk) for chunk in chunks) From bdb3da51ed2304652958bdb24fb3c7bfa558d13b Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Wed, 13 Aug 2025 09:22:34 +0200 Subject: [PATCH 07/13] Fix create version with only removed files --- server/mergin/sync/public_api_v2_controller.py | 6 ++++-- server/mergin/tests/test_public_api_v2.py | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index ec3fd260..e503edad 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -289,9 +289,11 @@ def create_project_version(id): db.session.add(pv) db.session.add(project) db.session.commit() + # let's move uploaded files where they are expected to be - temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) - os.renames(temp_files_dir, version_dir) + if to_be_added_files or to_be_updated_files: + temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) + os.renames(temp_files_dir, version_dir) logging.info( f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}." 
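
A note on the guard above: a delete-only push uploads no files, so the transaction's "files" directory is never created and the previously unconditional os.renames() call raised FileNotFoundError, failing the push. A plain-Python illustration of that failure mode (paths are made up):

    import os

    # os.renames raises FileNotFoundError when the source does not exist,
    # which is exactly the case for a push that only removes files.
    try:
        os.renames("/tmp/upload-xyz/files/v2", "/tmp/project/v2")
    except FileNotFoundError as err:
        print(err)  # [Errno 2] No such file or directory: ...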
diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py
index 68df0949..1d5e0d7c 100644
--- a/server/mergin/tests/test_public_api_v2.py
+++ b/server/mergin/tests/test_public_api_v2.py
@@ -174,6 +174,21 @@ def test_project_members(client):
             204,
             None,
         ),
+        # only delete files
+        (
+            {
+                "version": "v1",
+                "changes": {
+                    "added": [],
+                    "removed": [
+                        file_info(test_project_dir, "base.gpkg"),
+                    ],
+                    "updated": [],
+                },
+            },
+            201,
+            None,
+        ),
         # broken .gpkg file
         (
             {"version": "v1", "changes": _get_changes_with_diff_0_size(test_project_dir)},

From ec80700a8db8c2c22dca626af2828001eca7ba9f Mon Sep 17 00:00:00 2001
From: "marcel.kocisek"
Date: Wed, 13 Aug 2025 17:32:12 +0200
Subject: [PATCH 08/13] Resolve unhandled error description

---
 server/mergin/sync/public_api_controller.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py
index ab0908d1..3d6793e3 100644
--- a/server/mergin/sync/public_api_controller.py
+++ b/server/mergin/sync/public_api_controller.py
@@ -728,12 +728,19 @@ def wrapper(*args, **kwargs):
                 == "/v2.mergin_sync_public_api_v2_controller_create_project_version"
             ):
                 error_type = "project_push"
+            description = ""
+            if isinstance(e, IntegrityError):
+                description = "Database integrity error"
+            else:
+                description = (
+                    e.description
+                    if e.description
+                    else e.response.json.get("detail", "")
+                )
 
-            if not e.description:  # custom error cases (e.g. StorageLimitHit)
-                e.description = e.response.json["detail"]
             if project:
                 project.sync_failed(
-                    user_agent, error_type, str(e.description), current_user.id
+                    user_agent, error_type, str(description), current_user.id
                 )
             else:
                 logging.warning("Missing project info in sync failure")

From 548ec3b38de36f650eca9478dad2db574db4e3ec Mon Sep 17 00:00:00 2001
From: Martin Varga
Date: Fri, 15 Aug 2025 10:32:19 +0200
Subject: [PATCH 09/13] Remove upload chunks only if push was successful

---
 server/mergin/sync/models.py                   |  1 -
 server/mergin/sync/public_api_controller.py    |  9 +++++++++
 server/mergin/sync/public_api_v2_controller.py |  8 ++++++++
 server/mergin/tests/test_project_controller.py |  3 +++
 server/mergin/tests/test_public_api_v2.py      |  4 ++++
 5 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py
index 07219149..98241d35 100644
--- a/server/mergin/sync/models.py
+++ b/server/mergin/sync/models.py
@@ -1092,7 +1092,6 @@ def process_chunks(
                         dest.write(data)
                         data = src.read(8192)
-                move_to_tmp(chunk_file)
             except IOError:
                 logging.exception(
                     f"Failed to process chunk: {chunk_id} in project {project_path}"
                 )
diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py
index ab0908d1..c0ec9fb0 100644
--- a/server/mergin/sync/public_api_controller.py
+++ b/server/mergin/sync/public_api_controller.py
@@ -1032,6 +1032,15 @@ def push_finish(transaction_id):
 
         # let's move uploaded files where they are expected to be
         os.renames(files_dir, version_dir)
+
+        # remove used chunks
+        for file in upload.changes["added"] + upload.changes["updated"]:
+            file_chunks = file.get("chunks", [])
+            for chunk_id in file_chunks:
+                chunk_file = os.path.join(upload.upload_dir, "chunks", chunk_id)
+                if os.path.exists(chunk_file):
+                    move_to_tmp(chunk_file)
+
         logging.info(
             f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
         )
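Since the same cleanup now happens in both APIs (next hunk), it helps to see where each one keeps its chunks. A sketch of the two layouts as implied by this series (the root paths are illustrative):

    import os

    def v1_chunk_path(upload_dir: str, chunk_id: str) -> str:
        # v1: chunks are private to a single push transaction
        return os.path.join(upload_dir, "chunks", chunk_id)

    def v2_chunk_path(chunks_root: str, chunk_id: str) -> str:
        # v2: one global pool, fanned out by the first two characters of the id
        # (mirrors get_chunk_location introduced earlier in this series)
        return os.path.join(chunks_root, chunk_id[:2], chunk_id[2:])

    print(v1_chunk_path("/data/tmp/upload-1", "abcd1234"))  # /data/tmp/upload-1/chunks/abcd1234
    print(v2_chunk_path("/data/chunks", "abcd1234"))        # /data/chunks/ab/cd1234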
diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py
index e503edad..7fe2bbfa 100644
--- a/server/mergin/sync/public_api_v2_controller.py
+++ b/server/mergin/sync/public_api_v2_controller.py
@@ -295,6 +295,14 @@ def create_project_version(id):
         temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version)
         os.renames(temp_files_dir, version_dir)
 
+    # remove used chunks
+    for file in to_be_added_files + to_be_updated_files:
+        file_chunks = file.get("chunks", [])
+        for chunk_id in file_chunks:
+            chunk_file = get_chunk_location(chunk_id)
+            if os.path.exists(chunk_file):
+                move_to_tmp(chunk_file)
+
     logging.info(
         f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}."
     )
diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py
index b615f195..1cba91cc 100644
--- a/server/mergin/tests/test_project_controller.py
+++ b/server/mergin/tests/test_project_controller.py
@@ -1366,12 +1366,14 @@ def test_push_finish(client):
     upload, upload_dir = create_transaction("mergin", changes)
     os.mkdir(os.path.join(upload.project.storage.project_dir, "v2"))
     # mimic chunks were uploaded
+    chunks = []
     os.makedirs(os.path.join(upload_dir, "chunks"))
     for f in upload.changes["added"] + upload.changes["updated"]:
         with open(os.path.join(test_project_dir, f["path"]), "rb") as in_file:
             for chunk in f["chunks"]:
                 with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file:
                     out_file.write(in_file.read(CHUNK_SIZE))
+                    chunks.append(os.path.join(upload_dir, "chunks", chunk))
 
     resp2 = client.post(
         f"/v1/project/push/finish/{upload.id}",
@@ -1382,6 +1384,7 @@
     version = upload.project.get_latest_version()
     assert version.user_agent
     assert version.device_id == json_headers["X-Device-Id"]
+    assert all(not os.path.exists(chunk) for chunk in chunks)
 
     # tests basic failures
     resp3 = client.post("/v1/project/push/finish/not-existing")
diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py
index 1d5e0d7c..b1fa74da 100644
--- a/server/mergin/tests/test_public_api_v2.py
+++ b/server/mergin/tests/test_public_api_v2.py
@@ -275,6 +275,7 @@ def test_create_version(client, data, expected, err_code):
     ).first()
     assert project.latest_version == 1
 
+    chunks = []
     if expected == 201:
         # mimic chunks were uploaded
         for f in data["changes"]["added"] + data["changes"]["updated"]:
                 with open(chunk_location, "wb") as out_file:
                     out_file.write(in_file.read(CHUNK_SIZE))
+                    chunks.append(chunk_location)
+
     response = client.post(f"v2/projects/{project.id}/versions", json=data)
     assert response.status_code == expected
     if expected == 201:
         assert response.json["name"] == "v2"
         assert project.latest_version == 2
+        assert all(not os.path.exists(chunk) for chunk in chunks)
     else:
         assert project.latest_version == 1
     if err_code:

From ea4d51d558246adcd269344c9f4e2a22ca27f5e3 Mon Sep 17 00:00:00 2001
From: Martin Varga
Date: Fri, 15 Aug 2025 13:47:50 +0200
Subject: [PATCH 10/13] Fix integrity error handling in push

---
 server/mergin/sync/public_api_controller.py    | 20 ++++++--------
 .../mergin/sync/public_api_v2_controller.py    | 13 +++++-----
 server/mergin/tests/test_public_api_v2.py      | 25 +++++++++++++++++--
 3 files changed, 38 insertions(+), 20 deletions(-)
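The key change in the first hunk below is ordering: database integrity failures are translated into an HTTP 409 before the generic HTTPException handler runs, so that handler only ever sees errors carrying a valid `.code`. Conceptually (a sketch of the pattern, not the actual decorator):

    from sqlalchemy.exc import IntegrityError
    from werkzeug.exceptions import Conflict, HTTPException

    def guarded_call(func, *args, **kwargs):
        try:
            return func(*args, **kwargs)
        except IntegrityError:
            # carries no HTTP semantics of its own - map it to 409 right away
            raise Conflict("Database integrity error")
        except HTTPException as e:
            if e.code in (401, 403, 404):
                raise  # nothing to record, just propagate
            # ... record the sync failure for the project, then:
            raise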
diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py
index 3d6793e3..8eaf59b3 100644
--- a/server/mergin/sync/public_api_controller.py
+++ b/server/mergin/sync/public_api_controller.py
@@ -32,7 +32,7 @@
 from gevent import sleep
 import base64
-from werkzeug.exceptions import HTTPException
+from werkzeug.exceptions import HTTPException, Conflict
 from mergin.sync.forms import project_name_validation
 from .interfaces import WorkspaceRole
@@ -707,7 +709,9 @@ def wrapper(*args, **kwargs):
             if status_code >= 400:
                 raise HTTPException(response=response)
             return response, status_code
-        except (HTTPException, IntegrityError) as e:
+        except IntegrityError:
+            raise Conflict("Database integrity error")
+        except HTTPException as e:
             if e.code in [401, 403, 404]:
                 raise  # nothing to do, just propagate downstream
@@ -728,15 +730,10 @@
                 == "/v2.mergin_sync_public_api_v2_controller_create_project_version"
             ):
                 error_type = "project_push"
-            description = ""
-            if isinstance(e, IntegrityError):
-                description = "Database integrity error"
-            else:
-                description = (
-                    e.description
-                    if e.description
-                    else e.response.json.get("detail", "")
-                )
+
+            description = (
+                e.description if e.description else e.response.json.get("detail", "")
+            )
 
             if project:
                 project.sync_failed(
@@ -744,7 +741,6 @@
                 )
             else:
                 logging.warning("Missing project info in sync failure")
-
             raise
 
     return wrapper
diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py
index e503edad..8a83f808 100644
--- a/server/mergin/sync/public_api_v2_controller.py
+++ b/server/mergin/sync/public_api_v2_controller.py
@@ -12,7 +12,7 @@
 from flask import abort, jsonify, current_app
 from flask_login import current_user
 from marshmallow import ValidationError
-from psycopg2 import IntegrityError
+from sqlalchemy.exc import IntegrityError
 
 from ..app import db
 from ..auth import auth_required
@@ -235,10 +235,10 @@ def create_project_version(id):
     if request.json.get("check_only", False):
         return NoContent, 204
 
-    # while processing data, block other uploads
-    upload = Upload(project, version, upload_changes, current_user.id)
-    db.session.add(upload)
     try:
+        # while processing data, block other uploads
+        upload = Upload(project, version, upload_changes, current_user.id)
+        db.session.add(upload)
         # Creating blocking upload can fail, e.g. in case of a race condition
         db.session.commit()
     except IntegrityError:
@@ -257,9 +257,10 @@
             current_user.id,
         )
 
-        # Try again after cleanup
-        db.session.add(upload)
         try:
+            # Try again after cleanup
+            upload = Upload(project, version, upload_changes, current_user.id)
+            db.session.add(upload)
             db.session.commit()
             move_to_tmp(upload.upload_dir)
         except IntegrityError as err:
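Distilling the controller change above (names simplified, not the server API): the pending Upload row is unique per project, so inserting it acts as a lock, and the fix is to build a fresh instance for the retry, since the instance that failed the first commit is tied to the rolled-back transaction:

    from sqlalchemy.exc import IntegrityError
    from werkzeug.exceptions import Conflict

    def acquire_upload_lock(session, make_upload, reclaim_stale_upload):
        """Sketch: insert the Upload row (unique per project) as an advisory lock."""
        upload = make_upload()
        session.add(upload)
        try:
            session.commit()
            return upload
        except IntegrityError:
            session.rollback()
            # another push holds the lock; reclaim it only if it has expired
            if not reclaim_stale_upload():
                raise Conflict("Another process is running")
            upload = make_upload()  # fresh instance for the second attempt
            session.add(upload)
            session.commit()  # may still race - the caller maps this to 409 too
            return upload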
diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py
index 1d5e0d7c..51f465c8 100644
--- a/server/mergin/tests/test_public_api_v2.py
+++ b/server/mergin/tests/test_public_api_v2.py
@@ -4,7 +4,7 @@
 import os
 import shutil
 from unittest.mock import patch
-from psycopg2 import IntegrityError
+from sqlalchemy.exc import IntegrityError
 import pytest
 from datetime import datetime, timedelta, timezone
@@ -18,6 +18,7 @@
     StorageLimitHit,
     UploadError,
 )
+from mergin.sync.files import ChangesSchema
 from mergin.sync.models import (
     Project,
     ProjectRole,
@@ -335,7 +336,7 @@ def test_create_version_failures(client):
     project.locked_until = None
     db.session.commit()
 
-    # try to finish the transaction which would fail on version created integrity error, e.g. race conditions
+    # try to finish the transaction which would fail on storage limit
     with patch.object(
         Configuration,
         "GLOBAL_STORAGE",
@@ -357,6 +358,26 @@
     assert response.status_code == 422
     assert response.json["code"] == UploadError.code
 
+    # try to finish the transaction which would fail on existing Upload integrity error, e.g. race conditions
+    with patch.object(
+        Upload,
+        "__init__",
+        side_effect=IntegrityError("Cannot insert upload", None, None),
+    ):
+        response = client.post(f"v2/projects/{project.id}/versions", json=data)
+        assert response.status_code == 409
+        assert response.json["code"] == AnotherUploadRunning.code
+
+    # try to finish the transaction which would fail on unexpected integrity error
+    # patch of ChangesSchema is just a workaround to trigger an error
+    with patch.object(
+        ChangesSchema,
+        "validate",
+        side_effect=IntegrityError("Cannot insert upload", None, None),
+    ):
+        response = client.post(f"v2/projects/{project.id}/versions", json=data)
+        assert response.status_code == 409
+
 
 def test_upload_chunk(client):
     """Test pushing a chunk to a project"""

From 84ba9ddd4d3cc2be2132f1eaae1d28299241bea3 Mon Sep 17 00:00:00 2001
From: "marcel.kocisek"
Date: Wed, 3 Sep 2025 15:52:49 +0200
Subject: [PATCH 11/13] Return whole project info dump from project versions

---
 server/mergin/sync/public_api_v2.yaml          | 235 ++++++++++++++++--
 .../mergin/sync/public_api_v2_controller.py    |  19 +-
 server/mergin/tests/test_public_api_v2.py      |   4 +-
 3 files changed, 223 insertions(+), 35 deletions(-)
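After this patch a successful push responds with the full project document instead of a trimmed version object. The body looks roughly like the following (values illustrative, fields per the Project schema introduced below):

    {
      "id": "f9ef87ac-1dae-48ab-85cb-062a4784fb83",
      "name": "mergin",
      "namespace": "mergin",
      "version": "v2",
      "disk_usage": 25324373,
      "permissions": {"delete": false, "update": false, "upload": true},
      "role": "writer",
      "files": [{"path": "media/favicon.ico", "size": 1024, "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab"}]
    }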
diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml
index 4a4f7904..9ed062d5 100644
--- a/server/mergin/sync/public_api_v2.yaml
+++ b/server/mergin/sync/public_api_v2.yaml
@@ -304,7 +304,7 @@ paths:
         content:
           application/json:
             schema:
-              $ref: "#/components/schemas/ProjectVersion"
+              $ref: "#/components/schemas/Project"
       "204":
         $ref: "#/components/responses/NoContent"
       "400":
@@ -334,11 +334,11 @@
             - $ref: "#/components/schemas/StorageLimitHit"
             - $ref: "#/components/schemas/DataSyncError"
       "423":
-        description: Project is locked for any upload
+        description: Project is locked for any upload
         content:
-          application/problem+json:
-            schema:
-              $ref: "#/components/schemas/ProjectLocked"
+          application/problem+json:
+            schema:
+              $ref: "#/components/schemas/ProjectLocked"
     x-openapi-router-controller: mergin.sync.public_api_v2_controller
@@ -381,13 +381,13 @@ components:
         - detail
     TrialExpired:
       allOf:
-        - $ref: '#/components/schemas/CustomError'
+        - $ref: "#/components/schemas/CustomError"
       example:
         code: TrialExpired
         detail: Failed to push changes. Ask the workspace owner to log in to their Mergin Maps dashboard
     StorageLimitHit:
       allOf:
-        - $ref: '#/components/schemas/CustomError'
+        - $ref: "#/components/schemas/CustomError"
       type: object
       properties:
         current_usage:
           type: integer
         storage_limit:
           type: integer
       example:
         code: StorageLimitHit
         current_usage: 24865
         storage_limit: 24865
     ProjectLocked:
       allOf:
-        - $ref: '#/components/schemas/CustomError'
+        - $ref: "#/components/schemas/CustomError"
       example:
         code: ProjectLocked
         detail: The project is currently locked and you cannot make changes to it (ProjectLocked)
     ProjectVersionExists:
       allOf:
-        - $ref: '#/components/schemas/CustomError'
+        - $ref: "#/components/schemas/CustomError"
       type: object
       properties:
         client_version:
           type: string
         server_version:
           type: string
       example:
         code: ProjectVersionExists
         detail: Project version mismatch (ProjectVersionExists)
     AnotherUploadRunning:
       allOf:
-        - $ref: '#/components/schemas/CustomError'
+        - $ref: "#/components/schemas/CustomError"
       type: object
       properties:
         client_version:
           type: string
       example:
         code: AnotherUploadRunning
         detail: Another process is running (AnotherUploadRunning)
     DataSyncError:
       allOf:
-        - $ref: '#/components/schemas/CustomError'
+        - $ref: "#/components/schemas/CustomError"
       type: object
       properties:
         failed_files:
           type: object
       example:
         code: DataSyncError
         failed_files:
           "survey.gpkg": "Corrupted file"
     UploadError:
       allOf:
-        - $ref: '#/components/schemas/CustomError'
+        - $ref: "#/components/schemas/CustomError"
       example:
         code: UploadError
         detail: "Project version could not be created (UploadError)"
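All of these error schemas serialize through the shared ResponseError structure, which appends the code to the detail text. A stale client pushing against a newer server version would therefore receive an application/problem+json body like this (values illustrative):

    {
      "code": "ProjectVersionExists",
      "detail": "Project version mismatch (ProjectVersionExists)",
      "client_version": "v1",
      "server_version": "v3"
    }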
@@ -541,23 +541,216 @@ components:
           nullable: true
         allOf:
           - $ref: "#/components/schemas/File"
-    ProjectVersion:
+    MerginTag:
+      type: string
+      enum:
+        - valid_qgis
+        - mappin_use
+        - input_use
+      example: valid_qgis
+    Access:
+      type: object
+      properties:
+        ownersnames:
+          type: array
+          nullable: false
+          items:
+            type: string
+          example: [john.doe]
+        writersnames:
+          type: array
+          nullable: false
+          items:
+            type: string
+          example: [john.doe]
+        editorsnames:
+          type: array
+          nullable: false
+          items:
+            type: string
+          example: [john.doe]
+        readersnames:
+          type: array
+          nullable: false
+          items:
+            type: string
+          example: [john.doe]
+        public:
+          type: boolean
+          example: true
+        owners:
+          type: array
+          nullable: false
+          items:
+            type: integer
+          example: [1]
+        writers:
+          type: array
+          nullable: false
+          items:
+            type: integer
+          example: [1]
+        editors:
+          type: array
+          nullable: false
+          items:
+            type: integer
+          example: [1]
+        readers:
+          type: array
+          nullable: false
+          items:
+            type: integer
+          example: [1]
+    FileInfo:
+      type: object
+      required:
+        - path
+        - size
+        - checksum
+      properties:
+        path:
+          type: string
+          example: media/favicon.ico
+        checksum:
+          description: sha1 hash
+          type: string
+          example: 9adb76bf81a34880209040ffe5ee262a090b62ab
+        size:
+          type: integer
+          format: int64
+          example: 1024
+        mtime:
+          deprecated: true
+          type: string
+          format: date-time
+          example: 2018-11-30T08:47:58.636074Z
+        diff:
+          type: object
+          nullable: true
+          required:
+            - path
+            - checksum
+          properties:
+            path:
+              type: string
+              description: unique diff filename
+              example: survey.gpkg-diff-15eqn2q
+            checksum:
+              type: string
+              example: 45dfdfbf81a34asdf209040ffe5fasdf2a090bfa
+            size:
+              type: integer
+              example: 512
+        history:
+          type: object
+          description: map with version names as keys and file info as values
+          additionalProperties:
+            type: object
+            required:
+              - path
+              - size
+              - checksum
+              - change
+            properties:
+              path:
+                type: string
+                example: media/favicon.ico
+              checksum:
+                description: sha1 hash
+                type: string
+                example: 9adb76bf81a34880209040ffe5ee262a090b62ab
+              size:
+                type: integer
+                format: int64
+                example: 1024
+              change:
+                type: string
+                example: added
+                enum:
+                  - added
+                  - updated
+                  - removed
+        expiration:
+          nullable: true
+          type: string
+          format: date-time
+          example: 2019-02-26T08:47:58.636074Z
+    Project:
       type: object
+      required:
+        - name
       properties:
+        id:
+          type: string
+          example: f9ef87ac-1dae-48ab-85cb-062a4784fb83
+          description: Project UUID
         name:
           type: string
-          example: v1
-        author:
-          type: string
-          example: john.doe
+          example: mergin
+        namespace:
+          type: string
+          example: mergin
+        creator:
+          nullable: true
+          type: integer
+          example: 1
+          description: Project creator ID
         created:
           type: string
           format: date-time
           example: 2018-11-30T08:47:58.636074Z
-        project_name:
+        updated:
           type: string
-          example: survey
-        namespace:
+          nullable: true
+          format: date-time
+          example: 2018-11-30T08:47:58.636074Z
+          description: Last modified
+        version:
           type: string
-          example: john.doe
-\ No newline at end of file
+          nullable: true
+          example: v2
+          description: Last project version
+        disk_usage:
+          type: integer
+          example: 25324373
+          description: Project size in bytes
+        permissions:
+          type: object
+          properties:
+            delete:
+              type: boolean
+              example: false
+            update:
+              type: boolean
+              example: false
+            upload:
+              type: boolean
+              example: true
+        tags:
+          type: array
+          nullable: true
+          items:
+            $ref: "#/components/schemas/MerginTag"
+        uploads:
+          type: array
+          nullable: true
+          items:
+            type: string
+            example: 669b838e-a30b-4338-b2b6-3da144742a82
+            description: UUID for ongoing upload
+        access:
+          $ref: "#/components/schemas/Access"
+        files:
+          type: array
+          items:
+            allOf:
+              - $ref: "#/components/schemas/FileInfo"
+        role:
+          nullable: true
+          type: string
+          enum:
+            - reader
+            - editor
+            - writer
+            - owner
diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py
index 32c05595..6717a083 100644
--- a/server/mergin/sync/public_api_v2_controller.py
+++ b/server/mergin/sync/public_api_v2_controller.py
@@ -39,7 +39,12 @@
 )
 from .permissions import ProjectPermissions, require_project_by_uuid
 from .public_api_controller import catch_sync_failure
-from .schemas import ProjectMemberSchema, ProjectVersionSchema, UploadChunkSchema
+from .schemas import (
+    ProjectMemberSchema,
+    ProjectVersionSchema,
+    UploadChunkSchema,
+    ProjectSchema,
+)
 from .storages.disk import move_to_tmp, save_to_file
 from .utils import get_device_id, get_ip, get_user_agent, get_chunk_location
 from .workspace import WorkspaceRole
@@ -337,17 +342,7 @@ def create_project_version(id):
     finally:
         # remove artifacts
         upload.clear()
-
-    return (
-        ProjectVersionSchema(
-            exclude=(
-                "files",
-                "changes",
-                "changesets",
-            )
-        ).dump(pv),
-        201,
-    )
+    return ProjectSchema().dump(project), 201
 
 
 @auth_required
diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py
index 330f58fa..85177190 100644
--- a/server/mergin/tests/test_public_api_v2.py
+++ b/server/mergin/tests/test_public_api_v2.py
@@ -297,7 +297,7 @@ def test_create_version(client, data, expected, err_code):
     response = client.post(f"v2/projects/{project.id}/versions", json=data)
     assert response.status_code == expected
     if expected == 201:
-        assert response.json["name"] == "v2"
+        assert response.json["version"] == "v2"
         assert project.latest_version == 2
         assert all(not os.path.exists(chunk) for chunk in chunks)
     else:
         assert project.latest_version == 1
     if err_code:
@@ -491,7 +491,7 @@ def test_full_push(client):
         },
     )
     assert response.status_code == 201
-    assert response.json["name"] == "v2"
+    assert response.json["version"] == "v2"
     assert project.latest_version == 2
     assert os.path.exists(
         os.path.join(project.storage.project_dir, "v2", test_file["path"])
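For API consumers the practical consequence is that the new version name now comes from the project dump rather than from a version object. A hedged client-side sketch (the requests-style session, URL, and payload are assumed, not part of the server code):

    resp = session.post(f"{server_url}/v2/projects/{project_id}/versions", json=payload)
    resp.raise_for_status()
    project_info = resp.json()
    latest = project_info["version"]  # previously the body was a version object with "name"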
From b06423e45d5a5770116a532b6c897104a66d7afa Mon Sep 17 00:00:00 2001
From: "marcel.kocisek"
Date: Mon, 22 Sep 2025 12:13:59 +0200
Subject: [PATCH 12/13] Do not validate diff files against mime type

---
 server/mergin/sync/models.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py
index 98241d35..9574a69d 100644
--- a/server/mergin/sync/models.py
+++ b/server/mergin/sync/models.py
@@ -1099,7 +1099,10 @@ def process_chunks(
                 errors[f.path] = FileSyncErrorType.CORRUPTED.value
                 continue
 
-            if not is_supported_type(temporary_location):
+            if (
+                f.change != PushChangeType.UPDATE_DIFF
+                and not is_supported_type(temporary_location)
+            ):
                 logging.info(f"Rejecting blacklisted file: {temporary_location}")
                 errors[f.path] = FileSyncErrorType.UNSUPPORTED.value
                 continue

From d11228a91c6a005093d6977e8932fbeb0d9cde51 Mon Sep 17 00:00:00 2001
From: "marcel.kocisek"
Date: Thu, 30 Oct 2025 11:13:37 +0100
Subject: [PATCH 13/13] Fix import for timezone

---
 server/mergin/sync/tasks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py
index 2e23d375..7688a3ee 100644
--- a/server/mergin/sync/tasks.py
+++ b/server/mergin/sync/tasks.py
@@ -6,7 +6,7 @@
 import shutil
 import os
 import time
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from zipfile import ZIP_DEFLATED, ZipFile
 from flask import current_app
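To make the intent of PATCH 12 concrete: geodiff changesets are opaque binary files with no recognizable mime type, so only full files are run through the blacklist check. A sketch of the resulting rule (the enum values are assumed, not copied from the server):

    from enum import Enum

    class PushChangeType(Enum):  # assumed values mirroring the server enum
        CREATE = "create"
        UPDATE = "update"
        UPDATE_DIFF = "update_diff"
        DELETE = "delete"

    def passes_type_check(change: PushChangeType, is_supported: bool) -> bool:
        """Diff-based updates skip the mime check; anything else must be a supported type."""
        return change == PushChangeType.UPDATE_DIFF or is_supported

    assert passes_type_check(PushChangeType.UPDATE_DIFF, False)  # diffs always pass
    assert not passes_type_check(PushChangeType.CREATE, False)   # blacklisted upload rejected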