Update
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 9s

This commit is contained in:
2024-05-17 20:41:12 +03:00
parent b442a0fc38
commit 42d0a1f031
40 changed files with 1989 additions and 0 deletions

View File

@@ -3,6 +3,7 @@ __pycache__
prototypes
modules
dist
*.egg-info
data
config

1
.gitignore vendored
View File

@@ -4,6 +4,7 @@ node_modules
modules
prototypes
dist
*.egg-info
data
config

0
docker-compose.yml Normal file
View File

195
manage.py Executable file
View File

@@ -0,0 +1,195 @@
#!/usr/bin/env python3
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import sys
import json
import asyncio
import argparse
import traceback
from pathlib import Path
from logging.config import dictConfig
from typing import Any
def to_json(request_data: bytes) -> Any:
try:
return json.loads(request_data)
except ValueError as e:
# raise ParseError(data={'message': 'Invalid JSON: {0!r}'.format(request_data)})
raise ValueError('message')
def cli_api(args):
from mycelium.api import jsonrpc
if args.json_rpc:
json_data = to_json(args.json_rpc)
result = json.dumps(jsonrpc(json_data))
print(result)
if args.methods:
print(jsonrpc.methods.keys())
if args.example:
result = jsonrpc.example(args.example)
print(json.dumps(result))
def cli_app(args):
module_name = 'config.{}'.format(args.config)
module = __import__(module_name)
config = getattr(module, args.config)
print(config)
print(config.__dir__())
print(config.LOG_CONFIG)
# Логирование
dictConfig(config.LOG_CONFIG)
from mycelium.factory import create_app
asyncio.run(
create_app(
port=args.port
)
)
# application.run(
# debug=args.debug,
# host='0.0.0.0',
# port=args.port
# )
# application.config.from_object('config.{}'.format(args.config))
def cli_init(args):
config = Path(args.dir_config)
if config.exists() and config.is_dir():
print('Директория для конфигураций существует')
exit(1)
config.mkdir(parents=True)
filename = config / '__init__.py'
filename.touch()
import datetime
from jinja2 import Template
from mycelium.lib.passwd import pwgen
template = """# Config generated {{NOW}}
import os
import sys
DEBUG = False
SQLDEBUG = False
SESSION_COOKIE_NAME = 'mycelium'
SESSION_TYPE = 'filesystem'
TITLE = 'mycelium'
DIR_BASE = '/'.join(os.path.dirname(os.path.abspath(__file__)).split('/')[:-1])
DIR_DATA = DIR_BASE + '/data'
DIR_FILES = DIR_DATA + '/files'
DIR_SEARCH_INDEX = DIR_DATA + '/index'
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + DIR_DATA + '/db.sqlite3'
# pwgen -s 64
SECRET_KEY = '''{{SECRET_KEY}}'''
# Логирование
LOG_CONFIG = {
'version': 1,
'formatters': {
'default': {
'format': '[%(asctime)s] %(levelname)s [%(name)s.%(module)s.%(funcName)s@%(lineno)d, %(threadName)s]: %(message)s',
}
},
'handlers': {
'file': {
'class': 'logging.handlers.RotatingFileHandler',
'maxBytes': 16*1024*1024, # Размер файла лога в МБ
'backupCount': 10,
'filename': DIR_DATA + '/mycelium.log',
'formatter': 'default'
}
},
'root': {
'level': 'INFO',
'handlers': ['file']
}
}
# Количество выводимых элементов на странице
ITEMS_ON_PAGE = 50
# Право доступа по умолчанию
# Возможные значения:
# allow - разрешено всё по умолчанию
# deny - запрещено всё по умолчанию
DEFAULT_PERMISSION = 'allow'
STATIC = 'https://static.specialistoff.net'
"""
t = Template(template)
body = t.render(
NOW=datetime.datetime.now(datetime.UTC),
SECRET_KEY=pwgen(64)
)
filename = config / 'default.py'
with open(filename, 'w') as fd:
fd.write(body)
data = Path(args.dir_data)
if data.exists() and data.is_dir():
print('Директория для данных существует')
exit(1)
data.mkdir(parents=True)
def main():
parser = argparse.ArgumentParser(
description='API',
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser._optionals.title = "Необязательные аргументы"
parser.add_argument("--debug", default=True, action='store_true', help="Отладочная информация")
subparsers = parser.add_subparsers(
title='subcommands',
description='valid subcommands',
help='additional help'
)
group_app = subparsers.add_parser('app')
group_app.add_argument("--port", default=5000, help="Порт")
group_app.add_argument("--config", default="default", help="Файл конфигурации")
group_app.set_defaults(func=cli_app)
group_api = subparsers.add_parser('api')
group_api.add_argument("--json-rpc")
group_api.add_argument("--methods", action='store_true')
group_api.add_argument("--example")
group_api.add_argument("--config", default="default", help="Файл конфигурации")
group_api.set_defaults(func=cli_api)
group_init = subparsers.add_parser('init')
group_init.add_argument("--dir-config", default='/app/config', help="Конфигурация")
group_init.add_argument("--dir-data", default='/app/data', help="Данные")
group_init.set_defaults(func=cli_init)
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
try:
main()
except Exception as err:
traceback.print_exc(file=sys.stdout)
sys.exit(1)
sys.exit(0)

2
src/mycelium/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'

View File

@@ -0,0 +1,6 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from .acl import ACL
__all__ = ['ACL']

View File

@@ -0,0 +1,15 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from jsonrpc import JSONRPC
jsonrpc = JSONRPC()
from . import ( # noqa F401
user,
)

161
src/mycelium/api/user.py Normal file
View File

@@ -0,0 +1,161 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import logging
from . import jsonrpc
from .. import lib, models
from ..mutations.user import user_as_dict
from config import default as config
log = logging.getLogger(__name__)
@jsonrpc.method('user')
def user_id(
id: int,
fields: list = ['id', 'name']
) -> dict:
"""Информация о пользователе
"""
user = models.db_session.query(
models.User
).filter(
models.User.id == id
).first()
if user is None:
raise ValueError
return user_as_dict(user, fields)
@jsonrpc.method('user.add')
def user_add(name: str, password: str) -> dict:
"""Новый пользователь
"""
if name is None or len(name) < 4 or len(name) > 25:
raise ValueError('Длина логина должна быть от 4 до 25 символов')
if password is None or len(password) < 4 or len(password) > 25:
raise ValueError('Длина пароля должна быть от 4 до 25 символов')
user = models.db_session.query(
models.User
).filter(
models.User.name == name
).first()
if user:
raise ValueError('Пользователь с таким логином уже существует')
newuser = models.User(
name
)
newuser.password = lib.get_hash_password(
password,
config.SECRET_KEY
)
models.db_session.add(newuser)
models.db_session.commit()
result = newuser.as_dict()
return result
@jsonrpc.method('user.enable')
def user_enable(id: int) -> dict:
"""Разблокировать пользователя
"""
userRow = models.db_session.query(
models.User
).filter(
models.User.id == id
).first()
if userRow is None:
raise ValueError
userRow.disable = False
models.db_session.commit()
return userRow.as_dict()
@jsonrpc.method('user.delete')
def user_delete(id: int) -> dict:
"""Удалить пользователя
"""
userRow = models.db_session.query(
models.User
).filter(
models.User.id == id
).first()
if userRow is None:
raise ValueError
userRow.deleted = True
models.db_session.commit()
return userRow.as_dict()
@jsonrpc.method('user.disable')
def user_disable(id: int) -> dict:
"""Заблокировать пользователя
"""
user = models.db_session.query(
models.User
).filter(
models.User.id == id
).first()
if user is None:
raise ValueError
user.disable = True
models.db_session.commit()
result = user.as_dict()
return result
@jsonrpc.method('users')
def users_list(
page: int = 1,
order_by: dict = {'field': 'name', 'order': 'asc'},
fields: list = ['id', 'name']
) -> list:
"""Список пользователей
"""
users = models.db_session.query(
models.User
)
# Сортировка
if order_by['field'] not in ['id', 'name', 'created', 'updated']:
raise ValueError
if order_by['order'] not in ['asc', 'desc']:
raise ValueError
log.info(order_by)
field = getattr(models.User, order_by['field'])
log.info(field)
order = getattr(field, order_by['order'])
log.info(order)
users = users.order_by(
order()
)
users = lib.getpage(
users,
page,
config.ITEMS_ON_PAGE
).all()
result = []
for user in users:
newRow = user_as_dict(user, fields)
result.append(newRow)
return result
@jsonrpc.method('users.count')
def users_count() -> int:
"""Количество пользователей
"""
result = models.db_session.query(
models.User
).count()
return result

View File

@@ -0,0 +1,2 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'

View File

@@ -0,0 +1,2 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'

View File

@@ -0,0 +1,52 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from math import ceil
def getpage(query, page=1, page_size=10):
"""Постраничный вывод
Аргументы:
query -- запрос
page -- номер страницы
page_size -- количество элементов на странице
"""
if page_size:
query = query.limit(page_size)
if page:
query = query.offset((page-1)*page_size)
return query
class Pagination(object):
def __init__(self, page, per_page, total_count):
self.page = page
self.per_page = per_page
self.total_count = total_count
@property
def pages(self):
return int(ceil(self.total_count / float(self.per_page)))
@property
def has_prev(self):
return self.page > 1
@property
def has_next(self):
return self.page < self.pages
def iter_pages(self, left_edge=2, left_current=2,
right_current=5, right_edge=2):
last = 0
for num in range(1, self.pages + 1):
if num <= left_edge or \
(num > self.page - left_current - 1 and \
num < self.page + right_current) or \
num > self.pages - right_edge:
if last + 1 != num:
yield None
yield num
last = num

View File

@@ -0,0 +1,30 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import random
import hashlib
def pwgen(length=15, hex=False):
"""
Генератор пароля
"""
keylist = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
if hex:
keylist = '0123456789ABCDEF'
password = []
while len(password) < length:
a_char = random.choice(keylist)
password.append(a_char)
return ''.join(password)
def get_hash_password(password, salt=None):
"""
Получить хеш пароля SHA-512
"""
if salt == None:
salt = uuid.uuid4().hex
text = password.encode('utf-8') + salt.encode('utf-8')
h = hashlib.sha512()
h.update(text)
return str(h.hexdigest())

View File

@@ -0,0 +1,55 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy_utils import database_exists, create_database
# from ..factory import app
from config import default as config
engine = create_engine(
config.SQLALCHEMY_DATABASE_URI,
echo=config.SQLDEBUG
)
# Если база не создана, то она будет создана
if not database_exists(engine.url):
create_database(engine.url)
db_session = scoped_session(
sessionmaker(
autocommit=False,
autoflush=False,
bind=engine
)
)
Base = declarative_base()
Base.query = db_session.query_property()
# Пользователи
from .user import User
# ACL
from .acl.resource import ACLResource
from .acl.role import ACLRole
from .acl.common import (
ACLRoleRule,
ACLUserRule,
ACLUserRole,
ACLIPRule
)
from .acl.extension import FileExtension
# Теги
from .tag.common import Tag
from .tag.trash import TrashTag
# Ссылки
from .ip import IP
Base.metadata.create_all(engine)
__all__ = []

View File

@@ -0,0 +1,2 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'

View File

@@ -0,0 +1,166 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import datetime
from sqlalchemy import (
Table,
Column,
Boolean,
Integer,
ForeignKey,
String,
DateTime,
Enum
)
from sqlalchemy.orm import relationship
from config import default as config
from .. import Base
class ACLRoleRule(Base): # Переименовать в ACLRoleRule
"""Набор прав доступа для роли
"""
__tablename__ = "acl_role_rule"
id = Column(Integer, primary_key=True, index=True)
role_id = Column(Integer, ForeignKey('acl_role.id'), index=True)
resource_id = Column(Integer, ForeignKey('acl_resource.id'), index=True)
permission = Column(String, default=config.DEFAULT_PERMISSION) # Разрешение
# Связи
role = relationship(
"ACLRole",
primaryjoin="ACLRoleRule.role_id == ACLRole.id",
uselist=False
)
resource = relationship(
"ACLResource",
primaryjoin="ACLRoleRule.resource_id == ACLResource.id",
uselist=False
)
def __init__(self, role, resource, permission):
assert type(role).__name__ == 'ACLRole', 'Не передан объект ACLRole'
assert type(resource).__name__ == 'ACLResource', \
'Не передан объект ACLResource'
assert permission in ['allow', 'deny'], \
'Некорректный параметр permission'
self.role_id = role.id
self.resource_id = resource.id
self.permission = permission
def __repr__(self):
return "<ACLResource({}, {}, {}, '{}')>".format(
self.id,
self.role,
self.resource,
self.permission
)
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
class ACLUserRule(Base):
"""Права доступа пользователя
"""
__tablename__ = "acl_user_rule"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey('user.id'), index=True)
resource_id = Column(Integer, ForeignKey('acl_resource.id'), index=True)
permission = Column(String, default=config.DEFAULT_PERMISSION) # Разрешение
# Связи
user = relationship(
"User",
primaryjoin="ACLUserRule.user_id == User.id",
uselist=False
)
resource = relationship(
"ACLResource",
primaryjoin="ACLUserRule.resource_id == ACLResource.id",
uselist=False
)
def __init__(self, resource, user, permission):
assert type(resource).__name__ == 'ACLResource', \
'Не передан объект ACLResource'
assert type(user).__name__ == 'User', 'Не передан объект User'
assert permission in ['allow', 'deny'], \
'Некорректный параметр permission'
self.resource_id = resource.id
self.user_id = user.id
self.permission = permission
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
class ACLUserRole(Base):
"""Роль пользователя
"""
__tablename__ = "acl_user_role"
id = Column(Integer, primary_key=True, index=True)
role_id = Column(Integer, ForeignKey('acl_role.id'), index=True)
user_id = Column(Integer, ForeignKey('user.id'), index=True)
# Связи
user = relationship(
"User",
primaryjoin="ACLUserRole.user_id == User.id",
uselist=False
)
role = relationship(
"ACLRole",
primaryjoin="ACLUserRole.role_id == ACLRole.id",
uselist=False
)
def __init__(self, user, role):
assert type(role).__name__ == 'ACLRole', 'Не передан объект ACLRole'
assert type(user).__name__ == 'User', 'Не передан объект User'
self.role_id = role.id
self.user_id = user.id
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
class ACLIPRule(Base):
"""Права доступа для IP
"""
__tablename__ = "acl_ip_rule"
id = Column(Integer, primary_key=True, index=True)
resource_id = Column(Integer, ForeignKey('acl_resource.id'), index=True)
ip_id = Column(Integer, ForeignKey('ip.id'), index=True)
permission = Column(String, default=config.DEFAULT_PERMISSION) # Разрешение
# Связи
ip = relationship(
"IP",
primaryjoin="ACLIPRule.ip_id == IP.id",
uselist=False
)
resource = relationship(
"ACLResource",
primaryjoin="ACLIPRule.resource_id == ACLResource.id",
uselist=False
)
def __init__(self, resource, ip, permission):
assert type(resource).__name__ == 'ACLResource', \
'Не передан объект ACLResource'
assert type(ip).__name__ == 'IP', 'Не передан объект IP'
assert permission in ['allow', 'deny'], \
'Некорректный параметр permission'
self.resource_id = resource.id
self.ip_id = ip.id
self.permission = permission
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}

View File

@@ -0,0 +1,36 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import datetime
from sqlalchemy import Column, Integer, ForeignKey, String, DateTime, Boolean
from sqlalchemy.orm import relationship
from config import default as config
from .. import Base
from ... import app
class FileExtension(Base):
"""Расширения файлов
"""
__tablename__ = "file_extension"
value = Column(String, primary_key=True, index=True)
mime = Column(String, index=True, default='application/octet-stream')
permission = Column(String, index=True, default=config.DEFAULT_PERMISSION)
# Связи
# tags = relationship("FileTag", primaryjoin="File.id == FileTag.filehash")
# user = relationship("User", primaryjoin="File.user_id == User.id")
def __init__(self, value: str, mime: str = 'application/octet-stream'):
assert len(value)>0, 'Значение не может быть пустым или нулевым'
self.value = value
self.mime = mime
def __repr__(self):
return "<FileExtension('{}')>".format(self.value)
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}

View File

@@ -0,0 +1,57 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import datetime
from sqlalchemy import (
Table,
Column,
Boolean,
Integer,
ForeignKey,
String,
DateTime,
Enum
)
from sqlalchemy.orm import relationship
from config import default as config
from .. import Base
class ACLResource(Base):
"""Объекты доступа: процедура и содержащий процедуру модуль
"""
__tablename__ = "acl_resource"
id = Column(Integer, primary_key=True, index=True)
funcname = Column(String)
modulename = Column(String)
# Связи
ip_rules = relationship(
"ACLIPRule",
primaryjoin="ACLResource.id == ACLIPRule.resource_id"
)
role_rules = relationship(
"ACLRoleRule",
primaryjoin="ACLResource.id == ACLRoleRule.resource_id"
)
user_rules = relationship(
"ACLUserRule",
primaryjoin="ACLResource.id == ACLUserRule.resource_id"
)
def __init__(self, modulename, funcname):
self.funcname = funcname
self.modulename = modulename
def __repr__(self):
return "<ACLResource({}, '{}': '{}')>".format(
self.id,
self.modulename,
self.funcname
)
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}

View File

@@ -0,0 +1,48 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import datetime
from sqlalchemy import (
Table,
Column,
Boolean,
Integer,
ForeignKey,
String,
DateTime,
Enum
)
from sqlalchemy.orm import relationship
from config import default as config
from .. import Base
class ACLRole(Base): # Переименовать в ACLRole
"""Роли доступа
"""
__tablename__ = "acl_role"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
description = Column(String, default='')
# Связи
rules = relationship(
"ACLRoleRule",
primaryjoin="ACLRole.id == ACLRoleRule.role_id"
)
users = relationship(
"ACLUserRole",
primaryjoin="ACLRole.id == ACLUserRole.role_id"
)
def __init__(self, name):
self.name = name
def __repr__(self):
return "<ACLRole({}, '{}')>".format(self.id, self.name)
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}

31
src/mycelium/models/ip.py Normal file
View File

@@ -0,0 +1,31 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import datetime
from sqlalchemy import Table, Column, Boolean, Integer, ForeignKey, String, DateTime
from sqlalchemy.orm import relationship
from . import Base
class IP(Base):
"""Таблица IP"""
__tablename__ = "ip"
id = Column(Integer, primary_key=True, index=True)
ip = Column(String, nullable=False, unique=True, index=True)
description = Column(String)
# Связи
# tagquestion = relationship("TagQuestion", primaryjoin="TagQuestion.tag_id == Tag.id")
def __init__(self, ip):
self.ip = ip
def __repr__(self):
return "<IP('%s')>" % (self.ip)
def as_dict(self):
return {
c.name: getattr(self, c.name)
for c in self.__table__.columns
}

View File

@@ -0,0 +1,2 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'

View File

@@ -0,0 +1,67 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
"""
Таблицы:
Tag
"""
import datetime
from sqlalchemy import Column, Integer, ForeignKey, String, DateTime
from sqlalchemy.orm import relationship
from .. import Base
class Tag(Base):
"""Теги, метки
"""
__tablename__ = "tag"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey('user.id'), index=True)
name = Column(String, nullable=False, unique=True)
description = Column(String, default='')
# Дата создания
created = Column(DateTime)
# Дата обновления
updated = Column(DateTime)
# Связи
# tagquestion = relationship(
# "TagQuestion",
# primaryjoin="TagQuestion.tag_id == Tag.id"
# )
# orders = relationship(
# "TagOrder",
# primaryjoin="Tag.id == TagOrder.tag_id"
# )
trash = relationship(
"TrashTag",
primaryjoin="Tag.id == TrashTag.tag_id",
uselist=False
)
# pages = relationship(
# "TagPage",
# primaryjoin="Tag.id == TagPage.tag_id"
# )
# user = relationship(
# "User",
# primaryjoin="Tag.user_id == User.id",
# uselist=False
# )
def __init__(self, user, name):
assert type(user).__name__ == 'User', 'Не передан объект User'
self.user_id = user.id
self.name = name
self.created = datetime.datetime.now()
self.updated = datetime.datetime.now()
def __repr__(self):
return "<Tag({}, '{}')>".format(self.id, self.name)
def as_dict(self):
"""Возвращает словарь
"""
return {c.name: getattr(self, c.name) for c in self.__table__.columns}

View File

@@ -0,0 +1,42 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import datetime
from sqlalchemy import Column, Integer, ForeignKey, DateTime, String
from sqlalchemy.orm import relationship
from .. import Base
class TrashTag(Base):
"""Корзина для тегов
"""
__tablename__ = "trashtag"
id = Column(Integer, primary_key=True, index=True)
# ID пользователя
user_id = Column(Integer, ForeignKey('user.id'), index=True)
# ID тега
tag_id = Column(Integer, ForeignKey('tag.id'), index=True)
# Дата удаления
created = Column(DateTime)
# Связи
user = relationship(
"User",
primaryjoin="TrashTag.user_id == User.id",
uselist=False
)
tag = relationship(
"Tag",
primaryjoin="TrashTag.tag_id == Tag.id",
back_populates="trash",
uselist=False
)
def __init__(self, user, tag):
assert type(user).__name__ == 'User', 'Не передан объект User'
assert type(tag).__name__ == 'Tag', 'Не передан объект Tag'
self.user_id = user.id
self.tag_id = tag.id
self.created = datetime.datetime.now()

View File

@@ -0,0 +1,2 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'

View File

@@ -0,0 +1,2 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'

View File

@@ -0,0 +1,2 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'

View File

@@ -0,0 +1,10 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from . import ( # noqa F401
common,
extension,
resource,
role,
user
)

View File

@@ -0,0 +1,298 @@
"""
API ACL
"""
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from .. import jsonrpc
from ... import app, lib, models
@jsonrpc.method('acl.ip.add')
def acl_ip_add(address: str, description: str = '') -> dict:
"""Добавление IP адреса
"""
exist = models.db_session.query(
models.IP
).filter(
models.IP.ip == address
).first()
if exist:
raise ValueError('Duplicate')
newip = models.IP(address)
newip.description = description
models.db_session.add(newip)
models.db_session.commit()
return newip.as_dict()
@jsonrpc.method('acl.ips')
def acl_ips(page: int) -> list:
"""Список IP адресов
"""
addresses = models.db_session.query(
models.IP
).order_by(
models.IP.ip.asc()
)
addresses = lib.getpage(
addresses,
page,
app.config['ITEMS_ON_PAGE']
).all()
result = []
for ip in addresses:
ipRow = ip.as_dict()
result.append(ipRow)
return result
@jsonrpc.method('acl.ip')
def acl_ip(id: int) -> dict:
"""IP адрес
"""
ip = models.db_session.query(
models.IP
).filter(
models.IP.id == id
).first()
if ip is None:
raise ValueError("404 Not found")
return ip.as_dict()
@jsonrpc.method('acl.ip.delete')
def acl_ip_delete(address: str) -> bool:
"""Удаление IP адреса
"""
exist = models.db_session.query(
models.IP
).filter(
models.IP.ip == address
).first()
if exist is None:
raise ValueError
models.db_session.delete(exist)
models.db_session.commit()
return True
@jsonrpc.method('acl.ip.update')
def acl_ip_update(id: int, address: str, description: str = '') -> dict:
"""Обновление IP адреса
"""
ip = models.db_session.query(
models.IP
).filter(
models.IP.id == id
).first()
if ip is None:
raise ValueError('404 Not found')
ip.ip = address
ip.description = description
models.db_session.commit()
return ip.as_dict()
@jsonrpc.method('acl.rule.update')
def acl_rule_update(resourceid: int, user: int, permission: str) -> dict:
"""Установка прав пользователя
"""
resource = models.db_session.query(
models.ACLResource
).filter(
models.ACLResource.id == resourceid
).first()
if resource is None:
raise ValueError
userRow = models.db_session.query(
models.User
).filter(
models.User.id == user
).first()
if userRow is None:
raise ValueError
if permission not in ['allow', 'deny']:
raise ValueError
rule = models.db_session.query(
models.ACLUserRule
).filter(
models.ACLUserRule.resource_id == resourceid,
models.ACLUserRule.user_id == user
).first()
if rule is None:
rule = models.ACLUserRule(
resource,
userRow,
app.config['DEFAULT_PERMISSION']
)
models.db_session.add(rule)
models.db_session.commit()
rule.permission = permission
models.db_session.commit()
return rule.as_dict()
@jsonrpc.method('acl.deleteACLUserRule')
def acl_deleteACLUserRule(resourceid: int, user: int) -> bool:
"""Удалить связь пользователя и прав на объект
"""
resource = models.db_session.query(
models.ACLResource
).filter(
models.ACLResource.id == resourceid
).first()
if resource is None:
raise ValueError
userRow = models.db_session.query(
models.User
).filter(
models.User.id == user
).first()
if userRow is None:
raise ValueError
exist = models.db_session.query(
models.ACLUserRule
).filter(
models.ACLUserRule.resource_id == resource.id,
models.ACLUserRule.user_id == user
).first()
if exist is None:
raise ValueError
models.db_session.delete(exist)
models.db_session.commit()
return True
@jsonrpc.method('acl.permission.ip.add')
def acl_permission_ip_add(
modulename: str,
funcname: str,
ip: str,
permission: str
) -> dict:
resource = models.db_session.query(
models.ACLResource
).filter(
models.ACLResource.funcname == funcname,
models.ACLResource.modulename == modulename
).first()
if resource is None:
raise ValueError
ipRow = models.db_session.query(
models.IP
).filter(
models.IP.ip == ip
).first()
if ipRow is None:
raise ValueError
exist = models.db_session.query(
models.ACLIPRule
).filter(
models.ACLIPRule.resource_id == resource.id,
models.ACLIPRule.ip_id == ipRow.id
).first()
if exist:
raise ValueError
new_rule = models.ACLIPRule(
resource,
ipRow,
permission
)
models.db_session.add(new_rule)
models.db_session.commit()
result = new_rule.as_dict()
return result
@jsonrpc.method('acl.permission.ip.update')
def acl_permission_ip_update(id: int, permission: str) -> dict:
rule = models.db_session.query(
models.ACLIPRule
).filter(
models.ACLIPRule.id == id
).first()
if rule is None:
raise ValueError
rule.permission = permission
models.db_session.commit()
result = rule.as_dict()
result["aclresource"] = rule.aclresource.as_dict()
result["ip"] = rule.ip.as_dict()
return result
@jsonrpc.method('acl.permission.ip.destroy')
def acl_permission_ip_destroy(
modulename: str,
funcname: str,
ip: str,
permission: str
) -> bool:
"""Удалить правило для IP
"""
resource = models.db_session.query(
models.ACLResource
).filter(
models.ACLResource.funcname == funcname,
models.ACLResource.modulename == modulename
).first()
if resource is None:
raise ValueError
ipRow = models.db_session.query(
models.IP
).filter(
models.IP.ip == ip
).first()
if ipRow is None:
raise ValueError
exist = models.db_session.query(
models.ACLIPRule
).filter(
models.ACLIPRule.resource_id == resource.id,
models.ACLIPRule.ip_id == ipRow.id
).first()
if exist is None:
raise ValueError('Правило не найдено')
models.db_session.delete(exist)
models.db_session.commit()
return True
@jsonrpc.method('acl.rules')
def acl_rules(page: int = 1) -> list:
"""Список правил для всех пользователей
"""
rules = models.db_session.query(
models.ACLUserRule
)
rules = lib.getpage(
rules,
page,
app.config['ITEMS_ON_PAGE']
).all()
result = []
for rule in rules:
newRow = rule.as_dict()
newRow['user'] = rule.user.as_dict()
newRow['resource'] = rule.resource.as_dict()
result.append(newRow)
return result
@jsonrpc.method('acl.rules.count')
def acl_rules_count() -> int:
"""Количество всех правил для всех пользователей
"""
result = models.db_session.query(
models.ACLUserRule
).count()
return result

View File

@@ -0,0 +1,94 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from .. import jsonrpc
from ... import app, lib, models
@jsonrpc.method('acl.extension.add')
def acl_extension_add(value: str, mime: str) -> dict:
"""Добавление расширения
"""
exist = models.db_session.query(
models.FileExtension
).filter(
models.FileExtension.value == value
).first()
if exist:
raise ValueError('Duplicate')
newExtention = models.FileExtension(value=value, mime=mime)
newExtention.permission = 'allow'
models.db_session.add(newExtention)
models.db_session.commit()
return newExtention.as_dict()
@jsonrpc.method('acl.extension.delete')
def acl_extension_delete(value: str) -> bool:
"""Удаление расширения
"""
exist = models.db_session.query(
models.FileExtension
).filter(
models.FileExtension.value == value
).first()
if exist is None:
raise ValueError('Not found')
models.db_session.delete(exist)
models.db_session.commit()
return True
@jsonrpc.method('acl.extensions')
def acl_extensions(
page: int = 1,
order_by: dict = {'field': 'value', 'order': 'asc'}
) -> list:
"""Добавление расширения
"""
extensions = models.db_session.query(
models.FileExtension
)
# Сортировка
if order_by['field'] not in ['value', 'mime']:
raise ValueError
if order_by['order'] not in ['asc', 'desc']:
raise ValueError
app.logger.info(order_by)
field = getattr(models.FileExtension, order_by['field'])
app.logger.info(field)
order = getattr(field, order_by['order'])
app.logger.info(order)
extensions = extensions.order_by(
order()
)
extensions = lib.getpage(
extensions,
page,
app.config['ITEMS_ON_PAGE']
).all()
result = []
for extension in extensions:
newRow = extension.as_dict()
result.append(newRow)
return result
@jsonrpc.method('acl.extensions.count')
def acl_extensions_count() -> int:
"""Количество расширений
"""
result = models.db_session.query(
models.FileExtension
).count()
return result

View File

@@ -0,0 +1,71 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from .. import jsonrpc
from ... import app, lib, models
@jsonrpc.method('acl.resource.delete')
def acl_resource_delete(id: int) -> bool:
"""Удаление объекта доступа из базы и связанных прав
"""
resource = models.db_session.query(
models.ACLResource
).filter(
models.ACLResource.id == id
).first()
if resource is None:
raise ValueError
# Удаляем связи
for ip_rule in resource.ip_rules:
models.db_session.delete(ip_rule)
models.db_session.commit()
for role_rule in resource.role_rules:
models.db_session.delete(role_rule)
models.db_session.commit()
for user_rule in resource.user_rules:
models.db_session.delete(user_rule)
models.db_session.commit()
# Удаляем ресурс
models.db_session.delete(resource)
models.db_session.commit()
return True
@jsonrpc.method('acl.resources')
def acl_resources(page: int) -> list:
"""Список ресурсов
"""
resources = models.db_session.query(
models.ACLResource
).order_by(
models.ACLResource.modulename.asc(),
models.ACLResource.funcname.asc()
)
resources = lib.getpage(
resources,
page,
app.config['ITEMS_ON_PAGE']
).all()
result = []
for resource in resources:
newRow = resource.as_dict()
newRow["ips"] = []
newRow["roles"] = []
newRow["users"] = []
result.append(newRow)
return result
@jsonrpc.method('acl.resources.count')
def acl_resources_count() -> int:
"""Количество ресурсов
"""
result = models.db_session.query(
models.ACLResource
).count()
return result

View File

@@ -0,0 +1,345 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
import logging
from .. import jsonrpc
from ... import app, lib, models
from ...mutations.acl.role import role_as_dict
log = logging.getLogger(__name__)
@jsonrpc.method('acl.role')
def acl_role(
id: int,
fields: list = ['id', 'name']
) -> dict:
"""Роль доступа
"""
role = models.db_session.query(
models.ACLRole
).filter(
models.ACLRole.id == id
).first()
if role is None:
raise ValueError('Роль не найдена')
result = role_as_dict(role, fields)
return result
@jsonrpc.method('acl.role.add')
def acl_role_add(name: str, description: str) -> dict:
"""Добавление роли доступа
"""
newrole = models.ACLRole(lib.Escape(name).tostring())
newrole.description = lib.Escape(description).tostring()
models.db_session.add(newrole)
models.db_session.commit()
result = newrole.as_dict()
return result
@jsonrpc.method('acl.role.delete')
def acl_role_delete(id: int) -> bool:
"""Удаление роли и связанных прав
"""
role = models.db_session.query(
models.ACLRole
).filter(
models.ACLRole.id == id
).first()
if role is None:
raise ValueError
for user in role.users:
models.db_session.delete(user)
models.db_session.commit()
models.db_session.delete(role)
models.db_session.commit()
return True
@jsonrpc.method('acl.role.rule.update')
def acl_role_rule_update(role: int, resource: int, permission: str) -> dict:
"""Установка прав роли
"""
resourceRow = models.db_session.query(
models.ACLResource
).filter(
models.ACLResource.id == resource
).first()
if resourceRow is None:
raise ValueError
roleRow = models.db_session.query(
models.ACLRole
).filter(
models.ACLRole.id == role
).first()
if roleRow is None:
raise ValueError
if permission not in ['allow', 'deny']:
raise ValueError
rule = models.db_session.query(
models.ACLRoleRule
).filter(
models.ACLRoleRule.resource_id == resource,
models.ACLRoleRule.role_id == role
).first()
if rule is None:
rule = models.ACLRoleRule(
role=roleRow,
resource=resourceRow,
permission=app.config['DEFAULT_PERMISSION']
)
models.db_session.add(rule)
models.db_session.commit()
rule.permission = permission
models.db_session.commit()
result = rule.as_dict()
return result
@jsonrpc.method('acl.role.rules.update')
def acl_role_rules_update(role: int, rules: list) -> list:
"""Установка прав роли
"""
roleRow = models.db_session.query(
models.ACLRole
).filter(
models.ACLRole.id == role
).first()
if roleRow is None:
raise ValueError
result = []
for rule in rules:
app.logger.info(rule)
resourceRow = models.db_session.query(
models.ACLResource
).filter(
models.ACLResource.id == rule["resource"]
).first()
if resourceRow is None:
raise ValueError
if rule["permission"] not in ['allow', 'deny']:
raise ValueError
ruleRow = models.db_session.query(
models.ACLRoleRule
).filter(
models.ACLRoleRule.resource_id == rule["resource"],
models.ACLRoleRule.role_id == role
).first()
if ruleRow is None:
ruleRow = models.ACLRoleRule(
role=roleRow,
resource=resourceRow,
permission=app.config['DEFAULT_PERMISSION']
)
models.db_session.add(ruleRow)
models.db_session.commit()
ruleRow.permission = rule["permission"]
models.db_session.commit()
result.append(ruleRow.as_dict())
return result
@jsonrpc.method('acl.role.user.add')
def acl_role_user_add(role: int, user: int) -> bool:
"""Добавление участника в роль
"""
roleRow = models.db_session.query(
models.ACLRole
).filter(
models.ACLRole.id == role
).first()
if role is None:
raise ValueError('Role {} not found'.format(role))
userRow = models.db_session.query(
models.User
).filter(
models.User.id == user
).first()
if user is None:
raise ValueError('User {} not found'.format(user))
exist = models.db_session.query(
models.ACLUserRole
).filter(
models.ACLUserRole.user_id == user,
models.ACLUserRole.role_id == role
).first()
if exist:
raise ValueError('Role {} contains User {}'.format(role, user))
newuserrole = models.ACLUserRole(userRow, roleRow)
models.db_session.add(newuserrole)
models.db_session.commit()
return True
@jsonrpc.method('acl.role.rule.delete')
def acl_role_rule_delete(id: int) -> bool:
"""Удалить правило из роли
"""
rule = models.db_session.query(
models.ACLRoleRule
).filter(
models.ACLRoleRule.id == id
).first()
if rule is None:
raise ValueError
models.db_session.delete(rule)
models.db_session.commit()
return True
@jsonrpc.method('acl.role.rules')
def acl_role_rules(
id: int,
resources: list = None
) -> list:
"""Список правил роли
"""
role = models.db_session.query(
models.ACLRole
).filter(
models.ACLRole.id == id
).first()
if role is None:
raise ValueError('Роль не найдена')
permissions = models.db_session.query(
models.ACLRoleRule
).filter(
models.ACLRoleRule.role_id == id
)
if isinstance(resources, list):
permissions = permissions.filter(
models.ACLRoleRule.resource_id.in_(resources)
)
permissions = permissions.all()
result = []
for item in permissions:
newRow = item.as_dict()
result.append(newRow)
return result
@jsonrpc.method('acl.role.update')
def acl_role_update(id: int, name: str, description: str) -> dict:
"""Обновление роли доступа
"""
role = models.db_session.query(
models.ACLRole
).filter(
models.ACLRole.id == id
).first()
if role is None:
raise ValueError('Роль не найдена')
role.name = lib.Escape(name).tostring()
role.description = lib.Escape(description).tostring()
models.db_session.commit()
result = role.as_dict()
return result
@jsonrpc.method('acl.role.user.delete')
def acl_role_user_delete(role: int, user: int) -> bool:
"""Удаление участника из роли
"""
exist = models.db_session.query(
models.ACLUserRole
).filter(
models.ACLUserRole.role_id == role,
models.ACLUserRole.user_id == user
).first()
if exist is None:
raise ValueError
models.db_session.delete(exist)
models.db_session.commit()
return True
@jsonrpc.method('acl.role.users')
def acl_role_users(id: int) -> list:
"""Список пользователей роли
"""
role = models.db_session.query(
models.ACLRole
).filter(
models.ACLRole.id == id
).first()
if role is None:
raise ValueError('Роль не найдена')
user_role_list = models.db_session.query(
models.ACLUserRole
).filter(
models.ACLUserRole.role_id == id
).all()
result = []
for item in user_role_list:
newRow = item.as_dict()
result.append(newRow)
return result
@jsonrpc.method('acl.roles')
def acl_roles(
page: int = 1,
order_by: dict = {'field': 'name', 'order': 'asc'},
fields: list = ['id', 'name']
) -> list:
"""Список ролей доступа
"""
roles = models.db_session.query(
models.ACLRole
)
# Сортировка
if order_by['field'] not in ['created', 'id', 'name', 'updated']:
raise ValueError
if order_by['order'] not in ['asc', 'desc']:
raise ValueError
log.info(order_by)
field = getattr(models.ACLRole, order_by['field'])
log.info(field)
order = getattr(field, order_by['order'])
log.info(order)
roles = roles.order_by(
order()
)
roles = lib.getpage(
roles,
page,
app.config['ITEMS_ON_PAGE']
).all()
result = []
for role in roles:
newRow = role.as_dict()
newRow['users'] = []
result.append(newRow)
return result
@jsonrpc.method('acl.roles.count')
def acl_roles_count() -> int:
"""Количество ролей доступа
"""
result = models.db_session.query(
models.ACLRole
).count()
return result

View File

@@ -0,0 +1,40 @@
from .. import jsonrpc
from ... import models
@jsonrpc.method('acl.user.rule.delete')
def acl_user_rule_delete(id: int) -> bool:
"""Удалить правило
"""
exist = models.db_session.query(
models.ACLUserRule
).filter(
models.ACLUserRule.id == id
).first()
if exist is None:
raise ValueError
models.db_session.delete(exist)
models.db_session.commit()
return True
@jsonrpc.method('acl.user.rule.update')
def acl_user_rule_update(id: int, permission: str) -> dict:
"""Изменить правило
"""
if permission not in ['allow', 'deny']:
raise ValueError
rule = models.db_session.query(
models.ACLUserRule
).filter(
models.ACLUserRule.id == id
).first()
if rule is None:
raise ValueError
rule.permission = permission
models.db_session.commit()
result = rule.as_dict()
result['user'] = rule.user.as_dict()
result['resource'] = rule.resource.as_dict()
return result

View File

@@ -0,0 +1,6 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from . import (
common,
)

View File

@@ -0,0 +1,44 @@
function Backtotop() {
let data = {
get visible() {
return data.raw_visible;
},
set visible(value) {
if (data.raw_visible!=value) {
m.redraw();
}
data.raw_visible = value;
},
raw_visible: false,
}
function backToTop() {
let currentScroll = document.documentElement.scrollTop || document.body.scrollTop
if (currentScroll > 0) {
window.scrollTo(0, 0)
}
};
function catchScroll() {
data.visible = (window.pageYOffset > 100);
};
return {
oninit: function(vnode) {
window.addEventListener('scroll', catchScroll);
let currentScroll = document.documentElement.scrollTop || document.body.scrollTop
data.visible = (currentScroll > 100);
},
onremove: function(vnode) {
window.removeEventListener('scroll', catchScroll)
},
view: function(vnode) {
if (data.visible) {
return m('div', {class: 'scrollToTop', style: 'z-index: 10;', onclick: backToTop},
m('div', {class: 'card'},
m('div', {class: 'card-body py-2 px-2'},
m('i', {class: 'fa fa-chevron-up'})
)
)
);
}
}
};
};

View File

@@ -0,0 +1,3 @@
{% include '/components/backtotop.js' %}
{% include '/components/filter.js' %}
{% include '/components/notifications.js' %}

View File

@@ -0,0 +1,51 @@
function Notifications() {
let data = {
notifications: [
// {
// title: 'Danger!',
// typeOf: 'danger',
// body: 'body'
// }
]
};
function notification_render(notification, notificationIdx) {
let border = '';
if (notification.typeOf=='danger') {
border = 'border-danger';
}
return m('div', {class: `toast show ${border}`},
m('div', {class: 'toast-header'},
m('strong', {class: 'me-auto'}, notification.title),
m('button', {type: 'button', class: 'btn-close', onclick: function() { notification_delete(notification) }})
),
m('div', {class: 'toast-body'}, notification.body)
);
};
function notifications_render() {
return notifications.map(notification_render);
};
return {
oncreate: function(vnode) {
console.log('Notifications.oncreate');
for (let key in vnode.attrs){
data[key] = vnode.attrs[key];
};
},
oninit: function(vnode) {
console.log('Notifications.oninit');
for (let key in vnode.attrs){
data[key] = vnode.attrs[key];
};
},
onbeforeremove: function(vnode) {
console.log('Notifications.onbeforeremove');
for (let key in vnode.attrs){
data[key] = vnode.attrs[key];
};
},
view: function() {
console.log('Notifications.view');
return m('div', {class: 'toast-container position-fixed top-0 end-0 p-3'}, [notifications_render()]);
}
};
};

View File

@@ -0,0 +1,4 @@
{% include '/lib/common.js' %}
{% include '/lib/errors.js' %}
{% include '/lib/event-bus.js' %}
{% include '/lib/tinymce.js' %}

View File

@@ -0,0 +1,30 @@
{% include '/private/settings.js' %}
//let vr = document.body;
let vroot = document.getElementById("app");
let routes = {};
let notifications = [];
{% include '/lib/inc.j2' %}
const bus = EventBus();
bus.on('notify');
function notification_add(notification) {
console.log(notification)
notifications.push(notification);
};
function notification_delete(notification) {
notifications = arrayRemove(notifications, notification);
};
bus.on('notify', notification_add)
{% include '/private/layout.js' %}
{% include '/components/inc.j2' %}
{% include '/private/components/inc.j2' %}
{% include '/private/domains/inc.j2' %}
{% include '/private/routes.js' %}
// m.render(vr, Body);

View File

@@ -0,0 +1,9 @@
let Footer = {
view: function() {
result = {tag: '<', children: `<div class="row bg-light py-3">
<div class="col-md-1"></div>
<div class="col-md-11">&copy; RemiZOffAlex</div>
</div>`};
return result;
}
};

View File

@@ -0,0 +1,2 @@
{% include '/private/components/footer.js' %}
{% include '/private/components/navbar.js' %}

View File

@@ -0,0 +1,3 @@
{% include '/private/domains/tag/inc.j2' %}
{% include '/private/domains/home.js' %}