跳轉到

View

view.budget

Budget

batch

batch(pid: str) -> str | ResponseBase

batch upload

Source code in view/budget.py
@VIEW_BUDGET.route('/batch/<pid>', methods=('GET', 'POST'))
def batch(pid: str) -> str | ResponseBase:  # pylint: disable=too-many-branches,too-many-return-statements
    ''' Batch upload of budget items

    GET renders the upload page.  A JSON POST whose ``casename`` is ``get``
    returns the project's teams.  A POST carrying an uploaded ``file`` parses
    it as CSV (the first line of the upload is discarded before parsing) and,
    depending on the form field ``casename``, either reports the verification
    outcome (``verify``) or writes the accepted rows through ``Budget.add`` /
    ``Budget.edit`` (``save``).

    An unknown project, or a user for whom ``Budget.is_admin`` is falsy,
    is redirected to ``/``.
    '''
    project = Project.get(pid)
    if not project:
        return redirect('/')

    is_admin = Budget.is_admin(pid=pid, uid=g.user['account']['_id'])
    if not is_admin:
        return redirect('/')

    if request.method == 'GET':
        return render_template('./budget_batch.html',
                               project=project.dict(by_alias=True), is_admin=is_admin)

    if request.method != 'POST':
        return jsonify({})

    if request.is_json:
        payload = request.get_json()
        if payload and payload.get('casename') == 'get':
            return jsonify({'teams': [
                {'name': team.name, 'tid': team.id}
                for team in Team.list_by_pid(pid=project.id)
            ]})

    if not request.files or 'file' not in request.files:
        return jsonify({})

    # Drop the first line of the upload, then let DictReader take its header
    # from the next one.
    uploaded_text = request.files['file'].read().decode('utf8')
    remaining_lines = uploaded_text.split('\n')[1:]
    csv_file = list(csv.DictReader(io.StringIO('\n'.join(remaining_lines))))

    result, error_result = Budget.verify_batch_items(items=csv_file)

    def conflicts_with_stored(entry: dict[str, Any]) -> bool:
        ''' True for an `add` of an existing bid or an `update` of a missing bid '''
        if entry['action'] == 'add':
            return bool(Budget.get_by_bid(pid=pid, bid=entry['bid']))

        if entry['action'] == 'update':
            return not Budget.get_by_bid(pid=pid, bid=entry['bid'])

        return False

    dedup_result = []
    dup_bids = []
    for entry in result:
        if conflicts_with_stored(entry):
            dup_bids.append(entry['bid'])
        else:
            dedup_result.append(entry)

    casename = request.form['casename']

    if casename == 'verify':
        return jsonify({
            'file': csv_file,
            'confirmed': dedup_result,
            'error_items': error_result,
            'dup_bids': dup_bids,
        })

    if casename == 'save':
        for entry in dedup_result:
            if entry['action'] == 'add':
                Budget.add(pid=pid, tid=entry['tid'], data=entry)
            elif entry['action'] == 'update':
                # Look the bid up again right before writing; skip the row
                # when nothing is found.
                budget_id = Budget.get_by_bid(pid=pid, bid=entry['bid'])
                if budget_id:
                    entry['_id'] = budget_id
                    Budget.edit(pid=pid, data=entry)

    return jsonify({})

by_project_index

by_project_index(pid: str) -> str | ResponseBase

index

Source code in view/budget.py
@VIEW_BUDGET.route('/<pid>', methods=('GET', 'POST'))
def by_project_index(pid: str) -> str | ResponseBase:
    ''' Budget index page and its JSON API

    GET renders the budget page.  POST expects a JSON object whose
    ``casename`` selects the action:

    - ``get``: teams, an empty budget template and all budget items
    - ``check_bid``: whether ``bid`` already exists in this project
    - ``add``: create a budget item from ``data``
    - ``edit``: update a budget item from ``data``

    The ``enabled`` flag travels as the strings ``'true'`` / ``'false'`` in
    JSON and is stored as a boolean.  Any other body (no ``casename``, not a
    JSON object, unknown action) yields an empty JSON object.

    An unknown project, or a user for whom ``Budget.is_admin`` is falsy,
    is redirected to ``/``.
    '''
    # pylint: disable=too-many-return-statements,too-many-branches
    project = Project.get(pid)

    if not project:
        return redirect('/')

    is_admin = Budget.is_admin(pid=pid, uid=g.user['account']['_id'])
    if not is_admin:
        return redirect('/')

    if request.method == 'GET':
        return render_template('./budget.html',
                               project=project.dict(by_alias=True), is_admin=is_admin)

    if request.method == 'POST':
        data = request.get_json()
        # A body without `casename`, or one that is not a JSON object, falls
        # through to the empty response instead of raising.
        casename = data.get('casename') if isinstance(data, dict) else None

        if casename == 'get':
            teams = [
                {'name': team.name, 'tid': team.id}
                for team in Team.list_by_pid(pid=project.id)
            ]

            default_budget = {
                'bid': '',
                'uid': '',
                'name': '',
                'total': 0,
                'paydate': '',
                'desc': '',
                'estimate': '',
                'tid': '',
                'currency': 'TWD',
            }

            items = []
            for item in Budget.get_by_pid(pid=pid):
                item['enabled'] = 'true' if item['enabled'] else 'false'
                items.append(item)

            return jsonify({'teams': teams, 'default_budget': default_budget, 'items': items})

        if casename == 'check_bid':
            return jsonify({'existed': bool(Budget.get_by_bid(pid=pid, bid=data['bid']))})

        if casename == 'add':
            item = Budget.add(
                pid=pid, tid=data['data']['tid'], data=data['data'])
            return jsonify({'data': item})

        if casename == 'edit':
            # Only the exact string 'true' enables the item.
            data['data']['enabled'] = data['data']['enabled'] == 'true'

            item = Budget.edit(pid=pid, data=data['data'])
            item['enabled'] = 'true' if item['enabled'] else 'false'

            return jsonify({'data': item})

    return jsonify({})

view.dev

Dev

index

index() -> str | ResponseBase

Index page

Source code in view/dev.py
@VIEW_DEV.route('/', methods=('GET', 'POST'))
def index() -> str | ResponseBase:
    ''' Dev index page

    GET increments the session counter ``tc`` and renders the page with it.
    POST expects JSON whose ``casename`` selects the action: ``get`` dumps
    accounts, sessions and projects; ``create_project`` creates a project
    for each session record matching the current ``sid``; ``make_suspend``
    toggles the suspend property of the given user.  Anything else yields
    an empty JSON object.
    '''
    if request.method == 'GET':
        session['tc'] = session.get('tc', 0) + 1
        return render_template('./dev_index.html', count=session['tc'])

    if request.method != 'POST':
        return jsonify({})

    data = request.get_json()
    if not data:
        return jsonify({})

    casename = data['casename'] if 'casename' in data else None

    if casename == 'get':
        accounts = list(UsersDB().find())
        sessions = list(USessionDB().find())

        return jsonify({
            'sid': session['sid'],
            'accounts': accounts,
            'sessions': sessions,
            'projects': [item.dict(by_alias=True) for item in Project.all()],
        })

    if casename == 'create_project':
        for usession in USessionDB().find({'_id': session['sid']}):
            project_data = data['project']
            Project.create(pid=project_data['pid'],
                           name=project_data['name'],
                           owners=[usession['uid'], ],
                           action_date=project_data['action_date'],
                           )
    elif casename == 'make_suspend':
        user_obj = User(uid=data['uid'])
        # Flip the current state.
        user_obj.property_suspend(value=not user_obj.has_suspended())

    return jsonify({})
set_cookie() -> ResponseBase

set cookies

Source code in view/dev.py
@VIEW_DEV.route('/cookie')
def set_cookie() -> ResponseBase:
    ''' Store the `sid` query argument in the session, then go to the dev index '''
    sid = request.args['sid']
    session['sid'] = sid

    target = url_for('dev.index')
    return redirect(target)

view.expense

Expense

by_project_dl

by_project_dl(pid: str) -> str | ResponseBase

Project download

Source code in view/expense.py
@VIEW_EXPENSE.route('/<pid>/dl', methods=('GET', 'POST'))
def by_project_dl(pid: str) -> str | ResponseBase:
    ''' Download the project's expenses as a CSV attachment

    The rows come from ``Expense.dl_format``; the CSV header is taken from
    the keys of the first row.  The body is encoded as ``utf-8-sig``.
    Responds with an empty 204 when there are no rows or the method is not
    GET.

    An unknown project, or a user for whom ``Budget.is_admin`` is falsy,
    is redirected to ``/``.
    '''
    project = Project.get(pid)
    if not project:
        return redirect('/')

    is_admin = Budget.is_admin(pid=pid, uid=g.user['account']['_id'])
    if not is_admin:
        return redirect('/')

    if request.method != 'GET':
        return Response('', status=204)

    rows = Expense.dl_format(pid=pid)
    if not rows:
        return Response('', status=204)

    with io.StringIO() as buffer:
        writer = csv.DictWriter(
            buffer, fieldnames=list(rows[0].keys()), quoting=csv.QUOTE_MINIMAL)
        writer.writeheader()
        writer.writerows(rows)

        filename = f"coscup_expense_{pid}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        response_headers = {
            'Content-disposition': f'attachment; filename={filename}',
            'x-filename': filename,
        }
        body = buffer.getvalue().encode(encoding="utf-8-sig")

        return Response(body, mimetype='text/csv', headers=response_headers)

by_project_index

by_project_index(pid: str) -> str | ResponseBase

Project index

Source code in view/expense.py
@VIEW_EXPENSE.route('/<pid>', methods=('GET', 'POST'))
def by_project_index(pid: str) -> str | ResponseBase:
    ''' Expense index page and its JSON API

    GET renders the expense page.  POST expects JSON whose ``casename`` is
    ``get`` (all expenses plus the budgets and users they refer to, and the
    status table) or ``update`` (invoices, enable flag and status of one
    expense).  Anything else answers 404.

    An unknown project, or a user for whom ``Budget.is_admin`` is falsy,
    is redirected to ``/``.
    '''
    project = Project.get(pid)
    if not project:
        return redirect('/')

    is_admin = Budget.is_admin(pid=pid, uid=g.user['account']['_id'])
    if not is_admin:
        return redirect('/')

    if request.method == 'GET':
        return render_template('./expense.html',
                               project=project.dict(by_alias=True), is_admin=is_admin)

    if request.method != 'POST':
        return make_response({}, 404)

    data = request.get_json()
    if not data:
        return make_response({}, 404)

    casename = data['casename']

    if casename == 'get':
        datas = list(Expense.get_all_by_pid(pid=pid))

        # Collect the distinct budgets and creators referenced by the expenses.
        buids = set()
        uids = set()
        for expense in datas:
            buids.add(expense['request']['buid'])
            uids.add(expense['create_by'])

        budgets = {}
        if buids:
            budgets = {raw['_id']: raw for raw in Budget.get(buids=list(buids), pid=pid)}

        users = {}
        if uids:
            # Expose only the oauth data and the badge name of each user.
            users = {
                uid: {
                    'oauth': value['oauth'],
                    'profile': {'badge_name': value['profile']['badge_name']},
                }
                for uid, value in User.get_info(uids=list(uids)).items()
            }

        return jsonify({'datas': datas, 'budgets': budgets, 'users': users,
                        'status': Expense.status()})

    if casename == 'update':
        expense_id = data['data']['_id']
        Expense.update_invoices(
            expense_id=expense_id, invoices=data['data']['invoices'])
        Expense.update_enable(
            expense_id=expense_id, enable=data['data']['enable'])
        result = Expense.update_status(
            expense_id=expense_id, status=data['data']['status'])

        return jsonify({'result': result})

    return make_response({}, 404)

view.guide

Guide

index

index() -> str

Index page

Source code in view/guide.py
@VIEW_GUIDE.route('/')
def index() -> str:
    ''' Render the `guide_index.html` template '''
    template_name = './guide_index.html'
    return render_template(template_name)

Links

index

index() -> str

Index page

Source code in view/links.py
@VIEW_LINKS.route('/')
def index() -> str:
    ''' Return the fixed text `hi` '''
    body = 'hi'
    return body
link_chat() -> ResponseBase

Link to chat

Source code in view/links.py
@VIEW_LINKS.route('/chat', methods=('POST', ))
def link_chat() -> ResponseBase:
    ''' Link to chat

    Hands the posted form to ``MattermostLink.verify_save`` and answers with
    a success or failure message according to its result.
    '''
    if request.method != 'POST':
        return Response('', status=404)

    verified = MattermostLink.verify_save(request.form)
    message = '認證成功 / Completed' if verified else '認證失敗 / Fail'
    return Response(message)

view.project

Project

index

index() -> str

Index page

Source code in view/project.py
@VIEW_PROJECT.route('/')
def index() -> str:
    ''' Project index page

    Lists every project, adds a display string for its action date
    (``YYYY-MM-DD`` plus a humanized offset from now), and passes the
    projects to the template in rows of three.
    '''
    datas = [project.dict(by_alias=True) for project in Project.all()]

    for data in datas:
        date = arrow.get(data['action_date'])
        data['action_date_str'] = f"{date.format('YYYY-MM-DD')} ({date.humanize(arrow.now())})"

    # Consecutive slices of `per` items; the last row may be shorter.
    per = 3
    projects = [datas[start:start + per] for start in range(0, len(datas), per)]

    return render_template('./project_index.html', projects=projects)

project_contact_book

project_contact_book(pid: str) -> str | ResponseBase

Project contact book

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/contact_book', methods=('GET', 'POST'))
def project_contact_book(pid: str) -> str | ResponseBase:
    ''' Project contact book

    GET renders the contact-book page.  A JSON POST whose ``casename`` is
    ``get`` returns one entry per chief/member of the project's teams: uid,
    badge name, picture, team id, email, phone (when a ``profile_real``
    record is present) and the Mattermost user name.

    Only users listed in ``project.owners`` may use this view; everyone else
    is redirected to the team page.  An unknown ``pid`` answers 404 for
    every method.
    '''
    project = Project.get(pid)
    if not project:
        # Without this guard a POST for a missing project skipped the owner
        # check below and still reached the sensitive user lookup.
        return make_response({}, 404)

    if not project.owners or g.user['account']['_id'] not in project.owners:
        return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

    if request.method == 'GET':
        return render_template('./project_contact_book.html', project=project.dict(by_alias=True))

    if request.method == 'POST':
        post_data = request.get_json()

        if isinstance(post_data, dict) and post_data.get('casename') == 'get':
            # A user listed in several teams keeps the team seen last.
            all_users = {}
            for team in Team.list_by_pid(pid=pid):
                for uid in team.chiefs or ():
                    all_users[uid] = {'tid': team.id}

                for uid in team.members or ():
                    all_users[uid] = {'tid': team.id}

            user_infos = User.get_info(
                uids=list(all_users.keys()), need_sensitive=True)

            mmt = MattermostTools(
                token=setting.MATTERMOST_BOT_TOKEN, base_url=setting.MATTERMOST_BASEURL)
            datas = []
            for uid, value in all_users.items():
                user_info = user_infos[uid]
                data = {
                    'uid': uid,
                    'name': user_info['profile']['badge_name'],
                    'picture': user_info['oauth']['picture'],
                    'tid': value['tid'],
                    'email': user_info['oauth']['email'],
                }

                # The phone number is only added when a real profile exists.
                if 'profile_real' in user_info:
                    data['phone'] = user_info['profile_real'].get('phone', '')

                data['user_name'] = mmt.find_user_name(
                    mmt.find_possible_mid(uid=uid))
                datas.append(data)

            return jsonify({'datas': datas})

    return make_response({}, 404)

project_dietary_habit

project_dietary_habit(pid: str) -> str | ResponseBase

Project dietary habit

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/dietary_habit', methods=('GET', 'POST'))
def project_dietary_habit(pid: str) -> str | ResponseBase:
    ''' Project dietary habit

    GET renders the dietary-habit page.  A JSON POST whose ``casename`` is
    ``get`` returns the records produced by ``User.marshal_dietary_habit``
    for every chief/member of the project's teams (each tagged with the
    user's team id) together with a value-to-label table of the dietary
    habit items.

    Only users listed in ``project.owners`` may use this view; everyone else
    is redirected to the team page.  An unknown ``pid`` answers 404 for
    every method.
    '''
    project = Project.get(pid)
    if not project:
        # Without this guard a POST for a missing project skipped the owner
        # check below and still reached the sensitive user lookup.
        return Response('', status=404)

    if not project.owners or g.user['account']['_id'] not in project.owners:
        return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

    if request.method == 'GET':
        return render_template('./project_dietary_habit.html',
                               project=project.dict(by_alias=True))

    if request.method == 'POST':
        post_data = request.get_json()

        if isinstance(post_data, dict) and post_data.get('casename') == 'get':
            # A user listed in several teams keeps the team seen last.
            all_users = {}
            for team in Team.list_by_pid(pid=pid):
                for uid in team.chiefs or ():
                    all_users[uid] = {'tid': team.id}

                for uid in team.members or ():
                    all_users[uid] = {'tid': team.id}

            user_infos = User.get_info(
                uids=list(all_users.keys()), need_sensitive=True)

            datas = list(User.marshal_dietary_habit(user_infos=user_infos))
            for data in datas:
                if data['uid'] in all_users:
                    data['tid'] = all_users[data['uid']]['tid']

            # Map each item's value (from DietaryHabitItemsValue) to its
            # display name (from DietaryHabitItemsName).
            dietary_habit_list = {
                DietaryHabitItemsValue[item.name].value: item.value
                for item in DietaryHabitItemsName
            }

            return jsonify({'datas': datas, 'dietary_habit': dietary_habit_list})

    return Response('', status=404)

project_edit

project_edit(pid: str) -> str | ResponseBase

Project edit

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/edit', methods=('GET', 'POST'))
def project_edit(pid: str) -> str | ResponseBase:
    ''' Project edit

    GET renders the edit page.  POST takes the submitted form, builds the
    ``formswitch`` dict from ``FormsSwitch().dict()`` with every
    ``formswitch.<name>`` form field set to ``True``, stores the result via
    ``Project.update`` and redirects back to this page.

    Only users listed in ``project.owners`` may use this view; everyone else
    is redirected to the team page.  An unknown ``pid`` answers 404 for
    every method.
    '''
    project = Project.get(pid)
    if not project:
        # Without this guard a POST for a missing project skipped the owner
        # check below and went straight to Project.update.
        return Response('', status=404)

    if not project.owners or g.user['account']['_id'] not in project.owners:
        return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

    if request.method == 'GET':
        return render_template('./project_edit.html', project=project.dict(by_alias=True))

    if request.method == 'POST':
        data: dict[str, Any] = dict(request.form)
        data['formswitch'] = FormsSwitch().dict()
        for key in data:
            # A field named `formswitch.<name>` switches <name> on.
            if key.startswith('formswitch.'):
                item = key.split('.')[1]
                data['formswitch'][item] = True

        Project.update(pid, ProjectBaseUpdate.parse_obj(data))
        return redirect(url_for('project.project_edit', pid=pid, _scheme='https', _external=True))

    return Response('', status=404)

project_edit_create_team

project_edit_create_team(pid: str) -> str | ResponseBase

Project edit create team

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/edit/team', methods=('GET', 'POST'))
def project_edit_create_team(pid: str) -> str | ResponseBase:
    ''' Project edit create team

    Renders the team-management page with every team of the project
    (``show_all=True``).  Users not listed in ``project.owners`` are
    redirected to the team page; an unknown ``pid`` answers 404.
    '''
    project = Project.get(pid)
    if not project:
        return Response('', status=404)

    if g.user['account']['_id'] not in project.owners:
        return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

    teams = [
        team.dict(by_alias=True)
        for team in Team.list_by_pid(project.id, show_all=True)
    ]

    return render_template('./project_edit_create_team.html',
                           project=project.dict(by_alias=True), teams=teams)

project_edit_create_team_api

project_edit_create_team_api(pid: str) -> ResponseBase

Project edit create team API

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/edit/team/api', methods=('GET', 'POST'))
def project_edit_create_team_api(pid: str) -> ResponseBase:  # pylint: disable=too-many-branches,too-many-return-statements
    ''' Project edit create team API

    GET returns selected fields of the team named by the ``tid`` query
    argument, with ``headcount`` coerced to a non-negative integer.
    POST expects JSON whose ``submittype`` is ``update`` (records the
    membership change, stores the settings and schedules the Mattermost
    channel sync) or ``create`` (creates a team owned by the project's
    owners).  Anything else answers 404.

    A missing project, a project without owners, or a user not listed in
    ``project.owners`` is redirected to the team page.
    '''
    project = Project.get(pid)
    if not project or not project.owners or g.user['account']['_id'] not in project.owners:
        return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

    if request.method == 'GET':
        found = Team.get(pid, request.args['tid'].strip())
        if found is None:
            return Response('', status=404)

        raw_team = found.dict(by_alias=True)
        exported_keys = ('name', 'chiefs', 'members', 'owners', 'tid',
                         'headcount', 'mailling', 'disabled')
        team = {key: raw_team[key] for key in exported_keys if key in raw_team}

        # Missing or null headcount becomes 0; negative values are clamped.
        headcount = team.get('headcount')
        team['headcount'] = 0 if headcount is None else max([0, int(headcount)])

        return jsonify(team)

    if request.method != 'POST':
        return Response('', status=404)

    data = request.json
    if not data:
        return Response('', status=404)

    def as_uid_list(value: Any) -> Any:
        ''' Split a comma separated string into stripped uids; None becomes [] '''
        if isinstance(value, str):
            return [part.strip() for part in value.split(',') if part.strip()]

        return [] if value is None else value

    submit_type = data['submittype']

    if submit_type == 'update':
        chiefs = as_uid_list(data['chiefs'])
        members = as_uid_list(data['members'])
        tid = data['tid']

        new_members = set(chiefs + members)
        old_members = set(Team.get_users(pid=pid, tids=[tid, ])[tid])

        # Record who joined and who left before the settings are stored.
        TeamMemberChangedDB().make_record(
            pid=pid, tid=tid,
            action={'add': list(new_members - old_members),
                    'del': list(old_members - new_members)})

        Team.update_setting(pid=pid, tid=tid, data=data)
        service_sync_mattermost_add_channel.apply_async(
            kwargs={'pid': pid, 'uids': list(new_members)})

        return Response(f'{data}')

    if submit_type == 'create':
        Team.create(
            pid=pid, tid=data['tid'], name=data['name'], owners=project.owners)
        return Response(f'{data}')

    return Response('', status=404)

project_form

project_form(pid: str) -> str | ResponseBase

Project form

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/form', methods=('GET', 'POST'))
def project_form(pid: str) -> str | ResponseBase:
    ''' Project form

    Renders the form overview page on GET.  Users not listed in
    ``project.owners`` are redirected to the team page; an unknown ``pid``
    or any other method answers 404.
    '''
    project = Project.get(pid)
    if not project:
        return Response('', status=404)

    if g.user['account']['_id'] not in project.owners:
        return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

    if request.method != 'GET':
        return Response('', status=404)

    return render_template('./project_form.html', project=project.dict(by_alias=True))

project_form_accommodation

project_form_accommodation(pid: str) -> str | ResponseBase

Project form accommodation

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/form/accommodation', methods=('GET', 'POST'))
def project_form_accommodation(pid: str) -> str | ResponseBase:  # pylint: disable=too-many-branches
    ''' Project form accommodation

    GET renders the accommodation page.  POST expects JSON whose
    ``casename`` is ``get`` (accommodation records of users who are chief or
    member of one of the project's teams, joined with their user info) or
    ``update`` (assign a room to each listed uid).  Anything else answers
    404.

    Only users listed in ``project.owners`` may use this view; everyone else
    is redirected to the team page.  An unknown ``pid`` answers 404 for
    every method.
    '''
    project = Project.get(pid)
    if not project:
        # Without this guard a POST for a missing project skipped the owner
        # check below and still reached the lookup and update_room.
        return make_response({}, 404)

    if not project.owners or g.user['account']['_id'] not in project.owners:
        return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

    if request.method == 'GET':
        return render_template('./project_form_accommodation.html',
                               project=project.dict(by_alias=True))

    if request.method == 'POST':
        post_data = request.get_json()
        casename = post_data.get('casename') if isinstance(post_data, dict) else None

        if casename == 'get':
            # A user listed in several teams keeps the team seen last.
            all_users = {}
            for team in Team.list_by_pid(pid=pid):
                for uid in team.chiefs or ():
                    all_users[uid] = {'tid': team.id}

                for uid in team.members or ():
                    all_users[uid] = {'tid': team.id}

            # Keep only records of users who are still on a team.
            raws = [raw for raw in FormAccommodation.get(pid) if raw['uid'] in all_users]

            user_infos = User.get_info(
                uids=[raw['uid'] for raw in raws], need_sensitive=True)

            datas = []
            for raw in raws:
                user_info = user_infos[raw['uid']]
                # A user without a real profile gets an empty roc_id instead
                # of failing the whole listing.
                profile_real = user_info['profile_real'] if 'profile_real' in user_info else {}
                datas.append({
                    'uid': raw['uid'],
                    'name': user_info['profile']['badge_name'],
                    'picture': user_info['oauth']['picture'],
                    'roc_id': profile_real.get('roc_id', ''),
                    'tid': all_users[raw['uid']]['tid'],
                    'room': raw['data'].get('room', ''),
                    'room_key': raw['data'].get('room_key', ''),
                    'data': raw['data'],
                })

            return jsonify({'datas': datas})

        if casename == 'update':
            for data in post_data['datas']:
                uid = data['uid'].strip()
                room = data['room'].strip()
                logging.info('uid: %s, room: %s', uid, room)
                FormAccommodation.update_room(pid=pid, uid=uid, room=room)

            return jsonify({})

    return make_response({}, 404)

project_form_api

project_form_api(pid: str) -> str | ResponseBase

Project form API

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/form/api', methods=('GET', 'POST'))
def project_form_api(pid: str) -> str | ResponseBase:  # pylint: disable=too-many-locals
    ''' Project form API '''
    # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
    project = Project.get(pid)
    if project and g.user['account']['_id'] not in project.owners:
        return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

    if request.method == 'POST':
        data = request.get_json()
        if data and 'case' not in data:
            return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

        if data and data['case'] == 'volunteer_certificate':
            fieldnames = ('uid', 'picture', 'value', 'name',
                          'roc_id', 'birthday', 'company')
            with io.StringIO() as str_io:
                csv_writer = csv.DictWriter(str_io, fieldnames=fieldnames)
                csv_writer.writeheader()

                row_data: Mapping[Literal['uid', 'picture', 'value',
                                          'name', 'roc_id', 'birthday',
                                          'company'], str] | Mapping[str, Any]
                for raw in Form.all_volunteer_certificate(pid):
                    user_info = UsersDB().find_one({'_id': raw['uid']})
                    if not user_info:
                        continue

                    oauth = OAuthDB().find_one(
                        {'owner': raw['uid']}, {'data.picture': 1})
                    if not oauth:
                        continue

                    row_data = {
                        'uid': raw['uid'],
                        'picture': oauth['data']['picture'],
                        'value': raw['data']['value'],
                        'name': user_info['profile_real']['name'],
                        'roc_id': user_info['profile_real']['roc_id'],
                        'birthday': user_info['profile_real']['birthday'],
                        'company': user_info['profile_real']['company'],
                    }

                    csv_writer.writerow(row_data)

                result = []
                for raw_read in csv.reader(io.StringIO(str_io.getvalue())):
                    result.append(raw_read)

                return jsonify({'result': result})

        if data and data['case'] == 'traffic_fee':
            fieldnames = ('uid', 'picture', 'name', 'apply',
                          'fee', 'fromwhere', 'howto')
            with io.StringIO() as str_io:
                csv_writer = csv.DictWriter(str_io, fieldnames=fieldnames)
                csv_writer.writeheader()

                row_data_traffic: Mapping[Literal['uid', 'picture', 'name',
                                          'apply', 'fee', 'fromwhere', 'howto'],
                                          str] | Mapping[str, Any]
                for raw in Form.all_traffic_fee(pid):
                    user_info = User.get_info(uids=[raw['uid'], ])[raw['uid']]
                    if not user_info:
                        continue

                    row_data_traffic = {
                        'uid': raw['uid'],
                        'picture': user_info['oauth']['picture'],
                        'name': user_info['profile']['badge_name'],
                        'apply': raw['data']['apply'],
                        'fee': raw['data']['fee'],
                        'fromwhere': raw['data']['fromwhere'],
                        'howto': raw['data']['howto'],
                    }

                    csv_writer.writerow(row_data_traffic)

                result = []
                for raw_read in csv.reader(io.StringIO(str_io.getvalue())):
                    result.append(raw_read)

                return jsonify({'result': result})

        if data and data['case'] == 'accommodation':
            fieldnames_acc = ('uid', 'picture', 'name', 'key', 'status')
            with io.StringIO() as str_io:
                csv_writer = csv.DictWriter(str_io, fieldnames=fieldnames_acc)
                csv_writer.writeheader()

                row_data_acc: Mapping[Literal['uid',
                                              'picture', 'name', 'key',
                                              'status'], str] | Mapping[str, Any]
                for raw in Form.all_accommodation(pid):
                    user_info = User.get_info(uids=[raw['uid'], ])[raw['uid']]

                    if not user_info:
                        continue

                    row_data_acc = {
                        'uid': raw['uid'],
                        'picture': user_info['oauth']['picture'],
                        'name': user_info['profile']['badge_name'],
                        'key': raw['data']['key'],
                        'status': raw['data']['status'],
                    }

                    csv_writer.writerow(row_data_acc)

                result = []
                for raw_read in csv.reader(io.StringIO(str_io.getvalue())):
                    result.append(raw_read)

                return jsonify({'result': result})

        if data and data['case'] == 'appreciation':
            fieldnames_app = ('uid', 'picture', 'name',
                              'available', 'key', 'value')
            with io.StringIO() as str_io:
                csv_writer = csv.DictWriter(str_io, fieldnames=fieldnames_app)
                csv_writer.writeheader()

                row_data_app: Mapping[Literal['uid', 'picture',
                                              'name', 'available',
                                              'key', 'value'], str] | Mapping[str, Any]
                for raw in Form.all_appreciation(pid):
                    if not raw['data']['available']:
                        continue

                    user_info = User.get_info(uids=[raw['uid'], ])[raw['uid']]

                    if not user_info:
                        continue

                    row_data_app = {
                        'uid': raw['uid'],
                        'picture': user_info['oauth']['picture'],
                        'name': user_info['profile']['badge_name'],
                        'available': raw['data']['available'],
                        'key': raw['data']['key'],
                        'value': raw['data']['value'],
                    }
                    csv_writer.writerow(row_data_app)

                result_str = []
                for raw_read in csv.reader(io.StringIO(str_io.getvalue())):
                    result_str.append(raw_read)

                return jsonify({'result': result_str})

        if data and data['case'] == 'clothes':
            all_users = {}
            for team in Team.list_by_pid(pid=pid):
                if team.chiefs:
                    for uid in team.chiefs:
                        all_users[uid] = {'tid': team.id}

                if team.members:
                    for uid in team.members:
                        all_users[uid] = {'tid': team.id}

            user_info = User.get_info(uids=list(all_users.keys()))

            fieldnames = ('uid', 'picture', 'name',
                          '_has_data', 'tid', 'clothes', 'htg')
            with io.StringIO() as str_io:
                csv_writer = csv.DictWriter(str_io, fieldnames=fieldnames)
                csv_writer.writeheader()

                row_data_clothes: Mapping[Literal['uid', 'picture',
                                                  'name', '_has_data',
                                                  'tid', 'clothes', 'htg'], str] | Mapping[str, Any]
                for raw in Form.all_clothes(pid):
                    if raw['uid'] not in all_users:
                        continue

                    all_users[raw['uid']]['clothes'] = raw['data']['clothes']

                    if 'htg' in raw['data']:
                        all_users[raw['uid']]['htg'] = raw['data']['htg']

                for uid, value in all_users.items():
                    row_data_clothes = {
                        'uid': uid,
                        'picture': user_info[uid]['oauth']['picture'],
                        'name': user_info[uid]['profile']['badge_name'],
                        '_has_data': bool(value.get('clothes', False)),
                        'tid': value['tid'],
                        'clothes': value.get('clothes'),
                        'htg': value.get('htg'),
                    }
                    csv_writer.writerow(row_data_clothes)

                result = []
                for raw_read in csv.reader(io.StringIO(str_io.getvalue())):
                    result.append(raw_read)

                return jsonify({'result': result})

        if data and data['case'] == 'parking_card':
            fieldnames_parking = ('uid', 'picture', 'name', 'carno', 'dates')
            with io.StringIO() as str_io:
                csv_writer = csv.DictWriter(
                    str_io, fieldnames=fieldnames_parking)
                csv_writer.writeheader()

                row_data_parking: Mapping[Literal['uid', 'picture',
                                                  'name', 'carno',
                                                  'dates'], str] | Mapping[str, Any]
                for raw in Form.all_parking_card(pid):
                    if not raw['data']['dates']:
                        continue

                    user_info = User.get_info(uids=[raw['uid'], ])[raw['uid']]

                    if not user_info:
                        continue

                    row_data_parking = {
                        'uid': raw['uid'],
                        'picture': user_info['oauth']['picture'],
                        'name': user_info['profile']['badge_name'],
                        'carno': raw['data']['carno'],
                        'dates': ', '.join(raw['data']['dates']),
                    }
                    csv_writer.writerow(row_data_parking)

                result = []
                for raw_read in csv.reader(io.StringIO(str_io.getvalue())):
                    result.append(raw_read)

                return jsonify({'result': result})

        if data and data['case'] == 'drink':
            all_users = {}
            for team in Team.list_by_pid(pid=pid):
                if team.chiefs:
                    for uid in team.chiefs:
                        all_users[uid] = {'tid': team.id}

                if team.members:
                    for uid in team.members:
                        all_users[uid] = {'tid': team.id}

            user_info = User.get_info(uids=list(all_users.keys()))

            fieldnames_drink = ('uid', 'picture', 'name',
                                '_has_data', 'tid', 'y18')
            with io.StringIO() as str_io:
                csv_writer = csv.DictWriter(
                    str_io, fieldnames=fieldnames_drink)
                csv_writer.writeheader()

                row_data_drink: Mapping[Literal['uid', 'picture', 'name',
                                                '_has_data', 'tid', 'y18'], str] | Mapping[str, Any]
                for raw in Form.all_drink(pid):
                    if raw['uid'] not in all_users:
                        continue

                    all_users[raw['uid']]['y18'] = raw['data']['y18']

                for uid, value in all_users.items():
                    row_data_drink = {
                        'uid': uid,
                        'picture': user_info[uid]['oauth']['picture'],
                        'name': user_info[uid]['profile']['badge_name'],
                        '_has_data': bool(value.get('y18')),
                        'tid': value['tid'],
                        'y18': value.get('y18'),
                    }
                    csv_writer.writerow(row_data_drink)

                result = []
                for raw_read in csv.reader(io.StringIO(str_io.getvalue())):
                    result.append(raw_read)

                return jsonify({'result': result})

    return make_response({}, 404)

project_form_traffic_mapping

project_form_traffic_mapping(
    pid: str,
) -> str | ResponseBase

Project form traffic mapping

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/form_traffic_mapping', methods=('GET', 'POST'))
def project_form_traffic_mapping(pid: str) -> str | ResponseBase:
    ''' Project form traffic mapping

    Lets a project owner view and edit the location -> fee mapping.

    Args:
        pid: Project id taken from the URL.

    Returns:
        GET renders the mapping page. POST reads a JSON body and
        dispatches on its `casename`: `init` returns a default item plus
        the stored mapping, `save` stores the posted rows and returns what
        was saved. An unknown project or an unrecognised request gets an
        empty 404; a non-owner is redirected to the project's team page.

    '''
    project = Project.get(pid)
    if not project:
        # Without this guard a POST for an unknown `pid` skipped the owner
        # check below and could still read or write fee mappings.
        return Response('', status=404)

    if g.user['account']['_id'] not in project.owners:
        return redirect(url_for('project.team_page', pid=pid, _scheme='https', _external=True))

    if request.method == 'GET':
        return render_template('./project_form_traffic_mapping.html',
                               project=project.dict(by_alias=True))

    if request.method == 'POST':
        data = request.get_json()
        casename = data['casename'] if data and 'casename' in data else None

        if casename == 'init':
            return jsonify({
                'base': ProjectTrafficLocationFeeItem().dict(),
                'data': {item.location: item.fee for item in FormTrafficFeeMapping.get(pid=pid)},
            })

        if casename == 'save':
            # Rows whose location is blank are dropped before saving.
            feemapping = [ProjectTrafficLocationFeeItem.parse_obj(raw)
                          for raw in data['data'] if raw['location'].strip()]

            saved = FormTrafficFeeMapping.save(pid=pid, datas=feemapping)

            return jsonify({'data': {item.location: item.fee for item in saved}})

    return Response('', status=404)

team_page

team_page(pid: str) -> str | ResponseBase

Team page

Source code in view/project.py
@VIEW_PROJECT.route('/<pid>/')
def team_page(pid: str) -> str | ResponseBase:
    ''' Team page

    Lists every team of a project with its chiefs and head count.

    Args:
        pid: Project id taken from the URL.

    Returns:
        The rendered team index, or a 404 `no data` response when the
        project does not exist.

    '''
    # Local import: only this view needs it.
    from html import escape  # pylint: disable=import-outside-toplevel

    project = Project.get(pid)
    if not project:
        return Response('no data', status=404)

    data = [team.dict(by_alias=True) for team in Team.list_by_pid(project.id)]
    uids = []
    for team in data:
        if team['chiefs']:
            uids.extend(team['chiefs'])

    total = 0
    user_info = User.get_info(uids)
    for team in data:
        chiefs = team['chiefs'] or []

        # `badge_name` is chosen by the user and ends up inside hand-built
        # markup, so it has to be escaped before interpolation.
        # NOTE(review): presumably the template renders these links
        # without autoescaping - confirm.
        team['chiefs_name'] = [
            f'''<a href="/user/{uid}">{escape(str(user_info[uid]['profile']['badge_name']))}</a>'''
            for uid in chiefs
        ]

        # A user who is both chief and member is counted once.
        team_uids = set(chiefs)
        if team['members']:
            team_uids.update(team['members'])

        team['count'] = len(team_uids)
        total += team['count']

    # ----- group for layout ----- #
    # Rows of up to `per` teams; slicing past the end is safe, so the
    # last row is simply shorter.
    per = 3
    teams = [data[start:start + per] for start in range(0, len(data), per)]

    editable = g.user['account']['_id'] in project.owners

    return render_template('./project_teams_index.html',
                           teams=teams,
                           project=project.dict(by_alias=True),
                           editable=editable,
                           total=total,
                           )

view.recruit

recruit.py

recurit_list

recurit_list(pid: str, tid: str) -> str | ResponseBase

List page

Source code in view/recruit.py
@VIEW_RECRUIT.route('/<pid>/<tid>/list', methods=('GET', 'POST'))
def recurit_list(pid: str, tid: str) -> str | ResponseBase:  # pylint: disable=too-many-return-statements
    ''' List page

    Recruiting list for team chiefs, team owners and project owners.

    Args:
        pid: Project id taken from the URL.
        tid: Team id taken from the URL.

    Returns:
        GET renders the list page. POST reads a JSON body and dispatches
        on its `casename`: `get` returns the enum tables, `query` returns
        the matching volunteers with suspended users left out. Any other
        POST gets an empty JSON object. Callers who are not admins are
        redirected to `/`.

    '''
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)

    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    is_admin = (g.user['account']['_id'] in teamusers.chiefs or
                g.user['account']['_id'] in teamusers.owners or
                g.user['account']['_id'] in project.owners)

    if not is_admin:
        return redirect('/')

    if request.method == 'GET':
        return render_template('./recruit_list.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True), is_admin=is_admin)

    if request.method == 'POST':
        post_data = request.get_json()
        # A body without `casename` (or one that is not an object) used to
        # raise and surface as a 500; treat it as an unknown request.
        casename = post_data.get('casename') if isinstance(post_data, dict) else None

        if casename == 'get':
            return jsonify({
                'team_enum': {key: item.value for key, item in TeamsEnum.__members__.items()},
                'team_enum_desc': {key: item.value for key, item in
                                   TeamsEnumDesc.__members__.items()},
                'skill_enum': {key: item.value for key, item in
                               SkillEnum.__members__.items()},
                'skill_enum_desc': {key: item.value for key, item in
                                    SkillEnumDesc.__members__.items()},
                'status_enum': {key: item.value for key, item in StatusEnum.__members__.items()},
                'status_enum_desc': {key: item.value for key, item in
                                     StatusEnumDesc.__members__.items()},
            })

        if casename == 'query':
            query = RecruitQuery.parse_obj(post_data['query']).dict()
            data = list(TobeVolunteer.query(query))

            uids = [user['uid'] for user in data]
            users_info = User.get_info(uids=uids)
            suspend_uids = {raw_user['_id']
                            for raw_user in User.get_suspend_uids(uids=uids)}

            result = []
            for member in data:
                if member['uid'] in suspend_uids:
                    continue

                # Only the badge name and picture are added to each result.
                info = users_info[member['uid']]
                member.update({
                    'profile': {'badge_name': info['profile']['badge_name']},
                    'oauth': {'picture': info['oauth']['picture']}})

                result.append(member)

            return jsonify({'members': result})

    return jsonify({})

view.sender

Sender

campaign

campaign(
    pid: str, tid: str, cid: str
) -> str | ResponseBase

campaign

Source code in view/sender.py
@VIEW_SENDER.route('/<pid>/<tid>/campaign/<cid>/', methods=('GET', 'POST'))
def campaign(pid: str, tid: str, cid: str) -> str | ResponseBase:
    ''' campaign

    Renders the overview page of one campaign for team chiefs, team
    owners and project owners; everybody else is redirected to `/`.

    '''
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not (team and project):
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    allowed = (g.user['account']['_id'] in teamusers.chiefs or
               g.user['account']['_id'] in teamusers.owners or
               g.user['account']['_id'] in project.owners)
    if not allowed:
        return redirect('/')

    return render_template(
        './sender_campaign_index.html',
        campaign=SenderCampaign.get(cid=cid, pid=pid, tid=tid),
        team=team.dict(by_alias=True),
    )

campaign_content

campaign_content(
    pid: str, tid: str, cid: str
) -> str | ResponseBase

Campaign content

Source code in view/sender.py
@VIEW_SENDER.route('/<pid>/<tid>/campaign/<cid>/content', methods=('GET', 'POST'))
def campaign_content(pid: str, tid: str, cid: str) -> str | ResponseBase:  # pylint: disable=too-many-return-statements
    ''' Campaign content

    Edit page and JSON API for the mail of one campaign.

    Args:
        pid: Project id taken from the URL.
        tid: Team id taken from the URL.
        cid: Campaign id taken from the URL.

    Returns:
        GET renders the editor. POST reads a JSON body and dispatches on
        its `casename`: `get` returns the stored mail, `save` stores the
        posted subject / content / preheader / layout (each stripped) and
        returns the saved mail. A campaign that does not belong to this
        team, or an unknown request, gets an empty 404. Callers who are
        not admins are redirected to `/`.

    '''
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    is_admin = (g.user['account']['_id'] in teamusers.chiefs or
                g.user['account']['_id'] in teamusers.owners or
                g.user['account']['_id'] in project.owners)

    if not is_admin:
        return redirect('/')

    if request.method == 'GET':
        campaign_data = SenderCampaign.get(
            cid=cid, pid=team.pid, tid=team.id)

        return render_template('./sender_campaign_content.html',
                               campaign=campaign_data, team=team.dict(by_alias=True))

    if request.method == 'POST':
        data = request.get_json()
        casename = data['casename'] if data and 'casename' in data else None

        if casename == 'get':
            campaign_data = SenderCampaign.get(
                cid=cid, pid=team.pid, tid=team.id)

            if campaign_data:
                return jsonify({'mail': campaign_data['mail']})

        if casename == 'save':
            # `save_mail` is called with `cid` only, so make sure the
            # campaign really belongs to the team this caller administers
            # before writing to it.
            if not SenderCampaign.get(cid=cid, pid=team.pid, tid=team.id):
                return make_response({}, 404)

            mail = data['data']
            resp = SenderCampaign.save_mail(
                cid=cid,
                subject=mail['subject'].strip(),
                content=mail['content'].strip(),
                preheader=mail['preheader'].strip(),
                layout=mail['layout'].strip(),
            )
            return jsonify({'mail': resp['mail']})

    return make_response({}, 404)

campaign_receiver

campaign_receiver(
    pid: str, tid: str, cid: str
) -> str | ResponseBase

campaign receiver

Source code in view/sender.py
@VIEW_SENDER.route('/<pid>/<tid>/campaign/<cid>/receiver', methods=('GET', 'POST'))
def campaign_receiver(pid: str, tid: str, cid: str) -> str | ResponseBase:
    ''' campaign receiver

    Page and API for choosing who receives a campaign: whole teams, tags
    of the current team, every user, and/or an uploaded CSV list.

    Args:
        pid: Project id taken from the URL.
        tid: Team id taken from the URL.
        cid: Campaign id taken from the URL.

    Returns:
        GET renders the receiver page. A JSON POST dispatches on
        `casename`: `getinit` returns the current selection, `save`
        stores the picked teams and tags. A form POST dispatches on
        `uploadtype`: `remove` clears the uploaded list, `replace` and
        `update` store the uploaded CSV `file`. A POST for a campaign
        that does not belong to this team, or an unrecognised request,
        gets an empty 404. Callers who are not admins are redirected to
        `/`.

    '''
    # pylint: disable=too-many-branches,too-many-locals,too-many-return-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    is_admin = (g.user['account']['_id'] in teamusers.chiefs or
                g.user['account']['_id'] in teamusers.owners or
                g.user['account']['_id'] in project.owners)

    if not is_admin:
        return redirect('/')

    campaign_data = SenderCampaign.get(cid=cid, pid=team.pid, tid=team.id)
    if request.method == 'GET':
        return render_template('./sender_campaign_receiver.html',
                               campaign=campaign_data, team=team.dict(by_alias=True))

    if request.method == 'POST':  # pylint: disable=too-many-nested-blocks
        if not campaign_data:
            # Every action below writes to (or reads from) the campaign
            # named by `cid`; refuse when it is not a campaign of this team.
            return make_response({}, 404)

        if request.is_json:
            data = request.get_json()
            casename = data['casename'] if data and 'casename' in data else None

            if casename == 'getinit':
                teams = [{'tid': _team.id, 'name': _team.name}
                         for _team in Team.list_by_pid(pid=team.pid)]

                team_w_tags = []
                if team.tag_members:
                    team_w_tags = [t_m.dict() for t_m in team.tag_members]

                sender_receiver = SenderReceiver.get(pid=team.pid, cid=cid)

                receiver = campaign_data['receiver']
                picktags = []
                if 'team_w_tags' in receiver and team.id in receiver['team_w_tags']:
                    picktags = receiver['team_w_tags'][team.id]

                return jsonify({'teams': teams,
                                'team_w_tags': team_w_tags,
                                'pickteams': receiver['teams'],
                                'picktags': picktags,
                                'is_all_users': receiver['all_users'],
                                'all_users_count': User.count(),
                                'filedata': sender_receiver,
                                })

            if casename == 'save':
                # Keep only ids that really exist in this project / team;
                # anything else in the posted lists is ignored.
                _result = [_team.id for _team in Team.list_by_pid(pid=team.pid)
                           if _team.id in data['pickteams']]

                _team_w_tags = []
                if team.tag_members:
                    _team_w_tags = [tag.id for tag in team.tag_members
                                    if tag.id in data['picktags']]

                return jsonify(SenderCampaign.save_receiver(
                    cid=cid, teams=_result, team_w_tags={
                        team.id: _team_w_tags},
                    all_users=bool(data['is_all_users']))['receiver'])

        # `.get` so a request without the form field (e.g. a JSON body with
        # an unknown `casename`) reaches the 404 below instead of raising.
        uploadtype = request.form.get('uploadtype')

        if uploadtype == 'remove':
            SenderReceiver.remove(pid=team.pid, cid=cid)

            return jsonify({'file': [],
                            'uploadtype': html.escape(uploadtype),
                            })

        if uploadtype is not None and request.files and 'file' in request.files:
            csv_file = list(csv.DictReader(io.StringIO(
                request.files['file'].read().decode('utf8'))))
            if uploadtype == 'replace':
                SenderReceiver.replace(
                    pid=team.pid, cid=cid, datas=csv_file)
            elif uploadtype == 'update':
                SenderReceiver.update(pid=team.pid, cid=cid, datas=csv_file)

            return jsonify({'file': csv_file,
                            'uploadtype': html.escape(uploadtype),
                            })

    return make_response({}, 404)

campaign_schedule

campaign_schedule(
    pid: str, tid: str, cid: str
) -> str | ResponseBase

campaign schedule

Source code in view/sender.py
@VIEW_SENDER.route('/<pid>/<tid>/campaign/<cid>/schedule', methods=('GET', 'POST'))
def campaign_schedule(pid: str, tid: str, cid: str) -> str | ResponseBase:
    ''' campaign schedule

    Page and API for sending a campaign and reading its send logs.

    Args:
        pid: Project id taken from the URL.
        tid: Team id taken from the URL.
        cid: Campaign id taken from the URL.

    Returns:
        GET renders the schedule page. POST reads a JSON body and
        dispatches on its `casename`: `getlogs` returns the send logs,
        `send` queues the mail for every distinct receiver address,
        `sendtest` queues one sample mail addressed to the caller. A
        `sendtest` with no receivers at all gets a 400; an unknown
        request or a missing campaign gets an empty 404. Callers who are
        not admins are redirected to `/`.

    '''
    # pylint: disable=too-many-locals,too-many-branches,too-many-statements,too-many-return-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    is_admin = (g.user['account']['_id'] in teamusers.chiefs or
                g.user['account']['_id'] in teamusers.owners or
                g.user['account']['_id'] in project.owners)

    if not is_admin:
        return redirect('/')

    campaign_data = SenderCampaign.get(
        cid=cid, pid=team.pid, tid=team.id)
    if request.method == 'GET':
        return render_template('./sender_campaign_schedule.html',
                               campaign=campaign_data, team=team.dict(by_alias=True))

    if request.method == 'POST':
        data = request.get_json()
        casename = data['casename'] if data and 'casename' in data else None

        if casename == 'getlogs':
            logs = [{
                'time': arrow.get(log['create_at']).to(
                    'Asia/Taipei').format('YYYY-MM-DD HH:mm:ss'),
                'cid': log['cid'],
                'count': len(log['receivers']),
                'layout': log['layout'],
                'desc': log['desc'],
            } for log in SenderLogs.get(cid=cid)]

            return jsonify({'logs': logs})

        if campaign_data and casename in ('send', 'sendtest'):
            is_test = casename == 'sendtest'
            receiver = campaign_data['receiver']

            # Each source is a (field names, rows) pair.
            sources = [
                SenderReceiver.get_from_user(
                    pid=team.pid, tids=receiver['teams']),
                SenderReceiver.get(pid=team.pid, cid=cid),
            ]

            if 'team_w_tags' in receiver and \
                    team.id in receiver['team_w_tags'] and \
                    receiver['team_w_tags'][team.id]:
                sources.append(SenderReceiver.get_by_tags(
                    pid=team.pid, tid=team.id,
                    tags=receiver['team_w_tags'][team.id]))

            if receiver['all_users']:
                sources.append(SenderReceiver.get_all_users())

            user_datas = []
            for fields, raws in sources:
                rows = raws
                if is_test:
                    # A test send only samples one row per source.
                    rows = [random.choice(raws)] if raws else []

                user_datas.extend(dict(zip(fields, raw)) for raw in rows)

            if is_test:
                if not user_datas:
                    # `random.choice` raises on an empty list; report the
                    # empty receiver set instead of failing with a 500.
                    return make_response({'error': 'no receivers'}, 400)

                uid = g.user['account']['_id']
                users = User.get_info(uids=[uid, ])

                # Use one sampled receiver's data but deliver it to the
                # caller's own address.
                sample = random.choice(user_datas)
                sample.update({'mail': users[uid]['oauth']['email']})
                dist_user_datas = [sample, ]
                desc = 'Test Send'
            else:
                # One mail per address: keep the first row seen for each.
                dist_user_datas = []
                emails = set()
                for user_data in user_datas:
                    if user_data['mail'] not in emails:
                        emails.add(user_data['mail'])
                        dist_user_datas.append(user_data)

                desc = 'Send'

            layout = campaign_data['mail']['layout']
            SenderLogs.save(cid=cid, layout=layout,
                            desc=desc, receivers=dist_user_datas)

            # Only layout '2' sets an explicit sender; other layouts pass
            # `source=None` to the mailer task.
            source = None
            if layout == '2':
                if team.mailling:
                    name = team.name
                    if not name.startswith(('COSCUP', 'coscup')):
                        name = f"COSCUP {name}"

                    source = {'name': name, 'mail': team.mailling}
                else:
                    source = {'name': 'COSCUP Attendee',
                              'mail': 'attendee@coscup.org'}

            sender_mailer_start.apply_async(kwargs={
                'campaign_data': campaign_data, 'team_name': team.name, 'source': source,
                'user_datas': dist_user_datas, 'layout': layout})

            return jsonify({})

    return make_response({}, 404)

index

index(pid: str, tid: str) -> str | ResponseBase

Index page

Source code in view/sender.py
@VIEW_SENDER.route('/<pid>/<tid>/', methods=('GET', 'POST'))
def index(pid: str, tid: str) -> str | ResponseBase:  # pylint: disable=too-many-return-statements
    ''' Index page

    Landing page of the sender tool. GET renders the page; a JSON POST
    with `casename` `get` lists the team's campaigns together with their
    creators' names, and `create` creates a campaign and returns its id.
    Any other POST gets an empty JSON object. Callers who are not admins
    are redirected to `/`.

    '''
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not (team and project):
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    allowed = (g.user['account']['_id'] in teamusers.chiefs or
               g.user['account']['_id'] in teamusers.owners or
               g.user['account']['_id'] in project.owners)
    if not allowed:
        return redirect('/')

    # ..note::
    # 1. create campaign (campaign name) / list campaign
    # 2. write title, content(jinja2, markdown)
    # 3. pick up receiver
    # 4. send / send test

    if request.method == 'GET':
        return render_template('./sender.html')

    if request.method == 'POST':
        data = request.get_json()
        casename = data['casename'] if data and 'casename' in data else None

        if casename == 'get':
            campaigns = list(SenderCampaign.get_list(
                pid=team.pid, tid=team.id))
            creators = User.get_info(
                uids=[item['created']['uid'] for item in campaigns])

            return jsonify({
                'campaigns': campaigns,
                'users_info': {
                    uid: {'uid': uid, 'name': info['profile']['badge_name']}
                    for uid, info in creators.items()
                },
            })

        if casename == 'create':
            created = SenderCampaign.create(
                name=data['name'], pid=team.pid, tid=team.id, uid=g.user['account']['_id'])

            return jsonify({'cid': created['_id']})

    return jsonify({})

view.setting

setting

api_token

api_token() -> str | ResponseBase

API Token

Source code in view/setting.py
@VIEW_SETTING.route('/api_token', methods=('GET', 'POST'))
def api_token() -> str | ResponseBase:
    ''' API Token

    GET renders the token page. A JSON POST with `casename` `get` creates
    a temporary API account for the signed-in user, stores it and returns
    its username and password. Anything else gets an empty 404.

    '''
    if request.method == 'GET':
        return render_template('./setting_api_token.html')

    if request.method == 'POST':
        data = request.get_json()
        # A body without `casename` (or one that is not an object) used to
        # raise and surface as a 500; treat it as an unknown request.
        casename = data.get('casename') if isinstance(data, dict) else None

        if casename == 'get':
            temp_account = APITokenTemp(uid=g.user['account']['_id'])
            APIToken.save_temp(data=temp_account)

            return jsonify({'temp_account': {
                'username': temp_account.username,
                'password': temp_account.password,
            }})

    return make_response({}, 404)

index

index() -> str

Index page

Source code in view/setting.py
@VIEW_SETTING.route('/')
def index() -> str:
    ''' Index page: renders the settings landing template '''
    page = render_template('./setting_index.html')
    return page
link_chat() -> str | ResponseBase

Link to chat

Source code in view/setting.py
@VIEW_SETTING.route('/link/chat', methods=('GET', 'POST'))
def link_chat() -> str | ResponseBase:
    ''' Link to chat

    GET renders the chat-link page for the signed-in user. A POST whose
    JSON body has `casename` `invite` queues a Mattermost invite; a POST
    without a `casename` resets the link and redirects back to this page.
    Everything else gets an empty 404.

    '''
    if request.method == 'GET':
        return render_template(
            './setting_link_chat.html',
            mml=MattermostLink(uid=g.user['account']['_id']))

    if request.method != 'POST':
        return Response('', status=404)

    data = request.get_json()
    if not (data and 'casename' in data):
        # No casename at all: reset the link and reload the page.
        MattermostLink.reset(uid=g.user['account']['_id'])
        return redirect(url_for('setting.link_chat', _scheme='https', _external=True))

    if data['casename'] == 'invite':
        service_sync_mattermost_invite.apply_async(
            kwargs={'uids': (g.user['account']['_id'], )})
        return jsonify({})

    return Response('', status=404)
link_telegram() -> str | ResponseBase

Link to Telegram

Source code in view/setting.py
@VIEW_SETTING.route('/link/telegram', methods=('GET', 'POST'))
def link_telegram() -> str | ResponseBase:
    ''' Link to Telegram

    GET renders the page with the signed-in user's Telegram records as a
    JSON string. POST always answers with an empty JSON object, deleting
    those records first when the body's `casename` is `del_account`.

    '''
    if request.method == 'POST':
        data = request.get_json()
        if data and 'casename' in data:
            if data['casename'] == 'del_account':
                TelegramDB().delete_many({'uid': g.user['account']['_id']})

        return jsonify({})

    if request.method == 'GET':
        records = []
        for record in TelegramDB().find({'uid': g.user['account']['_id']}):
            # Replace the stored `added` value with its ISO-8601 string.
            added_at = arrow.get(record['added'])
            record['added'] = added_at.isoformat()
            records.append(record)

        return render_template('./setting_link_telegram.html',
                               telegram_data=json.dumps(records))

    return make_response({}, 404)

mails

mails() -> str | ResponseBase

about mails

Source code in view/setting.py
@VIEW_SETTING.route('/mails', methods=('GET', 'POST'))
def mails() -> str | ResponseBase:
    ''' about mails

    GET renders the mail settings page; POST queues the member welcome
    mail for the signed-in user and answers with an empty JSON object.

    '''
    if request.method == 'POST':
        uid = g.user['account']['_id']
        mail_member_welcome_send.apply_async(kwargs={'uids': [uid, ]})

        return jsonify({})

    if request.method == 'GET':
        return render_template('./setting_mails.html')

    return make_response({}, 404)

profile_page

profile_page() -> str | ResponseBase

profile

Source code in view/setting.py
@VIEW_SETTING.route('/profile', methods=('GET', 'POST'))
def profile_page() -> str | ResponseBase:
    ''' profile

    Page and JSON API for the signed-in user's public profile and
    volunteer preferences.

    Returns:
        GET renders the profile page. POST reads a JSON body and
        dispatches on its `casename`: `get` returns the profile plus the
        enum tables, `get_tobe_volunteer` returns the volunteer
        preferences, `save_tobe_volunteer` stores them, `save` stores the
        profile and deletes the cached session entry. Every other POST
        gets an empty JSON object.

    '''
    # pylint: disable=too-many-branches
    if request.method == 'GET':
        user = g.user['account']
        if 'profile' not in user:
            user['profile'] = UserProfle().dict()

        return render_template('./setting_profile.html', user=user)

    if request.method == 'POST':
        post_data = request.get_json()
        # A body without `casename` (or one that is not an object) used to
        # raise and surface as a 500; treat it as an unknown request.
        casename = post_data.get('casename') if isinstance(post_data, dict) else None

        if casename == 'get':
            user = g.user['account']
            if 'profile' not in user:
                user['profile'] = UserProfle().dict()

            user_profile = user['profile']

            # `badge_name` is only included when it is non-blank; a stored
            # None is treated like a blank name.
            profile = {}
            badge_name = (user_profile.get('badge_name') or '').strip()
            if badge_name:
                profile['badge_name'] = badge_name

            profile['intro'] = user_profile.get('intro', '')
            profile['id'] = user['_id']

            return jsonify({
                'profile': profile,
                'team_enum': {key: item.value for key, item in TeamsEnum.__members__.items()},
                'team_enum_desc': {key: item.value for key, item in
                                   TeamsEnumDesc.__members__.items()},
                'skill_enum': {key: item.value for key, item in
                               SkillEnum.__members__.items()},
                'skill_enum_desc': {key: item.value for key, item in
                                    SkillEnumDesc.__members__.items()},
                'status_enum': {key: item.value for key, item in StatusEnum.__members__.items()},
                'status_enum_desc': {key: item.value for key, item in
                                     StatusEnumDesc.__members__.items()},
            })

        if casename == 'get_tobe_volunteer':
            data = TobeVolunteer.get(uid=g.user['account']['_id'])

            return jsonify({'tobe_volunteer': data.dict()})

        if casename == 'save_tobe_volunteer':
            save_data = TobeVolunteerStruct.parse_obj(post_data['data']).dict()
            # The uid always comes from the session, set after parsing.
            save_data['uid'] = g.user['account']['_id']
            TobeVolunteer.save(data=save_data)

            return jsonify({})

        if casename == 'save':
            User(uid=g.user['account']['_id']).update_profile(
                UserProfle.parse_obj(post_data['data']).dict()
            )
            # Delete this session's `sid:` cache entry after the update.
            MC.get_client().delete(f"sid:{session['sid']}")

        return jsonify({})

    return make_response({}, 404)

profile_real

profile_real() -> str | ResponseBase

Profile real

Source code in view/setting.py
@VIEW_SETTING.route('/profile_real', methods=('GET', 'POST'))
def profile_real() -> str | ResponseBase:
    ''' Profile real

    `GET` renders the settings page. `POST` reads a JSON body and
    dispatches on its `casename`:

    - `get`: return the stored real profile (phone shown in national
      format, birthday as `YYYY-MM-DD`), the phone country codes, the
      default country code and the dietary habit options.
    - `update`: rebuild the real profile from `data`, store it, delete
      the `sid:<sid>` entry through the `MC` client and echo the saved
      values.

    Every other request is answered with an empty 404.

    '''
    # pylint: disable=too-many-branches,too-many-locals,too-many-return-statements
    if request.method == 'GET':
        return render_template('./setting_profile_real.html')

    if request.method != 'POST':
        return make_response({}, 404)

    post_data = request.get_json()
    if not post_data:
        return make_response({}, 404)

    casename = post_data['casename']

    if casename == 'get':
        struct_user: UserProfleReal
        default_code: str = '886'
        account = g.user['account']

        if 'profile_real' in account:
            struct_user = UserProfleReal.parse_obj(account['profile_real'])

            try:
                parsed = phonenumbers.parse(struct_user.phone, None)
                struct_user.phone = phonenumbers.format_number(
                    parsed, phonenumbers.PhoneNumberFormat.NATIONAL)
                if parsed.country_code:
                    default_code = str(parsed.country_code)

            except phonenumbers.phonenumberutil.NumberParseException:
                # An unparsable phone keeps its stored text and the default code.
                pass
        else:
            struct_user = UserProfleReal()

        user = struct_user.dict()
        if user.get('birthday'):
            user['birthday'] = user['birthday'].strftime('%Y-%m-%d')

        # Ordered by the first region code attached to each country code.
        phone_codes = sorted(
            phonenumbers.COUNTRY_CODE_TO_REGION_CODE.items(),
            key=lambda code_regions: code_regions[1][0])

        dietary_habit_list = [
            (DietaryHabitItemsValue[item.name].value, item.value)
            for item in DietaryHabitItemsName
        ]

        return jsonify({'profile': user,
                        'phone_codes': phone_codes,
                        'default_code': default_code,
                        'dietary_habit': dietary_habit_list,
                        })

    if casename == 'update':
        form = post_data['data']

        # The phone is stored in E.164 form, or as '' when it is absent or invalid.
        phone_str = ''
        has_phone = 'phone' in form and form['phone']
        has_code = 'phone_code' in form and form['phone_code']
        if has_phone and has_code:
            try:
                phone_number = phonenumbers.parse(
                    f"+{form['phone_code']} {form['phone']}")
                phone_str = phonenumbers.format_number(
                    phone_number, phonenumbers.PhoneNumberFormat.E164)
            except phonenumbers.phonenumberutil.NumberParseException:
                phone_str = ''

        user_profile_real = UserProfleReal(
            name=form.get('name'),
            phone=phone_str,
            roc_id=form.get('roc_id'),
            company=form.get('company')
        )

        try:
            birthday = arrow.get(form.get('birthday', '').strip()).naive
        except (AttributeError, arrow.parser.ParserError):
            # Non-string or unparsable birthdays are stored as "not set".
            birthday = None

        user_profile_real.birthday = birthday
        user_profile_real.dietary_habit = valid_dietary_value(
            items_no=form['dietary_habit'])
        user_profile_real.bank = UserBank.parse_obj(form['bank'])
        user_profile_real.address = UserAddress.parse_obj(form['address'])

        saved = user_profile_real.dict(exclude_none=True)
        User(uid=g.user['account']['_id']).update_profile_real(saved)
        MC.get_client().delete(f"sid:{session['sid']}")

        return jsonify(user_profile_real.dict(exclude_none=True))

    return make_response({}, 404)

security

security() -> str | ResponseBase

security

Source code in view/setting.py
@VIEW_SETTING.route('/security', methods=('GET', 'POST'))
def security() -> str | ResponseBase:
    ''' security

    `GET` renders the page with the user's alive sessions and recent
    session records, each decorated with `iso_time` and `time_hr`.
    `POST` with a JSON body marks the session given by `sid` as dead.
    Anything else is answered with an empty 404.

    '''
    def with_times(raw: dict[str, Any], now: Any) -> dict[str, Any]:
        ''' Add `iso_time` / `time_hr` to one session record in place '''
        has_timezone = 'ipinfo' in raw and 'timezone' in raw['ipinfo']
        # Records without IP timezone information are shown in Taipei time.
        timezone = raw['ipinfo']['timezone'] if has_timezone else 'Asia/Taipei'

        created = arrow.get(math.ceil(raw['created_at']))
        raw['iso_time'] = created.to(timezone).isoformat()
        raw['time_hr'] = created.humanize(now)
        return raw

    if request.method == 'GET':
        _now = arrow.now()

        alive_session = []
        for raw in USession.get_alive(uid=g.user['account']['_id']):
            with_times(raw, _now)
            # Lets the template mark the session this request belongs to.
            raw['is_me'] = raw['_id'] == session['sid']
            alive_session.append(raw)

        records = [
            with_times(raw, _now)
            for raw in USession.get_recently(uid=g.user['account']['_id'])
        ]

        return render_template('./setting_security.html',
                               records=records, alive_session=alive_session)

    if request.method == 'POST':
        data = request.get_json()
        if data:
            USession.make_dead(sid=data['sid'], uid=g.user['account']['_id'])
            return jsonify({})

    return make_response({}, 404)

waitting

waitting() -> str

waiting (the name "waitting" is a typo)

Source code in view/setting.py
@VIEW_SETTING.route('/waitting')
def waitting() -> str:
    ''' waiting, the `waitting` is typo

    Renders the current user's wait-list history, newest `_id` first.
    Each record gets an `hr_time` date (Asia/Taipei) and a display
    `result`; records without a result are labelled `waitting`.

    '''
    # Stored result value -> label handed to the template.
    result_labels = {'approval': 'approved', 'deny': 'denid'}

    waitting_lists = []
    for raw in WaitList.find_history(uid=g.user['account']['_id']):
        created = arrow.get(raw['_id'].generation_time)
        raw['hr_time'] = created.to('Asia/Taipei').format('YYYY-MM-DD')

        if 'result' not in raw:
            raw['result'] = 'waitting'
        else:
            # Unknown results are passed through unchanged.
            raw['result'] = result_labels.get(raw['result'], raw['result'])

        waitting_lists.append(raw)

    waitting_lists.sort(key=lambda record: record['_id'], reverse=True)

    return render_template('./setting_waitting.html', waitting_lists=waitting_lists)

view.tasks

Tasks

add

add(
    pid: str, task_id: str | None = None
) -> str | ResponseBase

Add

Source code in view/tasks.py
@VIEW_TASKS.route('/<pid>/add', methods=('GET', 'POST'))
@VIEW_TASKS.route('/<pid>/edit/<task_id>', methods=('GET', 'POST'))
def add(pid: str, task_id: str | None = None) -> str | ResponseBase:
    ''' Add

    Add / edit page for a task. The user must be logged in and take part
    in the project, otherwise a 401 is returned. `GET` renders the form.
    `POST` reads a JSON body and dispatches on its `casename`:

    - `add`: save the task described by `data` (reusing `task_id` from
      the body when it is set) and return the saved record.
    - `del`: delete the task when the current user created it.
    - `get`: (edit route only) return the task with its times split into
      `date` / `starttime` / `endtime` in Asia/Taipei.

    Anything else is answered with an empty JSON object.

    '''
    # pylint: disable=too-many-return-statements,too-many-branches
    uid = g.get('user', {}).get('account', {}).get('_id')
    if not uid:
        return make_response({'info': 'Need login'}, 401)

    # One hit is enough; stop consuming the result after the first item.
    if not any(True for _ in Team.participate_in(uid=uid, pid=[pid, ])):
        return make_response({'info': 'Not in project'}, 401)

    project_info = Project.get(pid=pid)
    if not project_info:
        return Response('404', status=404)

    if request.method == 'GET':
        return render_template('./tasks_add.html',
                               project=project_info.dict(by_alias=True),
                               catelist=Tasks.get_cate(pid=pid),
                               task_id=task_id)

    if request.method != 'POST':
        return jsonify({})

    post_data = request.get_json()
    if not post_data:
        return jsonify({})

    casename = post_data['casename']

    if casename == 'add':
        data = post_data['data']
        task_item = TaskItem(pid=pid, created_by=uid, desc=data['desc'])
        task_item.title = data['title']
        task_item.cate = data['cate']
        task_item.limit = data['limit']

        # Form times are Taipei wall-clock; they are stored as naive
        # datetimes rebuilt from the timestamp.
        local_start = arrow.get(f"{data['date']} {data['starttime']}",
                                tzinfo='Asia/Taipei')
        task_item.starttime = arrow.get(local_start.timestamp()).naive

        if 'endtime' in data and data['endtime']:
            local_end = arrow.get(f"{data['date']} {data['endtime']}",
                                  tzinfo='Asia/Taipei')
            task_item.endtime = arrow.get(local_end.timestamp()).naive

        has_task_id_key = 'task_id' in post_data
        if has_task_id_key and post_data['task_id']:
            task_item.id = post_data['task_id']

        raw = Tasks.add(pid=pid, body=task_item.dict(by_alias=True))

        # The mail job is queued only when the body carries an empty `task_id`.
        if has_task_id_key and not post_data['task_id']:
            mail_tasks_star.apply_async(
                kwargs={'pid': pid, 'task_id': raw['_id']})

        return jsonify({'data': raw})

    if casename == 'del':
        data = Tasks.get_with_pid(pid=pid, _id=post_data['task_id'])
        if not data:
            return make_response({}, 404)

        # Only the creator may delete; other users get an empty success reply.
        if data['created_by'] == g.user['account']['_id']:
            Tasks.delete(pid=pid, _id=data['_id'])

        return jsonify({})

    if task_id and casename == 'get':
        data = Tasks.get_with_pid(pid=pid, _id=task_id)
        if not data:
            return make_response({}, 404)

        starttime_task = arrow.get(data['starttime']).to('Asia/Taipei')
        data['date'] = starttime_task.format('YYYY-MM-DD')
        data['starttime'] = starttime_task.format('HH:mm')

        if data['endtime']:
            data['endtime'] = arrow.get(data['endtime']).to(
                'Asia/Taipei').format('HH:mm')

        data['_is_creator'] = g.user['account']['_id'] == data['created_by']

        return jsonify({'data': data})

    return jsonify({})

index

index() -> ResponseBase

Index page

Source code in view/tasks.py
@VIEW_TASKS.route('/')
def index() -> ResponseBase:
    ''' Index page

    Redirects to the task list of project `2022` using an external
    https URL.

    '''
    target = url_for('tasks.project', pid='2022', _scheme='https', _external=True)
    return redirect(target)

project

project(pid: str) -> str | ResponseBase

Project

Source code in view/tasks.py
@VIEW_TASKS.route('/<pid>', methods=('GET', 'POST'))
def project(pid: str) -> str | ResponseBase:
    ''' Project

    `GET` renders the task list page of the project. `POST` reads a JSON
    body and dispatches on its `casename`:

    - `get`: list the project's tasks plus the user's star status.
    - `star`: toggle the user's star on the project.
    - `join` / `cancel`: join or leave a task as the current user.
    - `cancel_user`: remove another user from a task (project members only).
    - `peoples`: return the task creator and, for project members, the
      people who joined the task.

    `star`, `join`, `cancel` and `cancel_user` need a logged-in user
    (401 otherwise). Unknown requests are answered with an empty 404.

    '''
    # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements
    uid = g.get('user', {}).get('account', {}).get('_id')

    project_info = Project.get(pid=pid)
    if not project_info:
        return Response('404', status=404)

    # Anonymous visitors are never project members; for the rest a single
    # participation record is enough.
    is_in_project = bool(uid) and any(
        True for _ in Team.participate_in(uid=uid, pid=[pid, ]))

    if request.method == 'GET':
        return render_template('./tasks_project.html', project=project_info.dict(by_alias=True))

    if request.method != 'POST':
        return make_response({}, 404)

    post_data = request.get_json()
    if not post_data:
        return make_response({}, 404)

    casename = post_data['casename']

    def page_args(data: dict[str, Any], uid: str | None) -> None:
        ''' Attach the viewer-related flags the page needs to a task '''
        logged_in = bool(uid)
        data['_uid'] = uid
        data['_joined'] = logged_in and uid in data['people']
        data['_login'] = logged_in
        data['_is_in_project'] = is_in_project

    if casename == 'get':
        datas = []
        for data in Tasks.get_by_pid(pid=pid):
            page_args(data=data, uid=uid)
            datas.append(data)

        is_star = TasksStar.status(pid, uid)['add'] if uid else False

        return jsonify({'datas': datas, 'is_in_project': is_in_project, 'is_star': is_star})

    if casename in ('star', 'join', 'cancel', 'cancel_user') and not uid:
        return make_response({'info': 'Need login'}, 401)

    if casename == 'star':
        result = TasksStar.toggle(pid=pid, uid=uid)
        return jsonify({'is_star': result['add']})

    if casename == 'join':
        data = Tasks.join(pid=pid, task_id=post_data['task_id'], uid=uid)
        page_args(data=data, uid=uid)
        return jsonify({'data': data})

    if casename == 'cancel':
        data = Tasks.cancel(pid=pid, task_id=post_data['task_id'], uid=uid)
        page_args(data=data, uid=uid)
        return jsonify({'data': data})

    if casename == 'cancel_user':
        if not is_in_project:
            return make_response({'info': 'Need as staff'}, 401)

        # Removes the user named in the body, not the caller.
        data = Tasks.cancel(
            pid=pid, task_id=post_data['task_id'], uid=post_data['uid'])
        page_args(data=data, uid=uid)
        return jsonify({'data': data})

    if casename == 'peoples':
        task_data = Tasks.get_with_pid(pid=pid, _id=post_data['task_id'])
        if not task_data:
            return make_response({}, 404)

        users_info = Tasks.get_peoples_info(
            pid=pid, task_id=post_data['task_id'])
        if not users_info:
            return make_response({}, 404)

        creator_uid = task_data['created_by']
        creator_info = User.get_info(uids=[creator_uid, ])
        creator = {
            'name': creator_info[creator_uid]['profile']['badge_name'],
            'uid': creator_uid,
        }

        mid = MattermostTools.find_possible_mid(uid=creator_uid)
        if mid:
            creator['mattermost_uid'] = MattermostTools.find_user_name(mid=mid)

        # Visitors outside the project only learn who created the task.
        if not is_in_project:
            return jsonify({'peoples': {}, 'creator': creator})

        peoples = {}
        for member_uid, user in users_info.items():
            peoples[member_uid] = {
                'name': user['profile']['badge_name'],
                'mail': user['oauth']['email'],
                'picture': user['oauth']['picture'],
                'mattermost_uid': None,
            }

            mid = MattermostTools.find_possible_mid(uid=member_uid)
            if mid:
                peoples[member_uid]['mattermost_uid'] = MattermostTools.find_user_name(
                    mid=mid)

        return jsonify({'peoples': peoples, 'creator': creator})

    return make_response({}, 404)

read

read(pid: str, task_id: str) -> str | ResponseBase

Read

Source code in view/tasks.py
@VIEW_TASKS.route('/<pid>/r/<task_id>', methods=('GET', 'POST'))
def read(pid: str, task_id: str) -> str | ResponseBase:
    ''' Read

    Detail page of one task. An unknown project gives a 404 and an
    unknown task redirects to the project's task list. `GET` renders the
    task with its creator; `POST` (login required) lets the current user
    `join` or `cancel` the task according to the JSON `casename`.

    '''
    def taipei_text(moment: Any) -> str:
        ''' Format a stored time as `YYYY-MM-DD HH:mm` in Asia/Taipei '''
        return arrow.get(moment).to('Asia/Taipei').format('YYYY-MM-DD HH:mm')

    project_info = Project.get(pid=pid)
    if not project_info:
        return Response('404', status=404)

    task = Tasks.get_with_pid(pid=pid, _id=task_id)
    if not task:
        return redirect(url_for('tasks.project', pid=pid, _scheme='https', _external=True))

    task['starttime'] = taipei_text(task['starttime'])
    if task['endtime']:
        task['endtime'] = taipei_text(task['endtime'])

    uid = g.get('user', {}).get('account', {}).get('_id')

    if request.method == 'GET':
        creator_uid = task['created_by']
        user_info = User.get_info(uids=[creator_uid, ])
        creator = {
            'name': user_info[creator_uid]['profile']['badge_name'],
            'uid': creator_uid,
        }

        mid = MattermostTools.find_possible_mid(uid=creator_uid)
        if mid:
            creator['mattermost_uid'] = MattermostTools.find_user_name(mid=mid)

        return render_template('./tasks_detail.html', task=task, creator=creator, uid=uid)

    if request.method != 'POST':
        return jsonify({})

    if not uid:
        return make_response({'info': 'Need login'}, 401)

    post_data = request.get_json()
    if post_data:
        casename = post_data['casename']
        if casename == 'join':
            Tasks.join(pid=pid, task_id=task['_id'], uid=uid)
        elif casename == 'cancel':
            Tasks.cancel(pid=pid, task_id=task['_id'], uid=uid)

    return jsonify({})

view.team

Team

AppreciationData

Bases: BaseModel

AppreciationData

Source code in view/team.py
class AppreciationData(BaseModel):
    ''' AppreciationData

    Model with an `available` flag (False by default) and a `key` /
    `value` string pair (both empty by default).

    '''
    available: bool = False
    key: str = ''
    value: str = ''

FeeMapping

Bases: BaseModel

FeeMapping

Source code in view/team.py
class FeeMapping(BaseModel):
    ''' FeeMapping

    Model with one integer field and three string fields. None of them
    has a default, so all four must be supplied.

    '''
    # NOTE(review): the meaning of each field is not visible in this
    # block; confirm against the code that builds FeeMapping.
    fee: int
    howto: str
    apply: str
    fromwhere: str

UserInfoBase

Bases: BaseModel

UserInfoBase

Source code in view/team.py
class UserInfoBase(BaseModel):
    ''' UserInfoBase

    One team member as returned by the `members` view. That view fills
    `profile` with `badge_name`, `oauth` with `picture`, sets `is_chief`
    for team chiefs and stores `mid` / `name` in `chat` when a chat
    account is found.

    '''
    # Populated through the `_id` alias.
    id: str = Field(alias='_id')
    profile: dict[str, str]
    oauth: dict[str, str]
    # False unless set explicitly.
    is_chief: bool = Field(default=False)
    # Starts as a new empty dict; the annotation also allows None.
    chat: dict[str, str] | None = Field(default_factory=dict)

calendar

calendar(pid: str, tid: str) -> str | ResponseBase

calendar

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/calendar')
def calendar(pid: str, tid: str) -> str | ResponseBase:
    ''' calendar

    Renders the team calendar page when the project has a calendar set;
    otherwise redirects to the team index page. Unknown teams or
    projects are redirected away.

    '''
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not (team and project):
        return redirect('/')

    if not project.calendar:
        return redirect(url_for('team.index',
                                pid=team.pid, tid=team.id, _scheme='https', _external=True))

    teamusers = TeamUsers.parse_obj(team)
    current_uid = g.user['account']['_id']
    # Team chiefs, team owners and project owners count as admins.
    is_admin = any(
        current_uid in group
        for group in (teamusers.chiefs, teamusers.owners, project.owners))

    return render_template('./team_calendar.html',
                           project=project.dict(by_alias=True),
                           team=team.dict(by_alias=True), is_admin=is_admin)

index

index(pid: str, tid: str) -> str | ResponseBase

Index page

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/')
def index(pid: str, tid: str) -> str | ResponseBase:
    ''' Index page

    Renders the team page. `desc` and `public_desc` are HTML-escaped,
    rendered as markdown and have `javascript:` link targets replaced by
    `/`. The template also receives whether the current user may join
    (`join_able`), whether the user is an admin (`is_admin`) and whether
    the `preview` query argument is present (`preview_public`).

    '''
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    for k in ('desc', 'public_desc'):
        if k not in team.__dict__ or not team.__dict__[k]:
            team.__dict__[k] = ''
        else:
            # `[^"]*` stops at the end of the href value, so links later
            # on the same line survive. Matching is case-insensitive and
            # allows leading whitespace, because browsers accept
            # `JavaScript:` and ` javascript:` as script URLs too.
            team.__dict__[k] = re.sub(
                r'<a href="\s*javascript:[^"]*"', '<a href="/"',
                markdown(html.escape(team.__dict__[k])),
                flags=re.IGNORECASE)

    preview_public = 'preview' in request.args

    teamusers = TeamUsers.parse_obj(team)
    uid = g.user['account']['_id']

    # Team chiefs, team owners and project owners count as admins.
    is_admin = (uid in teamusers.chiefs or
                uid in teamusers.owners or
                uid in project.owners)

    # Only users with no role in the team or project may apply to join.
    join_able = not (uid in teamusers.members or is_admin)

    return render_template('./team_index.html', team=team.dict(by_alias=True),
                           project=project.dict(by_alias=True),
                           join_able=join_able, is_admin=is_admin,
                           preview_public=preview_public)

members

members(pid: str, tid: str) -> str | ResponseBase

members

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/members', methods=('GET', 'POST'))
def members(pid: str, tid: str) -> str | ResponseBase:  # pylint: disable=too-many-branches
    ''' members

    `GET` renders the members page. `POST` with JSON `casename` of `get`
    returns the members (chiefs first, then by badge name) of this team,
    or of the team named by `tid` in the body, together with the team's
    tags and the members' tags. The list of all teams in the project is
    included only when this team itself is requested.

    An unknown `tid` in the body, or any other request, is answered with
    an empty 404.

    '''
    # pylint: disable=too-many-locals,too-many-return-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    is_admin = (g.user['account']['_id'] in teamusers.chiefs or
                g.user['account']['_id'] in teamusers.owners or
                g.user['account']['_id'] in project.owners)

    if request.method == 'GET':
        return render_template('./team_members.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True), is_admin=is_admin)

    if request.method == 'POST':
        post_data = request.get_json()

        if post_data and post_data['casename'] == 'get':
            list_teams = []
            if 'tid' in post_data and post_data['tid'] != tid:
                team = Team.get(pid=pid, tid=post_data['tid'])
                if team is None:
                    # The tid comes from the client, so an unknown team is
                    # a "not found" reply rather than a server error.
                    return make_response({}, 404)

            else:
                list_teams = [
                    {'_id': lteam.id, 'name': lteam.name}
                    for lteam in Team.list_by_pid(pid=pid)
                ]

            uids = []
            if team.chiefs:
                uids.extend(team.chiefs)
            if team.members:
                uids.extend(team.members)

            # A chief may also be listed as a member; report each user once.
            uids = list(set(uids))
            users_info = User.get_info(uids=uids)

            result_members = []
            for uid in uids:
                if uid not in users_info:
                    continue

                user = UserInfoBase.parse_obj(
                    {'_id': uid,
                     'profile': {'badge_name': users_info[uid]['profile']['badge_name']},
                     'oauth': {'picture': users_info[uid]['oauth']['picture']}})

                if team.chiefs and uid in team.chiefs:
                    user.is_chief = True

                mid = MattermostTools.find_possible_mid(uid=uid)
                if mid:
                    user.chat = {
                        'mid': mid, 'name': MattermostTools.find_user_name(mid=mid)}

                result_members.append(user.dict(by_alias=True))

            # Chiefs first, then case-insensitive badge name.
            result_members.sort(
                key=lambda u: (not u['is_chief'], u['profile']['badge_name'].lower()))

            tags = []
            if team.tag_members:
                tags = [t_m.dict() for t_m in team.tag_members]

            members_tags = Team.get_members_tags(pid=team.pid, tid=team.id)

            return jsonify({'members': result_members, 'teams': list_teams,
                            'tags': tags, 'members_tags': members_tags})

    return make_response({}, 404)

team_edit

team_edit(pid: str, tid: str) -> str | ResponseBase

Team edit

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/edit', methods=('GET', 'POST'))
def team_edit(pid: str, tid: str) -> str | ResponseBase:
    ''' Team edit

    Settings page of a team, for team chiefs, team owners and project
    owners only (everyone else is redirected to `/`). `GET` renders the
    form; `POST` saves the stripped `name`, `public_desc` and `desc`
    form fields and redirects back to this page.

    '''
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not (team and project):
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    current_uid = g.user['account']['_id']
    is_admin = any(
        current_uid in group
        for group in (teamusers.chiefs, teamusers.owners, project.owners))

    if not is_admin:
        return redirect('/')

    if request.method == 'GET':
        return render_template('./team_edit_setting.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True))

    if request.method == 'POST':
        data = {
            field: request.form[field].strip()
            for field in ('name', 'public_desc', 'desc')
        }
        Team.update_setting(pid=team.pid, tid=team.id, data=data)

        back_to_form = url_for('team.team_edit',
                               pid=team.pid, tid=team.id, _scheme='https', _external=True)
        return redirect(back_to_form)

    return Response('', status=404)

team_edit_user

team_edit_user(pid: str, tid: str) -> str | ResponseBase

Team edit user

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/edit_user', methods=('GET', 'POST'))
def team_edit_user(pid: str, tid: str) -> str | ResponseBase:
    ''' Team edit user

    Member management page of a team, for team chiefs, team owners and
    project owners only (everyone else is redirected to `/`).

    `GET` renders the page with the team's wait list, each applicant's
    history in the project, their tag names and one randomly chosen
    apply-review message per applicant.

    `POST` reads a JSON body and dispatches on its `case`:

    - `deluser`: remove `uid` from the team.
    - `history`: return the wait-list history of `uid` in this team.
    - `members`: return members and chiefs with chat and phone info,
      plus the team tags and the members' tags.
    - `add_tag`: create a member tag named `tag_name`.
    - `update_member_tags`: set tags for the listed members; only tags
      that belong to the team are kept.
    - `del_tag`: delete the tag given by `tag.id`.

    Cases without their own reply return an empty JSON object.

    '''
    # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    is_admin = (g.user['account']['_id'] in teamusers.chiefs or
                g.user['account']['_id'] in teamusers.owners or
                g.user['account']['_id'] in project.owners)

    if not is_admin:
        return redirect('/')

    if request.method == 'GET':
        waitting_list: list[dict[str, Any]] = []
        waiting_data = WaitList.list_by_team(pid=pid, tid=tid)

        if waiting_data is not None:
            waitting_list.extend(waiting_data)

        uids = [u['uid'] for u in waitting_list]
        users_info = User.get_info(uids)

        waiting_uids: list[str] = []
        for user in waitting_list:
            user['_info'] = users_info[user['uid']]
            user['_history'] = []
            for wait_info in WaitList.find_history(pid=pid, uid=user['uid']):
                # History entries without a result are still pending.
                if 'result' not in wait_info:
                    wait_info['result'] = 'waitting'

                user['_history'].append(wait_info)

            user_data = User(uid=user['uid']).get()
            if user_data:
                user['_mail'] = user_data['mail']

            waiting_uids.append(user['uid'])

        apply_review_results: dict[str, Any] = {}
        waiting_uids_tags_name: dict[str, list[str]] = {}
        if waiting_uids:
            apply_review_results = ApplyReview.get(
                pid=pid, tid=tid, uids=waiting_uids)

            for item in apply_review_results.values():
                # Show a single, randomly picked review message per applicant.
                item['messages'] = [choice(item['messages']), ]
                for raw in item['messages']:
                    raw['content'] = markdown(
                        raw['content'].replace('\n\n', '\n'))

            waiting_uids_tags_data = Team.get_tags_by_uids(
                pid=pid, tid=tid, uids=waiting_uids)
            mapping_name: dict[str, str] = {
                tag.id: tag.name for tag in team.tag_members} if team.tag_members else {}
            for uid, tags_data in waiting_uids_tags_data.items():
                if uid not in waiting_uids_tags_name:
                    waiting_uids_tags_name[uid] = []

                for tag in tags_data:
                    waiting_uids_tags_name[uid].append(mapping_name[tag])

        return render_template('./team_edit_user.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(
                                   by_alias=True, exclude_none=True),
                               waitting_list=waitting_list,
                               waiting_uids_tags_name=waiting_uids_tags_name,
                               apply_review_results=apply_review_results,
                               )

    if request.method == 'POST':  # pylint: disable=too-many-return-statements, too-many-nested-blocks
        data = request.json

        if data and data['case'] == 'deluser':
            Team.update_members(pid=pid, tid=tid, del_uids=[data['uid'], ])
        elif data and data['case'] == 'history':
            history = []
            for raw in WaitList.find_history_in_team(uid=data['uid'], pid=pid, tid=tid):
                raw['_id'] = str(raw['_id'])
                history.append(raw)

            return jsonify({'history': history})
        elif data and data['case'] == 'members':
            result_members = []
            if team.members or team.chiefs:
                _all_uids = set(teamusers.members + teamusers.chiefs)

                users_info = User.get_info(list(_all_uids))
                for uid in _all_uids:
                    result_members.append(users_info[uid])

                for user in result_members:
                    user['chat'] = {}
                    mid = MattermostTools.find_possible_mid(uid=user['_id'])
                    if mid:
                        user['chat'] = {
                            'mid': mid, 'name': MattermostTools.find_user_name(mid=mid)}

                    user['phone'] = {'country_code': '', 'phone': ''}

                    # A member without a real profile or phone keeps the
                    # blank phone entry.
                    stored_phone = (user.get('profile_real') or {}).get('phone')
                    if not stored_phone:
                        continue

                    try:
                        phone = phonenumbers.parse(stored_phone)
                    except phonenumbers.phonenumberutil.NumberParseException:
                        # One malformed stored number must not fail the
                        # whole member listing; leave the entry blank.
                        continue

                    if phone.country_code:
                        user['phone']['country_code'] = \
                            phonenumbers.COUNTRY_CODE_TO_REGION_CODE[
                                phone.country_code][0]
                    user['phone']['phone'] = phonenumbers.format_number(
                        phone, phonenumbers.PhoneNumberFormat.NATIONAL)

                result_members.sort(
                    key=lambda u: u['profile']['badge_name'].lower())

                return jsonify({
                    'members': result_members,
                    'tags': [t_m.dict() for t_m in team.tag_members] if team.tag_members else [],
                    'members_tags': Team.get_members_tags(pid=pid, tid=tid),
                })

        elif data and data['case'] == 'add_tag':
            result = Team.add_tag_member(
                pid=pid, tid=tid, tag_name=data['tag_name'])
            return jsonify({'tag': result})

        elif data and data['case'] == 'update_member_tags':
            team_tags = []
            if team.tag_members:
                team_tags = [tag.id for tag in team.tag_members]

            team_members = set(teamusers.members + teamusers.chiefs)

            # Ignore users outside the team and tags the team does not have.
            tag_datas = {}
            for uid in team_members:
                if uid in data['data']:
                    tag_datas[uid] = {'tags': list(
                        set(team_tags) & set(data['data'][uid]))}

            if tag_datas:
                Team.add_tags_to_members(pid=pid, tid=tid, data=tag_datas)

            return jsonify({'data': tag_datas})

        elif data and data['case'] == 'del_tag':
            Team.del_tag(pid=pid, tid=tid, tag_id=data['tag']['id'])

    return jsonify({})

team_edit_user_api

team_edit_user_api(pid: str, tid: str) -> ResponseBase

Team edit user API

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/edit_user/api', methods=('GET', 'POST'))
def team_edit_user_api(pid: str, tid: str) -> ResponseBase:  # pylint: disable=too-many-branches
    ''' JSON API behind the team "edit user" page

    Only team chiefs, team owners and project owners pass the permission
    check; everybody else is redirected to `/`.

    GET with `casename=join` answers with the applicant's waiting-list
    entry (badge name, picture, uid, note, wid); every other GET
    outcome is an empty JSON object.

    POST dispatches on the `casename` of the JSON body:

    - `join`: record the approval / denial of a waiting-list entry.
      An approval is refused with 406 once the headcount is reached.
    - `get_tags`: return the tags of one user plus the team's tags.
    - `presave_tags`: store those submitted tags that exist on the team.

    Anything else ends in an empty 404 response.

    '''
    # pylint: disable=too-many-return-statements, too-many-locals
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if not any(me in group for group in (
            teamusers.chiefs, teamusers.owners, project.owners)):
        return redirect('/')

    if request.method == 'GET':
        if request.args['casename'] != 'join':
            return jsonify({})

        applicant = User(uid=request.args['uid']).get()
        if applicant is None:
            return jsonify({})

        applicant_id = applicant['_id']
        waiting_rows = WaitList.list_by_team(
            pid=pid, tid=tid, uid=applicant_id)
        if not waiting_rows:
            return jsonify({})

        # Answer with the first waiting entry whose user info is found.
        for row in waiting_rows:
            info_by_uid = User.get_info([applicant_id, ])
            if not info_by_uid:
                continue

            info = info_by_uid[applicant_id]
            return jsonify({
                'badge_name': info['profile']['badge_name'],
                'picture': info['oauth']['picture'],
                'uid': applicant_id,
                'note': row['note'],
                'wid': f"{row['_id']}",
            })

        return jsonify({})

    if request.method == 'POST':
        data = request.json
        if not data:
            return make_response({}, 404)

        casename = data['casename']

        if casename == 'join':
            if data['result'] == 'approval':
                # Chiefs and members are counted once each.
                occupied = len(set(teamusers.members + teamusers.chiefs))
                limit = team.headcount
                if limit is not None and 0 < limit <= occupied:
                    body = json.dumps({
                        'status': 'fail',
                        'message': 'over headcount.'})
                    return Response(response=body, status=406,
                                    mimetype='application/json')

            wait_info = WaitList.make_result(
                wid=data['wid'], pid=pid, uid=data['uid'], result=data['result'])
            if wait_info and 'result' in wait_info:
                decision = wait_info['result']
                if decision == 'approval':
                    Team.update_members(
                        pid=pid, tid=tid, add_uids=[data['uid'], ])
                elif decision == 'deny':
                    TeamMemberChangedDB().make_record(
                        pid=pid, tid=tid, action={'deny': [data['uid'], ]})

            return jsonify({'status': 'ok'})

        if casename == 'get_tags':
            user_tags = Team.get_tags_by_uids(
                pid=pid, tid=tid, uids=[data['uid'], ])
            all_tags = [raw.dict() for raw in team.tag_members] \
                if team.tag_members else []
            return jsonify({'user_tags': user_tags, 'tags': all_tags})

        if casename == 'presave_tags':
            known_tags = [tag.id for tag in team.tag_members] \
                if team.tag_members else []

            # Keep only the submitted tags that are defined on the team.
            accepted = list(set(known_tags) & set(data['tags'] or []))
            tag_datas: dict[str, dict[str, list[str]]] = {
                data['uid']: {'tags': accepted}}
            Team.add_tags_to_members(pid=pid, tid=tid, data=tag_datas)

            # The response echoes the submitted tags, not the accepted ones.
            return jsonify({'user_tags': data['tags']})

    return make_response({}, 404)

team_edit_user_dl_waiting

team_edit_user_dl_waiting(
    pid: str, tid: str
) -> str | ResponseBase

Download the team's waiting list as a CSV file

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/edit_user/dl_waiting', methods=('GET',))
def team_edit_user_dl_waiting(pid: str, tid: str) -> str | ResponseBase:
    ''' Download the team's waiting list as a CSV attachment

    Only team chiefs, team owners and project owners pass the permission
    check; everybody else is redirected to `/`.

    The CSV has the columns `date`, `name`, `email_for_to`, `submission`,
    `url` and `intro`, one row per uid, and is encoded as `utf-8-sig`.

    '''
    # pylint: disable=too-many-locals
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if not any(me in group for group in (
            teamusers.chiefs, teamusers.owners, project.owners)):
        return redirect('/')

    entries = WaitList.list_by_team(pid=pid, tid=tid)
    pending: list[dict[str, Any]] = \
        list(entries) if entries is not None else []

    users_info = User.get_info([entry['uid'] for entry in pending])

    # Keyed by uid, so a later entry of the same user replaces the earlier.
    rows: dict[str, dict[str, str]] = {}
    for entry in pending:
        uid = entry['uid']
        info = users_info[uid]
        badge_name = info['profile']['badge_name']
        email = info['oauth']['email']
        rows[uid] = {
            'name': badge_name,
            'email_for_to': f'"{badge_name}" <{email}>',
            'submission': entry['note'],
            'intro': info['profile']['intro'].replace('\n', '\r\n'),
            'url': f'https://{setting.DOMAIN}/user/{uid}',
            # The date comes from the generation time of the entry's ObjectId.
            'date': arrow.get(ObjectId(entry['_id']).generation_time).to(
                'Asia/Taipei').format('YYYY-MM-DD'),
        }

    columns = ['date', 'name', 'email_for_to', 'submission', 'url', 'intro']
    with io.StringIO() as buffer:
        writer = csv.DictWriter(buffer, fieldnames=columns,
                                quoting=csv.QUOTE_MINIMAL)
        writer.writeheader()
        writer.writerows(rows.values())

        stamp = arrow.now().to('Asia/Taipei').format('YYYYMMDD-HHmmss')
        filename = f"coscup_waiting_{pid}_{tid}_{stamp}.csv"
        payload = buffer.getvalue().encode(encoding="utf-8-sig")

    return Response(
        payload,
        mimetype='text/csv',
        headers={
            'Content-disposition': f'attachment; filename={filename}',
            'x-filename': filename,
        })

team_expense_index

team_expense_index(pid: str, tid: str) -> ResponseBase

Team expense index

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/expense/', methods=('GET', 'POST'))
def team_expense_index(pid: str, tid: str) -> ResponseBase:
    ''' Team expense JSON endpoint

    Open to team members and chiefs only; others are redirected to `/`.

    POST dispatches on the `casename` of the JSON body:

    - `get`: teams of the project, enabled budget items of the selected
      team and the bank data of the current user.
    - `add_expense`: create an expense and queue the `expense_create` task.
    - `get_has_sent`: expenses already sent for the budget `buid`.

    Everything else, GET included, ends in an empty 404 response.

    '''
    # pylint: disable=too-many-return-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    if request.method != 'POST':
        return make_response({}, 404)

    data = request.get_json()
    if not data:
        return make_response({}, 404)

    casename = data['casename']

    if casename == 'get':
        teams = [{'name': _team.name, 'tid': _team.id}
                 for _team in Team.list_by_pid(pid=project.id)]

        # Fall back to the current team when the requested one is unknown.
        requested = html.escape(data['select_team'])
        known_tids = [_team['tid'] for _team in teams]
        select_team = requested if requested in known_tids else team.id

        items = list(Budget.get_by_tid(
            pid=pid, tid=select_team, only_enable=True))
        bank = User.get_bank(uid=me)

        return jsonify({'teams': teams, 'items': items,
                        'select_team': select_team, 'bank': bank.dict()})

    if casename == 'add_expense':
        # create expense and send notification.
        expense = Expense.process_and_add(
            pid=project.id, tid=team.id, uid=me, data=data)
        expense_create.apply_async(kwargs={'expense': expense})
        return jsonify({})

    if casename == 'get_has_sent':
        sent = Expense.get_has_sent(pid=project.id, budget_id=data['buid'])
        return jsonify({'data': list(sent)})

    return make_response({}, 404)

team_expense_lists

team_expense_lists(
    pid: str, tid: str
) -> str | ResponseBase

Team expense lists

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/expense/lists', methods=('GET', 'POST'))
def team_expense_lists(pid: str, tid: str) -> str | ResponseBase:
    ''' Team expense lists page

    Open to team members and chiefs only; others are redirected to `/`.
    GET renders `expense_lists.html`; any other method gets an empty 404.

    '''
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    if request.method != 'GET':
        return Response('', status=404)

    # Passed to the template as `budget_menu`.
    budget_admin = Budget.is_admin(pid=pid, uid=me)
    return render_template('./expense_lists.html',
                           project=project.dict(by_alias=True),
                           team=team.dict(by_alias=True),
                           budget_menu=budget_admin)

team_expense_my

team_expense_my(pid: str, tid: str) -> str | ResponseBase

Team expense my

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/expense/my', methods=('GET', 'POST'))
def team_expense_my(pid: str, tid: str) -> str | ResponseBase:
    ''' Page and JSON endpoint for the current user's expenses

    Open to team members and chiefs only; others are redirected to `/`.
    GET renders `expense_my.html`.

    POST dispatches on the `casename` of the JSON body:

    - `get`: expenses created by the current user, plus the expenses that
      share a dispense with them, with related dispenses, budgets and users.
    - `update`: edit an expense. Invoice totals and bank data change only
      while the status is `'1'`; invoice status / name and the request
      data change while the status is `'1'`, `'2'` or `'3'`.
    - `remove`: disable an expense whose status is `'1'`.

    Anything else ends in an empty 404 response.

    '''
    # pylint: disable=too-many-locals,too-many-branches,too-many-return-statements,too-many-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    if request.method == 'GET':
        budget_admin = Budget.is_admin(pid=pid, uid=me)
        return render_template('./expense_my.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True),
                               budget_menu=budget_admin)

    if request.method == 'POST':
        data = request.get_json()
        if not data:
            return make_response({}, 404)

        casename = data['casename']

        if casename == 'get':
            teams = [{'name': _team.name, 'tid': _team.id}
                     for _team in Team.list_by_pid(pid=project.id)]

            # Own expenses first; they decide which dispenses are relevant.
            items = {}
            dispense_ids = set()
            for item in Expense.get_by_create_by(pid=pid, create_by=me):
                items[item['_id']] = item
                dispense_ids.add(item['dispense_id'])

            dispense_ids_list = list(dispense_ids)

            # Add every expense that belongs to one of those dispenses.
            for item in Expense.get_by_dispense_id(dispense_ids_list):
                items[item['_id']] = item

            buids = set()
            uids = set()
            for item in items.values():
                buids.add(item['request']['buid'])
                uids.add(item['create_by'])

            dispenses = list(Dispense.get_by_ids(dispense_ids_list))

            budgets = {}
            if buids:
                budgets = {raw['_id']: raw
                           for raw in Budget.get(buids=list(buids), pid=pid)}

            users = {}
            if uids:
                # Only the oauth data and the badge name are sent back.
                for uid, value in User.get_info(uids=list(uids)).items():
                    users[uid] = {
                        'oauth': value['oauth'],
                        'profile': {
                            'badge_name': value['profile']['badge_name']},
                    }

            return jsonify({
                'teams': teams,
                'items': list(items.values()),
                'dispenses': dispenses,
                'budgets': budgets,
                'users': users,
                'status': Expense.status(),
                'my': me
            })

        if casename == 'update':
            stored_invoices = {}
            status = ''
            for expense in Expense.get_by_eid(expense_id=data['eid']):
                status = expense['status']
                for invoice in expense['invoices']:
                    stored_invoices[invoice['iv_id']] = invoice

            can_edit_amounts = status in ('1', )
            can_edit_details = status in ('1', '2', '3')

            for submitted in data['invoices']:
                iv_id = submitted['iv_id']
                # Invoices not already stored on the expense are ignored.
                if iv_id not in stored_invoices:
                    continue

                target = stored_invoices[iv_id]
                if can_edit_amounts:
                    target['total'] = submitted['total']

                if can_edit_details:
                    target['status'] = submitted['status'].strip()
                    target['name'] = submitted['name'].strip()

            Expense.update_invoices(
                expense_id=data['eid'],
                invoices=list(stored_invoices.values()))

            if can_edit_amounts:
                Expense.update_bank(expense_id=data['eid'], bank=data['bank'])

            if can_edit_details:
                Expense.update_request(
                    expense_id=data['eid'], rdata=data['req'])

            return jsonify({})

        if casename == 'remove':
            status = ''
            for expense in Expense.get_by_eid(expense_id=data['eid']):
                status = expense['status']

            if status in ('1', ):
                Expense.update_enable(expense_id=data['eid'], enable=False)

            return jsonify({})

    return make_response({}, 404)

team_form_accommodation

team_form_accommodation(
    pid: str, tid: str
) -> str | ResponseBase

Team form accommodation

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/form/accommodation', methods=('GET', 'POST'))
def team_form_accommodation(pid: str, tid: str) -> str | ResponseBase:
    ''' Accommodation form page and JSON endpoint

    Open to team members and chiefs only; others are redirected to `/`.
    Submitting requires a non-empty `name`, `roc_id` and `phone` in the
    user's `profile_real`; without them every POST is answered with 406.

    GET renders `form_accommodation.html`.

    POST dispatches on the `casename` of the JSON body:

    - `get`: the saved choice plus room / room-mate data when a room is set.
    - `update`: save the choice (`no`, `yes` or `yes-longtraffic`);
      401 when the project's accommodation form switch is off.
    - `makechange`: answers with a fixed message only.

    '''
    # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    user = g.user['account']
    is_ok_submit = False
    if 'profile_real' in user:
        real = user['profile_real']
        is_ok_submit = all(
            key in real and real[key] for key in ('name', 'roc_id', 'phone'))

    if request.method == 'GET':
        return render_template('./form_accommodation.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True),
                               is_ok_submit=is_ok_submit)

    if request.method == 'POST':
        if not is_ok_submit:
            return Response('', status=406)

        post_data = request.get_json()
        casename = post_data['casename'] if post_data else None

        if casename == 'get':
            # Defaults for a user who has not filled in the form yet.
            raw = {'selected': 'no', 'mixed': True}
            room = {}

            form_data = Form.get_accommodation(pid=pid, uid=me)
            if form_data:
                saved = form_data['data']
                raw['selected'] = saved['key']
                raw['mixed'] = saved['mixed']

                if saved.get('room'):
                    room['no'] = saved['room']
                    room['key'] = saved['room_key']
                    room['exkey'] = saved.get('room_exkey', '')
                    room['mate'] = {}

                    _user_room, mate_room = FormAccommodation.get_room_mate(
                        pid=pid, uid=me)
                    if mate_room:
                        mate_uid = mate_room['uid']
                        mate_info = User.get_info(uids=[mate_uid, ])[mate_uid]
                        room['mate'] = {
                            'uid': mate_room['uid'],
                            'name': mate_info['profile']['badge_name'],
                            'tid': '',
                            'picture': mate_info['oauth']['picture'],
                        }

            return jsonify({'data': raw, 'room': room})

        if casename == 'update':
            if not project.formswitch.accommodation:
                return make_response({}, 401)

            selected = html.escape(post_data['selected'])
            if selected not in ('no', 'yes', 'yes-longtraffic'):
                return Response('', status=406)

            Form.update_accommodation(pid=pid, uid=me, data={
                'status': selected in ('yes', 'yes-longtraffic'),
                'key': selected,
                'mixed': post_data['mixed']
            })

            return jsonify({'data': {'selected': selected}})

        if casename == 'makechange':
            return jsonify({'msg': '今年不適用!'})

    return jsonify({})

team_form_api

team_form_api(pid: str, tid: str) -> ResponseBase

Team form API

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/form/api', methods=('GET', 'POST'))
def team_form_api(pid: str, tid: str) -> ResponseBase:
    ''' Team form JSON API

    Open to team members and chiefs only; others are redirected to `/`.
    GET with `case=traffic_fee` returns the project's traffic-fee mapping
    as a list of `(location, fee)` pairs under `locations`.
    Everything else ends in an empty 404 response.

    '''
    team, _, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    if request.method != 'GET':
        return make_response({}, 404)

    if request.args['case'] != 'traffic_fee':
        return make_response({}, 404)

    mapping = FormTrafficFeeMapping.get(pid=pid)
    if mapping is None:
        return make_response({}, 404)

    locations = [(item.location, item.fee) for item in mapping]
    return jsonify({'locations': locations})

team_form_appreciation

team_form_appreciation(
    pid: str, tid: str
) -> str | ResponseBase

Team form appreciation

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/form/appreciation', methods=('GET', 'POST'))
def team_form_appreciation(pid: str, tid: str) -> str | ResponseBase:
    ''' Appreciation form: which name, if any, the user wants listed

    Open to team members and chiefs only; others are redirected to `/`.

    GET renders `form_appreciation.html` with the names that can be
    chosen (`oauth`, plus `badge_name` / `real_name` when they are set)
    and the saved choice.

    POST reads the `appreciation` form field and saves it. It answers 401
    when the project's appreciation form switch is off, 406 for an
    unknown choice, and otherwise redirects back to this page.

    '''
    # pylint: disable=too-many-branches,too-many-return-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    if request.method == 'GET':
        names = {'oauth': g.user['data']['name']}
        account = g.user['account']

        if 'profile' in account and \
                'badge_name' in account['profile'] and \
                account['profile']['badge_name']:
            names['badge_name'] = account['profile']['badge_name']

        if 'profile_real' in account and \
                'name' in account['profile_real'] and \
                account['profile_real']['name']:
            names['real_name'] = account['profile_real']['name']

        select_value = 'no'
        form_data = Form.get_appreciation(pid=pid, uid=me)
        if form_data and 'data' in form_data:
            saved = form_data['data']
            # A saved key only counts while the entry is marked available.
            if 'key' in saved and 'available' in saved and saved['available']:
                select_value = saved['key']

        return render_template('./form_appreciation.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True), names=names.items(),
                               select_value=select_value)

    if request.method == 'POST':
        if not project.formswitch.appreciation:
            return Response('', status=401)

        choice = request.form['appreciation']
        if choice not in ('oauth', 'badge_name', 'real_name', 'no'):
            return Response('', status=406)

        if choice == 'no':
            app_data = AppreciationData()
        else:
            if choice == 'oauth':
                name = g.user['data']['name']
            elif choice == 'badge_name':
                name = g.user['account']['profile']['badge_name']
            elif choice == 'real_name':
                name = g.user['account']['profile_real']['name']
            else:
                raise NameError("Can not find the `name`.")

            app_data = AppreciationData.parse_obj({
                'available': True,
                'key': choice,
                'value': name,
            })

        Form().update_appreciation(
            pid=team.pid, uid=me, data=app_data.dict())

        target = url_for('team.team_form_appreciation',
                         pid=team.pid, tid=team.id,
                         _scheme='https', _external=True)
        return redirect(target)

    return make_response({}, 404)

team_form_clothes

team_form_clothes(pid: str, tid: str) -> str | ResponseBase

Team form clothes

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/form/clothes', methods=('GET', 'POST'))
def team_form_clothes(pid: str, tid: str) -> str | ResponseBase:
    ''' Clothes form page and JSON endpoint

    Open to team members and chiefs only; others are redirected to `/`.
    GET renders `form_clothes.html`.

    POST dispatches on the `casename` of the JSON body:

    - `get`: the saved `clothes` and `htg` values (empty strings when
      nothing is saved).
    - `post`: save `clothes` and `htg` when `clothes` is non-empty;
      401 when the project's clothes form switch is off.

    Every other case answers with an empty JSON object.

    '''
    # pylint: disable=too-many-return-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    if request.method == 'GET':
        return render_template('./form_clothes.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True))

    if request.method == 'POST':
        post_data = request.get_json()
        casename = post_data['casename'] if post_data else None

        if casename == 'get':
            found = Form.get_clothes(pid=team.pid, uid=me)
            saved = found['data'] if found else {'clothes': '', 'htg': ''}

            # Saved data may lack `htg`; report it as an empty string then.
            return jsonify({
                'clothes': saved['clothes'],
                'htg': saved.get('htg', ''),
            })

        if casename == 'post':
            if not project.formswitch.clothes:
                return make_response({}, 401)

            if 'clothes' in post_data and post_data['clothes']:
                Form.update_clothes(
                    pid=team.pid, uid=me,
                    data={
                        'clothes': post_data['clothes'],
                        'htg': post_data['htg'],
                    })
                return jsonify({})

    return jsonify({})

team_form_drink

team_form_drink(pid: str, tid: str) -> str | ResponseBase

Team form drink

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/form/drink', methods=('GET', 'POST'))
def team_form_drink(pid: str, tid: str) -> str | ResponseBase:
    ''' Drink form page and JSON endpoint

    Open to team members and chiefs only; others are redirected to `/`.
    GET renders `form_drink.html`.

    POST dispatches on the `casename` of the JSON body:

    - `get`: the saved data, or `{'y18': False}` when nothing is saved.
    - `post`: save `y18` as a boolean when the key is present.

    Every POST other than `get` answers with an empty JSON object; other
    methods get an empty 404.

    '''
    # pylint: disable=too-many-return-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    if request.method == 'GET':
        return render_template('./form_drink.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True))

    if request.method != 'POST':
        return make_response({}, 404)

    post_data = request.get_json()
    if post_data:
        casename = post_data['casename']

        if casename == 'get':
            found = Form.get_drink(pid=team.pid, uid=me)
            if not found:
                found = {'data': {'y18': False}}

            return jsonify({'data': found['data']})

        if casename == 'post' and 'y18' in post_data:
            Form.update_drink(
                pid=team.pid, uid=me,
                data={'y18': bool(post_data['y18'])})

    return jsonify({})

team_form_parking_card

team_form_parking_card(
    pid: str, tid: str
) -> str | ResponseBase

Team form parking card

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/form/parking_card', methods=('GET', 'POST'))
def team_form_parking_card(pid: str, tid: str) -> str | ResponseBase:
    ''' Parking card form page and JSON endpoint

    Open to team members and chiefs only; others are redirected to `/`.
    GET renders `form_parking_card.html`.

    POST dispatches on the `casename` of the JSON body:

    - `get`: the saved data (empty `carno` and `dates` when nothing is
      saved) plus an empty `parking_card_options` list.
    - `post`: save the stripped, upper-cased `carno` with its `dates`;
      an empty `carno` is not saved.

    Anything else ends in an empty 404 response.

    '''
    # pylint: disable=too-many-return-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    if request.method == 'GET':
        return render_template('./form_parking_card.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True))

    if request.method == 'POST':
        post_data = request.get_json()
        casename = post_data['casename'] if post_data else None

        if casename == 'get':
            found = Form.get_parking_card(pid=team.pid, uid=me)
            saved = found['data'] if found else {'carno': '', 'dates': []}

            return jsonify({'data': saved, 'parking_card_options': []})

        if casename == 'post' and 'data' in post_data and post_data['data']:
            submitted = post_data['data']

            carno = submitted['carno'].strip().upper()
            if not carno:
                return jsonify({})

            Form.update_parking_card(
                pid=team.pid, uid=me,
                data={'carno': carno, 'dates': submitted['dates']})

            return jsonify({})

    return make_response({}, 404)

team_form_traffic_fee

team_form_traffic_fee(
    pid: str, tid: str
) -> str | ResponseBase

Team form traffic fee

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/form/traffic_fee', methods=('GET', 'POST'))
def team_form_traffic_fee(pid: str, tid: str) -> str | ResponseBase:
    ''' Traffic fee form page and submit endpoint

    Open to team members and chiefs only; others are redirected to `/`.

    Submitting is allowed only when the project's `traffic_fee_doc`
    matches the expected value, a fee mapping exists for the project and
    the user's `profile_real.bank` has a non-empty `name`, `branch`, `no`
    and `code`.

    GET renders `form_traffic_fee.html` with the saved data as a JSON
    string (an empty string when nothing is saved).

    POST reads the form fields `fromwhere`, `fee`, `howto` and `apply`,
    saves them and redirects back to this page. It answers 401 when the
    project's traffic form switch is off, and 406 when submitting is not
    allowed, `fromwhere` is not a location of the fee mapping, or `fee`
    is not an integer.

    '''
    # pylint: disable=too-many-branches,too-many-return-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    me = g.user['account']['_id']
    if me not in teamusers.members and me not in teamusers.chiefs:
        return redirect('/')

    is_ok_submit = False
    user = g.user['account']
    feemapping = FormTrafficFeeMapping.get(pid=pid)

    # NOTE(review): the document URL is compared against a fixed literal;
    # confirm this is the intended condition.
    if project.traffic_fee_doc == 'https://on.org/' and feemapping:
        if 'profile_real' in user and 'bank' in user['profile_real']:
            bank = user['profile_real']['bank']
            is_ok_submit = all(
                key in bank and bank[key]
                for key in ('name', 'branch', 'no', 'code'))

    if request.method == 'GET':
        form_data = Form.get_traffic_fee(pid=pid, uid=me)
        data = ''
        if form_data:
            saved = form_data['data']
            data = json.dumps({
                'apply': 'yes' if saved['apply'] else 'no',
                'fromwhere': saved['fromwhere'],
                'howto': saved['howto'],
                'fee': saved['fee'],
            })
        return render_template('./form_traffic_fee.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True),
                               data=data, is_ok_submit=is_ok_submit)

    if request.method == 'POST':
        if not project.formswitch.traffic:
            return Response('', status=401)

        if not (is_ok_submit and feemapping):
            return Response('', status=406)

        fromwhere = request.form['fromwhere']
        if fromwhere not in [item.location for item in feemapping]:
            return Response('', status=406)

        # A non-numeric fee is a rejected submission, not a server error.
        try:
            fee = int(request.form['fee'])
        except ValueError:
            return Response('', status=406)

        fee_data: FeeMapping = FeeMapping.parse_obj({
            'fee': fee,
            'howto': request.form['howto'].strip(),
            'apply': request.form['apply'].strip() == 'yes',
            'fromwhere': fromwhere,
        })
        Form.update_traffic_fee(pid=pid, uid=me, data=fee_data.dict())
        return redirect(url_for('team.team_form_traffic_fee',
                                pid=team.pid, tid=team.id,
                                _scheme='https', _external=True))

    return Response('', status=404)

team_form_volunteer_certificate

team_form_volunteer_certificate(
    pid: str, tid: str
) -> str | ResponseBase

Team form volunteer certificate

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/form/volunteer_certificate', methods=('GET', 'POST'))
def team_form_volunteer_certificate(pid: str, tid: str) -> str | ResponseBase:
    ''' Volunteer certificate form of a team

    GET renders the form with the user's stored choice (``no`` when nothing
    is stored). POST stores the choice and redirects back to this page.

    :param str pid: project id
    :param str tid: team id

    '''
    # pylint: disable=too-many-return-statements,too-many-branches
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    # Only members or chiefs of this team may use the form.
    teamusers = TeamUsers.parse_obj(team)
    uid = g.user['account']['_id']
    if uid not in teamusers.members and uid not in teamusers.chiefs:
        return redirect('/')

    # Submitting requires every listed real-profile field to be filled in.
    account = g.user['account']
    is_ok_submit = False
    if 'profile_real' in account:
        real_profile = account['profile_real']
        is_ok_submit = all(
            bool(field in real_profile and real_profile[field])
            for field in ('name', 'roc_id', 'birthday', 'company'))

    if request.method == 'GET':
        form_data = Form.get_volunteer_certificate(pid=pid, uid=uid)
        has_value = bool(
            form_data and 'data' in form_data and 'value' in form_data['data'])
        select_value = 'yes' if has_value and form_data['data']['value'] else 'no'

        return render_template('./form_volunteer_certificate.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True),
                               is_ok_submit=is_ok_submit, select_value=select_value)

    if request.method != 'POST':
        return make_response({}, 404)

    # The form must be switched on for this project.
    if not project.formswitch.certificate:
        return Response('', status=401)

    if not is_ok_submit:
        return Response('', status=406)

    Form.update_volunteer_certificate(
        pid=team.pid, uid=uid,
        data={'value': request.form['volunteer_certificate'] == 'yes'})

    return redirect(url_for('team.team_form_volunteer_certificate',
                            pid=team.pid, tid=team.id, _scheme='https', _external=True))

team_join_to

team_join_to(pid: str, tid: str) -> str | ResponseBase

Team join to

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/join_to', methods=('GET', 'POST'))
def team_join_to(pid: str, tid: str) -> str | ResponseBase:  # pylint: disable=too-many-return-statements,too-many-branches
    ''' Apply to join a team

    GET renders the application page. POST records the application in the
    wait list. Users who already are a member or chief are redirected to the
    team index page.

    :param str pid: project id
    :param str tid: team id

    '''
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    uid = g.user['account']['_id']
    already_joined = uid in teamusers.members or uid in teamusers.chiefs
    if already_joined:
        return redirect(url_for('team.index', pid=pid, tid=tid))

    user_pass = AccountPass(uid=uid)

    # Flash one hint per unmet requirement.
    if user_pass.is_edu_account:
        flash('請勿使用學校帳號註冊!')

    if not user_pass.is_profile:
        flash('''請完成「<a href="/setting/profile">我的簡介</a>」,
        編寫內容請包含:<strong>自我介紹</strong>、<strong>技能</strong>、<strong>年度期待</strong>
        (參考:<a href="/user/e161787f">範例一</a>、
        <a href="/user/2b17b7b8">範例二</a>、<a href="/user/6c74e623">範例三</a>)。
        可使用 Markdown 的語法排版(<a href="https://markdown.tw/">語法參考</a>)。''')

    if not user_pass.is_coc:
        flash('請先閱讀「<a href="/coc">社群守則</a>」。')

    if not user_pass.is_security_guard:
        flash('請先閱讀「<a href="/security_guard">資料保護原則 </a>」。')

    if not user_pass.has_chat:
        flash(
            '請先建立 Mattermost 帳號,前往「<a href="/setting/link/chat">連結 chat.coscup.org 帳號</a>」。')

    if request.method == 'GET':
        is_in_wait = WaitList.is_in_wait(pid=team.pid, tid=team.id, uid=uid)

        # For users not yet waiting, render the escaped public description
        # as markdown and rewrite any ``javascript:`` links to ``/``.
        if not is_in_wait and team.public_desc is not None:
            rendered = markdown(html.escape(team.public_desc))
            team.public_desc = re.sub(
                '<a href="javascript:.*"', '<a href="/"', rendered)

        return render_template('./team_join_to.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True), is_in_wait=is_in_wait)

    if request.method != 'POST':
        return Response('', status=404)

    requirements = (user_pass.is_profile, user_pass.is_coc,
                    user_pass.is_security_guard)
    if not all(requirements):
        return redirect(f'/team/{team.pid}/{team.id}/join_to')

    note: str = request.form['note'].strip()

    # Notes shorter than 100 characters are rejected.
    if len(note) < 100:
        flash('請重新整理此頁面後再次填寫申請加入。')
        return redirect(f'/team/{team.pid}/{team.id}/join_to')

    WaitList.join_to(pid=pid, tid=tid, uid=uid, note=note)
    TeamMemberChangedDB().make_record(
        pid=pid, tid=tid, action={'waiting': [uid, ]})
    applyreview_submit_one.apply_async(
        kwargs={'pid': pid, 'tid': tid, 'uid': uid})

    return redirect(f'/team/{team.pid}/{team.id}/join_to')

team_plan_edit

team_plan_edit(pid: str, tid: str) -> str | ResponseBase

Team plan edit

Source code in view/team.py
@VIEW_TEAM.route('/<pid>/<tid>/plan/edit', methods=('GET', 'POST'))
def team_plan_edit(pid: str, tid: str) -> str | ResponseBase:
    ''' Team plan edit

    GET renders the plan editor. POST expects a JSON body whose ``case`` key
    selects the action:

    - ``get``: return this team's saved plans, or one blank default row.
    - ``get_schedular``: return plans grouped by every day they cover.
    - ``post``: validate, sort and save the submitted plans, then return them.

    The optional ``import_others`` key (treated as false when absent) also
    includes the plans of the other teams in the same project.

    :param str pid: project id
    :param str tid: team id
    :return: the rendered page, a JSON response, a redirect to ``/`` when the
             team or project is missing or the user is neither member nor
             chief, or an empty 404 for an empty JSON body.

    '''
    # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements
    team, project, _redirect = check_the_team_and_project_are_existed(
        pid=pid, tid=tid)
    if _redirect:
        return _redirect

    if not team or not project:
        return redirect('/')

    teamusers = TeamUsers.parse_obj(team)
    if not (g.user['account']['_id'] in teamusers.members or
            g.user['account']['_id'] in teamusers.chiefs):
        return redirect('/')

    is_admin: bool = (g.user['account']['_id'] in teamusers.chiefs or
                      g.user['account']['_id'] in teamusers.owners or
                      g.user['account']['_id'] in project.owners)

    if request.method == 'GET':
        return render_template('./team_plan_edit.html',
                               project=project.dict(by_alias=True),
                               team=team.dict(by_alias=True), is_admin=is_admin)

    if request.method == 'POST':  # pylint: disable=too-many-nested-blocks
        data = request.get_json()
        if not data:
            return make_response({}, 404)

        today = arrow.now().format('YYYY-MM-DD')
        # Blank row handed to the client when there is nothing saved yet.
        default = {'title': '', 'desc': '', 'start': today, 'end': '',
                   'tid': team.id, 'team_name': team.name, 'start_timestamp': 0}

        team_plan_db = TeamPlanDB()
        case = data['case'] if 'case' in data else None

        def collect_others(with_timestamp: bool) -> list:
            ''' Plans of the other teams in this project, if requested. '''
            others: list = []
            if not data.get('import_others'):
                return others

            for team_plan in team_plan_db.find({'pid': pid, 'tid': {'$nin': [tid, ]}}):
                team_info = Team.get(pid=pid, tid=team_plan['tid'])
                if not team_info:
                    continue

                for raw in team_plan['data']:
                    # NOTE(review): this stamps the *current* team's tid on
                    # another team's plan (not team_plan['tid']); kept as in
                    # the original -- confirm this is intended.
                    raw['tid'] = tid
                    raw['team_name'] = team_info.name
                    if with_timestamp:
                        raw['start_timestamp'] = arrow.get(
                            raw['start']).timestamp()

                    others.append(raw)

            return others

        if case == 'get':
            plan_data = team_plan_db.find_one({'pid': pid, 'tid': tid})
            if not plan_data:
                plan_data = {'data': [default, ]}

            if not plan_data['data']:
                plan_data['data'] = [default, ]

            for raw in plan_data['data']:
                raw['tid'] = tid
                raw['team_name'] = team.name

            return jsonify({'data': plan_data['data'], 'default': default,
                            'others': collect_others(with_timestamp=False)})

        if case == 'get_schedular':
            query = {'pid': pid}
            # `import_others` is optional; a missing key means "this team
            # only" instead of raising KeyError.
            if not data.get('import_others'):
                query['tid'] = tid

            dates: dict[str, list[dict[str, str]]] = {}
            for raw in team_plan_db.find(query):
                for plan in raw['data']:
                    if not plan['end']:
                        # Single-day plan: file it under its start date.
                        dates.setdefault(plan['start'], []).append(plan)
                        continue

                    # Multi-day plan: file it under every day it spans.
                    for date in arrow.Arrow.range('day',
                                                  arrow.get(plan['start']),
                                                  arrow.get(plan['end'])):
                        dates.setdefault(
                            date.format('YYYY-MM-DD'), []).append(plan)

            return jsonify({'data': list(dates.items())})

        if case == 'post':
            if 'data' in data:
                _data = []
                for raw in data['data']:
                    # Rows without a title or a start date are dropped.
                    if raw['title'] and raw['start']:
                        try:
                            arrow.get(raw['start'])
                            _raw = {}
                            for k in ('title', 'start', 'end', 'desc'):
                                _raw[k] = raw[k]
                            _data.append(_raw)
                        except arrow.parser.ParserError:
                            # Rows whose start date cannot be parsed are dropped.
                            continue

                _data = sorted(_data, key=lambda d: arrow.get(d['start']))
                result = team_plan_db.add(pid=pid, tid=tid, data=_data)

                for raw in result['data']:
                    raw['tid'] = tid
                    raw['team_name'] = team.name
                    raw['start_timestamp'] = arrow.get(
                        raw['start']).timestamp()

                if not result['data']:
                    result['data'] = [default, ]

                return jsonify({'data': result['data'], 'default': default,
                                'others': collect_others(with_timestamp=True)})

        # Unknown case, or a `post` without a `data` payload.
        return jsonify({'data': [], 'default': default})

    return make_response({}, 404)

view.telegram

Telegram

link_telegram_verify(tg_uuid: str) -> ResponseBase

Link Telegram verify

Source code in view/telegram.py
@VIEW_TELEGRAM.route('/verify/<tg_uuid>', methods=('GET', 'POST'))
def link_telegram_verify(tg_uuid: str) -> ResponseBase:
    ''' Link Telegram verify

    Completes the account binding started by the bot: looks up the cached
    entry for ``tg_uuid`` and the cached Telegram user data for its chat id,
    saves the binding for the logged-in user, sends a confirmation message
    and clears both cache entries.

    :param str tg_uuid: the uuid carried by the verify link
    :return: a redirect to the Telegram setting page on success, a 406
             response when either cache entry is missing, or a 404 response
             for any method other than GET.

    '''
    if request.method != 'GET':
        return Response('', status=404)

    mem_cache = MC.get_client()
    data = mem_cache.get(f'tg:{tg_uuid}')
    if not data:
        return Response('Expired. `/linkme` again', status=406)

    user_cache_key = f"tgu:{data['chat_id']}"
    user_data = mem_cache.get(user_cache_key)
    if not user_data:
        return Response('Expired. `/linkme` again', status=406)

    save_data = {'uid': g.user['account']['_id'], 'added': arrow.now().naive}
    save_data.update(user_data)
    TelegramDB().add(save_data)

    TelegramBot(token=setting.TELEGRAM_TOKEN).send_message(
        chat_id=save_data['id'],
        text='與 [%(uid)s](https://volunteer.coscup.org/user/%(uid)s) 完成帳號綁定!(Completed)' % save_data)  # pylint: disable=line-too-long

    # Delete the same key that was read above. The previous code deleted
    # `tgu:<uid>`, which never matched the `tgu:<chat_id>` entry.
    mem_cache.delete_multi([f'tg:{tg_uuid}', user_cache_key])

    logging.info('[Telegram][Send] linkme: %s %s',
                 save_data['id'], save_data['uid'])

    return redirect(url_for('setting.link_telegram', _scheme='https', _external=True))

receive

receive() -> ResponseBase

receive

Source code in view/telegram.py
@VIEW_TELEGRAM.route('/r', methods=('POST', ))
def receive() -> ResponseBase:
    ''' Receive a JSON payload posted for the Telegram bot

    Logs the payload. When it is the "linkme" start command, a uuid is
    generated for the sender, the sender's data is stashed temporarily and a
    message with a verify link button is sent back. Always answers with an
    empty 200 response.

    '''
    payload = request.get_json()
    logging.info('[telegram] %s', payload)

    if not payload or not TelegramBot.is_command_start_linkme(payload):
        return Response('', status=200)

    uuid_data = TelegramBot.gen_uuid(chat_id=payload['message']['from']['id'])
    TelegramBot.temp_fetch_user_data(data=payload)

    bot = TelegramBot(token=setting.TELEGRAM_TOKEN)
    verify_button = {
        'text': '驗證(verify)',
        'url': f"https://{setting.DOMAIN}/telegram/verify/{uuid_data['uuid']}",
    }
    resp = bot.send_message(
        chat_id=payload['message']['from']['id'],
        text='請繼續前往志工平台登入驗證,感謝!',
        reply_markup={'inline_keyboard': [[verify_button, ], ]},
    )

    logging.info('[Telegram][Send] %s', resp.json())

    return Response('', status=200)

view.user

User

index

index() -> str

Index

Source code in view/user.py
@VIEW_USER.route('/')
def index() -> str:
    ''' Root route of the user views; answers with the plain text ``user`` '''
    body = 'user'
    return body

user_page

user_page(
    uid: str, nickname: str | None = None
) -> ResponseBase | str

User page

Source code in view/user.py
@VIEW_USER.route('/<uid>/<nickname>')
@VIEW_USER.route('/<uid>')
def user_page(uid: str, nickname: str | None = None) -> ResponseBase | str:  # pylint: disable=too-many-branches
    ''' User page

    Renders the public page of a user: badge name, intro, OAuth data, the
    teams the user takes part in (sorted by project action date, descending)
    and Mattermost data when a Mattermost id is found.

    :param str uid: user id
    :param nickname: nickname part of the URL; when missing or different from
                     the expected one, the request is redirected to the URL
                     carrying the expected nickname.
    :return: the rendered page, a redirect, or an empty response when the
             user (200) or the OAuth record (404) is not found.
    :raises Exception: when a team references a project that does not exist.

    '''
    user = User(uid=uid).get()

    if not user:
        # NOTE(review): an unknown user answers 200 while a missing OAuth
        # record answers 404 -- kept as is, confirm whether this is intended.
        return Response('', status=200)

    oauth = OAuth(user['mail']).get()

    if not oauth:
        return Response('', status=404)

    if 'data' in oauth and 'picture' in oauth['data']:
        oauth['data']['picture'] = GSuite.size_picture(
            oauth['data']['picture'])

    has_profile = 'profile' in user
    has_badge_name = has_profile and 'badge_name' in user['profile']

    # Expected nickname: the badge name when set, otherwise the OAuth name.
    if has_badge_name and user['profile']['badge_name']:
        _nickname = user['profile']['badge_name']
    else:
        _nickname = oauth['data']['name']

    _nickname = quote_plus(_nickname)

    if nickname is None or nickname != _nickname:
        return redirect(url_for('user.user_page', uid=uid, nickname=_nickname))

    # A profile may exist without `badge_name`; fall back to '' instead of
    # raising KeyError, matching the optional handling above.
    badge_name = user['profile']['badge_name'] if has_badge_name else ''

    intro = ''
    if has_profile and 'intro' in user['profile']:
        # Escape, render as markdown, then rewrite `javascript:` links to `/`.
        intro = re.sub('<a href="javascript:.*"', '<a href="/"',
                       markdown(html.escape(user['profile']['intro'])))

    participate_in = []
    for item in Team.participate_in(uid):
        project = Project.get(item['pid'])
        if not project:
            raise Exception('No project')

        item['_project'] = project.dict(by_alias=True)
        item['_title'] = '???'
        if uid in item['chiefs']:
            item['_title'] = 'chief'
        elif uid in item['members']:
            item['_title'] = 'member'

        item['_action_date'] = arrow.get(
            item['_project']['action_date']).format('YYYY/MM')

        participate_in.append(item)

    # Newest project first.
    participate_in.sort(
        key=lambda p: p['_project']['action_date'], reverse=True)

    mattermost_data = {}
    mid = MattermostTools.find_possible_mid(uid=uid)
    if mid:
        mattermost_data['mid'] = mid
        mattermost_data['username'] = MattermostTools.find_user_name(mid=mid)

    return render_template('./user.html',
                           badge_name=badge_name,
                           intro=intro,
                           oauth=oauth,
                           user=user,
                           mattermost_data=mattermost_data,
                           participate_in=participate_in,
                           )

view.utils

utils

check_the_team_and_project_are_existed

check_the_team_and_project_are_existed(
    pid: str, tid: str
) -> (
    tuple[TeamBase, ProjectBase, None]
    | tuple[None, None, ResponseBase]
)

Base check that the team and project exist

:param str pid: project id :param str tid: team id :rtype: tuple :return: team, project, redirect

Source code in view/utils.py
def check_the_team_and_project_are_existed(pid: str, tid: str) -> \
        tuple[TeamBase, ProjectBase, None] | tuple[None, None, ResponseBase]:
    ''' Base check that the team and the project exist

    The team is looked up first; the project is only looked up when the team
    was found. When either one is missing, a redirect to ``/`` is returned in
    the third slot and the first two slots are ``None``.

    :param str pid: project id
    :param str tid: team id
    :rtype: tuple
    :return: team, project, redirect

    '''
    team = Team.get(pid, tid)
    project = Project.get(pid) if team else None

    if not team or not project:
        return None, None, redirect('/')

    return team, project, None