윤창목

Changed signed_url keys from name to id

import mimetypes
import json
import os
from datetime import datetime
import boto3
from botocore.client import Config
from django.contrib.auth.models import User
from django.core import serializers
from django.views.decorators.csrf import csrf_exempt
from rest_framework import viewsets
from rest_framework import permissions
from rest_framework.response import Response
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated, AllowAny
from api.models import Item, SharedItem
from api.serializers import UserSerializer,GroupSerializer,ItemSerializer
from rest_framework import status
from annoying.functions import get_object_or_None
from khudrive.settings import AWS_SESSION_TOKEN, AWS_SECRET_ACCESS_KEY, AWS_ACCESS_KEY_ID, AWS_REGION, AWS_STORAGE_BUCKET_NAME
class UserViewSet(viewsets.ModelViewSet):
"""
API endpoint that allows users to be viewed or edited.
"""
queryset = User.objects.all().order_by('-date_joined')
serializer_class = UserSerializer
permission_classes = [permissions.IsAuthenticated]
class ItemViewSet(viewsets.ViewSet):
queryset = Item.objects.all()
serializer_class = ItemSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly, permissions.AllowAny,
#IsOwnerOrReadOnly
]
permission_classes_by_action = {'get': [permissions.AllowAny],
'destroy': [permissions.AllowAny]}
# url: items/search
@action(methods=['GET'], detail=False, permission_classes=[AllowAny], url_path='search', url_name='search')
def search(self, request):
if request.method == 'GET':
keyword = request.GET.get('keyword', '')
item_list = Item.objects.filter(name__icontains = keyword)
data = serializers.serialize("json", item_list)
json_data = json.loads(data)
res = []
for i in json_data:
t = i['fields']
t['id'] = i['pk']
res.append(t)
return Response({'data': {'list' : res}}, status=status.HTTP_200_OK)
"""
# url: items/11/
# 마지막 slash도 써주어야함
def get(self, request, pk):
#print(pk)
s3 = boto3.client('s3')
s3_bucket = AWS_STORAGE_BUCKET_NAME
#파일 객체 생성
object_name = request.GET.get('name', '')
presigned_url = s3.generate_presigned_url(
'get_object',
Params={'Bucket': s3_bucket,
'Key': object_name},
ExpiresIn = 3600
)
return Response({'message': presigned_url}, status=status.HTTP_200_OK)
"""
# url: items/11/
# 마지막 slash도 써주어야함
def get(self, request, pk):
s3 = boto3.client('s3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_session_token=AWS_SESSION_TOKEN,
config=Config(signature_version='s3v4'))
s3_bucket = AWS_STORAGE_BUCKET_NAME
item = Item.objects.filter(item_id=pk)
object_name = item.get().name
data = serializers.serialize("json", item)
json_data = json.loads(data)
presigned_url = s3.generate_presigned_url(
'get_object',
Params={'Bucket': s3_bucket,
'Key': object_name},
ExpiresIn = 3600
)
res = json_data[0]['fields']
res['id']=json_data[0]['pk']
res['signed_url']=presigned_url
return Response({'data': res}, status=status.HTTP_200_OK)
# url: items/11/
# 마지막 slash도 써주어야함
def destroy(self, request, pk):
if request.method == 'DELETE':
print(pk)
item = get_object_or_None(Item, item_id=pk)
if item != None:
if item.is_folder == True: # 폴더는 삭제 안되도록 처리
return Response({'message': 'This item is folder.'}, status=status.HTTP_200_OK)
item.is_deleted = True
item.save()
# item.delete() 이거 하면 완전 삭제되어버림 is deleted True 면 휴지통에서 리스트 조회할 수 있도록!
return Response({'message': 'delete complete'},status=status.HTTP_200_OK)
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
# url: items/11/move
# 마지막 slash도 써주어야함
@action(methods=['POST'], detail=True, permission_classes=[AllowAny], url_path='move', url_name='move')
def move(self, request, pk):
if request.method == 'POST':
parent_id = request.POST.get('parent', '')
name = request.POST.get('name','')
parent = get_object_or_None(Item, item_id=parent_id)
if parent != None and parent.is_folder == True:
child = get_object_or_None(Item, item_id=pk)
if child == None:
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
child.parent = parent_id
child.save()
child = Item.objects.filter(item_id = pk)
child_data = serializers.serialize("json", child)
json_child = json.loads(child_data)
res = json_child[0]['fields']
res['id'] = pk
parent = Item.objects.filter(item_id = parent_id)
parent_data = serializers.serialize("json", parent)
json_parent = json.loads(parent_data)[0]['fields']
res['parentInfo'] = json_parent
return Response({'data': res}, status=status.HTTP_200_OK)
if parent == None:
return Response({'message': 'parent is not existed.'}, status=status.HTTP_200_OK)
if parent.is_folder == False:
return Response({'message': 'parent is not folder.'}, status=status.HTTP_200_OK)
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
@action(methods=['POST'], detail=True, permission_classes=[AllowAny], url_path='copy', url_name='copy')
def copy(self, request, pk):
if request.method == 'POST':
parent_id = request.POST.get('parent', '')
parent = get_object_or_None(Item, item_id=parent_id)
if parent != None and parent.is_folder == True:
child = get_object_or_None(Item, item_id=pk)
if child == None:
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
if child.is_folder == True:
return Response({'message': 'item is folder'}, status=status.HTTP_204_NO_CONTENT)
copiedName = child.name + "_복사본_" + str(datetime.now().strftime('%Y-%m-%d %H:%M'))
copiedItem = Item(is_folder = False, name = copiedName, path =child.path, parent = parent_id, user_id= child.user_id, size=child.size, status=child.status)
copiedItem.save()
copiedItem = Item.objects.filter(name = copiedName)
copied_data = serializers.serialize("json", copiedItem)
json_data = json.loads(copied_data)
res = json_data[0]['fields']
res['id'] = json_data[0]['pk']
parent = Item.objects.filter(item_id = parent_id)
parent_data = serializers.serialize("json", parent)
json_parent = json.loads(parent_data)[0]['fields']
res['parentInfo'] = json_parent
return Response({'data': res}, status=status.HTTP_200_OK)
if parent == None:
return Response({'message': 'parent is not existed.'}, status=status.HTTP_200_OK)
if parent.is_folder == False:
return Response({'message': 'parent is not folder.'}, status=status.HTTP_200_OK)
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
def get_permissions(self):
try:
# return permission_classes depending on `action`
return [permission() for permission in self.permission_classes_by_action[self.action]]
except KeyError:
# action is not set return default permission_classes
return [permission() for permission in self.permission_classes]
#url: items/{key}/children/
@action(methods=['GET', 'POST'], detail=True, permission_classes=[AllowAny],
url_path='children', url_name='children')
def children(self, request, pk, *args, **kwargs):
if request.method == 'GET':
parent_item = Item.objects.get(item_id = pk)
try:
parent_item = get_object_or_404(Item, pk = pk)
except parent_item.DoesNotExist:
return Response({'Error': 'Folder does not exist.'})
items = Item.objects.get(parent = parent_item.pk)
return Response(items, status=status.HTTP_200_OK)
if request.method == 'POST':
data = JSONParser().parse(request)
serializer = ItemSerializer(data=data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# url: /upload/
@action(methods=['POST'], detail=True, permission_classes=[AllowAny],
url_path='upload', url_name='upload')
def upload(self, request, pk):
if request.method == 'POST':
s3 = boto3.client('s3')
s3_bucket = AWS_STORAGE_BUCKET_NAME
#파일 객체 생성
file_name = request.POST.get('name', '')
file_size = request.POST.get('size', '')
file_parent = pk
file_type = mimetypes.guess_type(file_name)[0]
upload_item = Item(name=file_name, size=file_size, user_id=1, file_type=file_type, parent=file_parent)
upload_item.save()
date_long = datetime.utcnow().strftime('%Y%m%dT000000Z')
presigned_post = s3.generate_presigned_post(
s3_bucket,
file_name,
{
"acl": "private",
"Content-Type": file_type,
'region': AWS_REGION,
'x-amz-algorithm': 'AWS4-HMAC-SHA256',
'x-amz-date': date_long
},
[
{"acl": "private"},
{"Content-Type": file_type},
{'x-amz-algorithm': 'AWS4-HMAC-SHA256'},
{'x-amz-date': date_long}
],
3600
)
data = {
"signed_url": presigned_post,
'url': 'https://%s.s3.amazonaws.com/%s' % (s3_bucket, file_name)
}
return Response({'presigned_post':presigned_post, 'proc_data':data}, status=status.HTTP_200_OK)
# url: /status/
@action(methods=['POST'], detail=True, permission_classes=[AllowAny],
url_path='status', url_name='status')
def status(self, request, *args, **kwargs):
if request.method == 'POST':
pk = request.POST.get('item_id', '')
queryset = Item.objects.filter(item_id = pk)
for cand in queryset:
cand.status = True
cand.save()
return Response({'Message': 'File Upload Successful'}, status=status.HTTP_200_OK)
return Response({'Error': 'No such item found in queryset'}, status=status.HTTP_400_BAD_REQUEST)
class SharedItemViewSet(viewsets.ModelViewSet):
queryset = SharedItem.objects.all()
# serializer_class = SharedItemSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly, permissions.AllowAny,
# IsOwnerOrReadOnly
]
# url: http://localhost:8000/items/1/share/
# 마지막 slash도 써주어야함
@csrf_exempt
@action(methods=['POST'], detail=True, permission_classes=[AllowAny], url_path='share', url_name='share')
def share(self, request, pk):
if request.method == 'POST':
password = request.POST.get('password', '')
expires = request.POST.get('expires', '')
sharedfile = get_object_or_None(SharedItem, item_id=pk)
if sharedfile != None:
# 서버는 정상이나 이미 공유객체로 등록된 파일임
return Response({'message': 'This file is already shared'}, status=status.HTTP_200_OK)
sharedfile = SharedItem(item_id =pk, password=password, expires = expires)
sharedfile.save()
sharedfile = SharedItem.objects.get(item_id = pk)
# sf = serializers.serialize("json", sharedfile)
item = Item.objects.filter(item_id = pk)
item_json = serializers.serialize("json", item)
json_data = json.loads(item_json)
print(json_data)
res = json_data[0]['fields']
res['id'] = json_data[0]['pk']
return Response({"shared": sharedfile.created_time , 'data': res}, status=status.HTTP_200_OK)
item = ItemViewSet.as_view({
'delete': 'destroy',
})
import mimetypes
import json
import os
from datetime import datetime, timedelta
import boto3
from botocore.client import Config
from django.core import serializers
from django.views.decorators.csrf import csrf_exempt
from rest_framework import viewsets
from rest_framework import permissions
from rest_framework.response import Response
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated, AllowAny
from .models import Item, SharedItem, User
from .serializers import UserSerializer, GroupSerializer, ItemSerializer
from rest_framework import status
from annoying.functions import get_object_or_None
from django.conf import settings
import jwt
from django.http import HttpResponse, JsonResponse
from khudrive.settings import AWS_SESSION_TOKEN, AWS_SECRET_ACCESS_KEY, AWS_ACCESS_KEY_ID, AWS_REGION, \
AWS_STORAGE_BUCKET_NAME, AWS_ENDPOINT_URL
class UserViewSet(viewsets.ModelViewSet):
"""
API endpoint that allows users to be viewed or edited.
"""
queryset = User.objects.all().order_by('-date_joined')
serializer_class = UserSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly, permissions.AllowAny,
# IsOwnerOrReadOnly
]
permission_classes_by_action = {'get': [permissions.AllowAny],
'destroy': [permissions.AllowAny]}
@csrf_exempt
@action(detail=False, methods=['POST'], permission_classes=[permissions.AllowAny], url_path='signup',
url_name='singup')
def signup(self, request):
user_id = request.POST.get('user_id', '')
name = request.POST.get('name', '')
password = request.POST.get('password', '')
user = get_object_or_None(User, user_id=user_id)
if user == None:
user = User(user_id=user_id, name=name, password=password, total_size=100000, current_size=0)
user.save()
root = Item(is_folder=True, name="root", file_type="folder", path="", user_id=user.int_id, size=0,
status=True)
root.save()
user.root_folder = root.item_id
user.save()
return Response({
'message': 'user created',
'int_id': user.int_id,
'user_id': user.user_id,
'name': user.name,
'root_folder': root.item_id,
'total_size': user.total_size,
'current_size': user.current_size,
'created_time': user.created_time
},
status=status.HTTP_200_OK,
)
else:
return Response({'message': 'user is already exist.'}, status=status.HTTP_204_NO_CONTENT)
@csrf_exempt
@action(methods=['post'], detail=False, permission_classes=[permissions.AllowAny],
url_path='login', url_name='login')
def login(self, request):
if not request.data:
return Response({'Error': "Please provide user_id/password"}, status=status.HTTP_400_BAD_REQUEST)
user_id = request.POST['user_id']
password = request.POST['password']
try:
user = User.objects.get(user_id=user_id, password=password)
except User.DoesNotExist:
return Response({'Error': "Invalid user_id/password"}, status=status.HTTP_400_BAD_REQUEST)
if user:
payload1 = {
'int_id': user.int_id,
'user_id': user.user_id,
'exp': datetime.utcnow() + timedelta(seconds=300)
}
payload2 = {
'int_id': user.int_id,
'user_id': user.user_id,
'exp': datetime.utcnow() + timedelta(days=5)
}
access = jwt.encode(payload1, settings.SECRET_KEY, algorithm='HS256').decode('utf-8')
refresh = jwt.encode(payload2, settings.SECRET_KEY, algorithm='HS256').decode('utf-8')
exp = jwt.decode(access, settings.SECRET_KEY, algorithm='HS256')['exp']
token = {'access': access,
'refresh': refresh,
'exp': exp,
'user': {
'int_id': user.int_id,
'user_id': user.user_id,
'name': user.name,
'total_size': user.total_size,
'current_size': user.current_size,
'root_folder': user.root_folder
}}
return JsonResponse(
token,
status=status.HTTP_200_OK,
)
else:
return JsonResponse(
{'Error': "Invalid credentials"},
status=status.HTTP_400_BAD_REQUEST,
)
return JsonResponse(status=status.HTTP_405_METHOD_NOT_ALLOWED)
def get(self, request, pk):
user = User.objects.filter(int_id=pk)
data = serializers.serialize("json", user)
json_data = json.loads(data)
res = json_data[0]['fields']
res['id'] = json_data[0]['pk']
return Response({'data': res}, status=status.HTTP_200_OK)
def get_permissions(self):
try:
# return permission_classes depending on `action`
return [permission() for permission in self.permission_classes_by_action[self.action]]
except KeyError:
# action is not set return default permission_classes
return [permission() for permission in self.permission_classes]
class ItemViewSet(viewsets.ViewSet):
queryset = Item.objects.all()
serializer_class = ItemSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly, permissions.AllowAny,
# IsOwnerOrReadOnly
]
permission_classes_by_action = {'get': [permissions.AllowAny],
'destroy': [permissions.AllowAny]}
# url: items/search
@action(methods=['GET'], detail=False, permission_classes=[AllowAny], url_path='search', url_name='search')
def search(self, request):
if request.method == 'GET':
keyword = request.GET.get('keyword', '')
# user_id = request.GET.get('user_id', '')
item_list = Item.objects.filter(name__icontains=keyword)
data = serializers.serialize("json", item_list)
json_data = json.loads(data)
res = []
for i in json_data:
t = i['fields']
t['id'] = i['pk']
res.append(t)
return Response({'data': {'list': res}}, status=status.HTTP_200_OK)
"""
# url: items/11/
# 마지막 slash도 써주어야함
def get(self, request, pk):
#print(pk)
s3 = boto3.client('s3')
s3_bucket = AWS_STORAGE_BUCKET_NAME
#파일 객체 생성
object_name = request.GET.get('name', '')
presigned_url = s3.generate_presigned_url(
'get_object',
Params={'Bucket': s3_bucket,
'Key': object_name},
ExpiresIn = 3600
)
return Response({'message': presigned_url}, status=status.HTTP_200_OK)
"""
# url: items/11/
# 마지막 slash도 써주어야함
def get(self, request, pk):
s3 = boto3.client(
's3',
region_name=AWS_REGION,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_session_token=AWS_SESSION_TOKEN,
endpoint_url=AWS_ENDPOINT_URL or None,
config=Config(s3={'addressing_style': 'path'})
)
s3_bucket = AWS_STORAGE_BUCKET_NAME
item = Item.objects.filter(item_id=pk)
object_id = item.get().item_id
data = serializers.serialize("json", item)
json_data = json.loads(data)
presigned_url = s3.generate_presigned_url(
'get_object',
Params={'Bucket': s3_bucket,
'Key': object_id},
ExpiresIn=3600
)
res = json_data[0]['fields']
res['id'] = json_data[0]['pk']
res['signed_url'] = presigned_url
return Response({'data': res}, status=status.HTTP_200_OK)
# url: items/11/
# 마지막 slash도 써주어야함
def destroy(self, request, pk):
if request.method == 'DELETE':
item = get_object_or_None(Item, item_id=pk)
if item != None:
if item.is_folder == True: # 폴더는 삭제 안되도록 처리
return Response({'message': 'This item is folder.'}, status=status.HTTP_200_OK)
item.is_deleted = True
item.save()
# item.delete() 이거 하면 완전 삭제되어버림 is deleted True 면 휴지통에서 리스트 조회할 수 있도록!
return Response({'message': 'destroy complete'}, status=status.HTTP_200_OK)
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
@action(methods=['POST'], detail=True, permission_classes=[AllowAny], url_path='restore', url_name='restore')
def restore(self, request, pk):
if request.method == 'POST':
item = get_object_or_None(Item, item_id=pk)
if item != None:
item.is_deleted = False
item.save()
return Response({'message': 'restore complete'}, status=status.HTTP_200_OK)
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
@action(methods=['DELETE'], detail=True, permission_classes=[AllowAny], url_path='delete', url_name='delete')
def delete(self, request, pk):
if request.method == 'DELETE':
item = get_object_or_None(Item, item_id=pk)
if item != None:
if item.is_folder == True: # 폴더는 삭제 안되도록 처리
return Response({'message': 'This item is folder.'}, status=status.HTTP_200_OK)
item.delete()
return Response({'message': 'delete permanently complete'}, status=status.HTTP_200_OK)
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
# url: items/11/move
# 마지막 slash도 써주어야함
@action(methods=['POST'], detail=True, permission_classes=[AllowAny], url_path='move', url_name='move')
def move(self, request, pk):
if request.method == 'POST':
parent_id = request.POST.get('parent', '')
name = request.POST.get('name','')
child = get_object_or_None(Item, item_id=pk)
if child == None:
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
if parent_id != '':
parent = get_object_or_None(Item, item_id=parent_id)
if parent == None:
return Response({'message': 'parent is not existed.'}, status=status.HTTP_200_OK)
if parent.is_folder == False:
return Response({'message': 'parent is not folder.'}, status=status.HTTP_200_OK)
if parent != None and parent.is_folder == True:
child.parent = parent_id
else:
parent_id = child.parent
if name != '':
child.name = name;
child.save()
child = Item.objects.filter(item_id = pk)
child_data = serializers.serialize("json", child)
json_child = json.loads(child_data)
res = json_child[0]['fields']
res['id'] = pk
parent = Item.objects.filter(item_id = parent_id)
parent_data = serializers.serialize("json", parent)
json_parent = json.loads(parent_data)[0]['fields']
res['parentInfo'] = json_parent
return Response({'data': res}, status=status.HTTP_200_OK)
@action(methods=['POST'], detail=True, permission_classes=[AllowAny], url_path='copy', url_name='copy')
def copy(self, request, pk):
if request.method == 'POST':
parent_id = request.POST.get('parent', '')
parent = get_object_or_None(Item, item_id=parent_id)
if parent != None and parent.is_folder == True:
child = get_object_or_None(Item, item_id=pk)
if child == None:
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
if child.is_folder == True:
return Response({'message': 'item is folder'}, status=status.HTTP_204_NO_CONTENT)
copiedName = child.name + "_복사본_" + str(datetime.now().strftime('%Y-%m-%d %H:%M'))
copiedItem = Item(is_folder=False, name=copiedName, path=child.path, parent=parent_id,
user_id=child.user_id, size=child.size, status=child.status)
copiedItem.save()
copiedItem = Item.objects.filter(name=copiedName)
copied_data = serializers.serialize("json", copiedItem)
json_data = json.loads(copied_data)
res = json_data[0]['fields']
res['id'] = json_data[0]['pk']
parent = Item.objects.filter(item_id=parent_id)
parent_data = serializers.serialize("json", parent)
json_parent = json.loads(parent_data)[0]['fields']
res['parentInfo'] = json_parent
return Response({'data': res}, status=status.HTTP_200_OK)
if parent == None:
return Response({'message': 'parent is not existed.'}, status=status.HTTP_200_OK)
if parent.is_folder == False:
return Response({'message': 'parent is not folder.'}, status=status.HTTP_200_OK)
return Response({'message': 'item is not existed.'}, status=status.HTTP_204_NO_CONTENT)
def get_permissions(self):
try:
# return permission_classes depending on `action`
return [permission() for permission in self.permission_classes_by_action[self.action]]
except KeyError:
# action is not set return default permission_classes
return [permission() for permission in self.permission_classes]
# url: items/{key}/children/
@action(methods=['GET', 'POST'], detail=True, permission_classes=[AllowAny],
url_path='children', url_name='children')
def children(self, request, pk):
if request.method == 'GET':
children = Item.objects.filter(parent=pk, is_deleted=False, status=True)
children_data = serializers.serialize("json", children)
json_children = json.loads(children_data)
parent = Item.objects.filter(item_id=pk) # item
parent_data = serializers.serialize("json", parent)
json_parent = json.loads(parent_data)[0]['fields']
res = json_parent
res['id'] = pk
children_list = []
for i in json_children:
t = i['fields']
t['id'] = i['pk']
children_list.append(t)
res['list'] = children_list
return Response({'data': res}, status=status.HTTP_200_OK)
if request.method == 'POST':
name = request.POST.get('name', '')
user_id = request.GET.get('user_id', '')
item = Item(is_folder=True, name=name, file_type="folder", path="", parent=pk, user_id=user_id, size=0,
status=True)
item.save()
item = Item.objects.filter(item_id=item.item_id)
item_data = serializers.serialize("json", item)
json_item = json.loads(item_data)
res = json_item[0]['fields']
res['id'] = json_item[0]['pk']
res['inside_folder_list'] = []
res['inside_file_list'] = []
return Response({'data': res}, status=status.HTTP_200_OK)
@action(methods=['GET'], detail=False, permission_classes=[AllowAny],
url_path='trash', url_name='trash')
def trash(self, request):
if request.method == 'GET':
children = Item.objects.filter(is_deleted = True)
children_data = serializers.serialize("json", children)
json_children = json.loads(children_data)
res = {}
children_list = []
for i in json_children:
t = i['fields']
t['id'] = i['pk']
children_list.append(t)
res['list'] = children_list
return Response({'data': res}, status=status.HTTP_200_OK)
# url: /upload/
@action(methods=['POST'], detail=True, permission_classes=[AllowAny],
url_path='upload', url_name='upload')
def upload(self, request, pk):
if request.method == 'POST':
s3 = boto3.client(
's3',
region_name=AWS_REGION,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_session_token=AWS_SESSION_TOKEN,
endpoint_url=AWS_ENDPOINT_URL or None,
config=Config(s3={'addressing_style': 'path'})
)
s3_bucket = AWS_STORAGE_BUCKET_NAME
# 파일 객체 생성
file_name = request.POST.get('name', '')
file_size = request.POST.get('size', '')
file_parent = pk
file_type = mimetypes.guess_type(file_name)[0]
upload_item = Item(name=file_name, size=file_size, user_id=1, file_type=file_type, parent=file_parent)
upload_item.save()
date_long = datetime.utcnow().strftime('%Y%m%dT000000Z')
presigned_post = s3.generate_presigned_post(
s3_bucket,
file_id,
{
"acl": "private",
"Content-Type": file_type,
"Content-Disposition": "attachment",
'region': AWS_REGION,
'x-amz-algorithm': 'AWS4-HMAC-SHA256',
'x-amz-date': date_long
},
[
{"acl": "private"},
{"Content-Type": file_type},
{"Content-Disposition": "attachment"},
{'x-amz-algorithm': 'AWS4-HMAC-SHA256'},
{'x-amz-date': date_long}
],
3600
)
item = Item.objects.filter(item_id=upload_item.item_id)
item_data = serializers.serialize("json", item)
json_item = json.loads(item_data)
res = json_item[0]['fields']
res['id'] = json_item[0]['pk']
data = {
"signed_url": presigned_post,
'url': '%s/%s' % (presigned_post["url"], file_id),
'item': res
}
return Response(data, status=status.HTTP_200_OK)
# url: /status/
@action(methods=['POST'], detail=True, permission_classes=[AllowAny],
url_path='status', url_name='status')
def status(self, request, *args, **kwargs):
if request.method == 'POST':
pk = request.POST.get('item_id', '')
queryset = Item.objects.filter(item_id=pk)
for cand in queryset:
cand.status = True
cand.save()
return Response({'Message': 'File Upload Successful'}, status=status.HTTP_200_OK)
return Response({'Error': 'No such item found in queryset'}, status=status.HTTP_400_BAD_REQUEST)
class SharedItemViewSet(viewsets.ModelViewSet):
queryset = SharedItem.objects.all()
# serializer_class = SharedItemSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly, permissions.AllowAny,
# IsOwnerOrReadOnly
]
# url: http://localhost:8000/items/1/share/
# 마지막 slash도 써주어야함
@csrf_exempt
@action(methods=['POST'], detail=True, permission_classes=[AllowAny], url_path='share', url_name='share')
def share(self, request, pk):
if request.method == 'POST':
password = request.POST.get('password', '')
expires = request.POST.get('expires', '')
sharedfile = get_object_or_None(SharedItem, item_id=pk)
if sharedfile != None:
# 서버는 정상이나 이미 공유객체로 등록된 파일임
return Response({'message': 'This file is already shared'}, status=status.HTTP_200_OK)
sharedfile = SharedItem(item_id=pk, password=password, expires=expires)
sharedfile.save()
sharedfile = SharedItem.objects.get(item_id=pk)
# sf = serializers.serialize("json", sharedfile)
item = Item.objects.filter(item_id=pk)
item_json = serializers.serialize("json", item)
json_data = json.loads(item_json)
print(json_data)
res = json_data[0]['fields']
res['id'] = json_data[0]['pk']
return Response({"shared": sharedfile.created_time, 'data': res}, status=status.HTTP_200_OK)
item = ItemViewSet.as_view({
'delete': 'destroy',
})
......