import hashlib
import hmac
import base64
class s3client:
def __init__(self, restful_server, access_key, secret_key):
self.restful_server = restful_server
self.access_key = access_key
self.secret_key = secret_key
# Note1: here http_headers is a directory like: http_headers["header-value"] = [value1, value2]
# Note2: the canonical_path may need urlencoded(here we don't process it)
def sign(self, method, canonical_path, parameters, http_headers):
if self.secret_key == "":
return # no auth secret; skip signing, e.g. for public read-only buckets
md5 = ""
ctype = ""
xamzdate = False
date = ""
xamz_elems = []
s3ParamsToSign = {"uploadId":True, "partNumber":True}
for k in http_headers.keys():
v = http_headers.get(k)
k = k.lower()
if k == "content-md5":
md5 = v[0]
elif k == "content-type":
ctype = v[0]
elif k == "date":
if xamzdate == False:
date = v[0]
else:
if k.startswith("x-amz-",0,len(k)):
str = k + ":"
for i, val in enumerate(v):
str += val
if i != len(v) -1:
str += ","
xamz_elems.append(str)
if k == "x-amz-date":
xamzdate = True
date = ""
xamz = ""
if len(xamz_elems) > 0:
xamz_elems.sort()
# print xamz_elems
for elem in xamz_elems:
xamz += (elem + "\n")
# print xamz
expires = False
if "Expires" in parameters:
# Query string request authentication alternative
expires = True
date = parameters["Expires"]
parameters["AWSAccessKeyId"] = self.access_key
# process the parameters
signParams = []
for k, v in parameters.items():
if k in s3ParamsToSign and s3ParamsToSign[k] == True:
if v == "":
signParams.append(k)
else:
signParams.append(k + "=" + v)
#print signParams
if len(signParams):
signParams.sort()
canonical_path = canonical_path + "?"
for i, v in enumerate(signParams):
canonical_path += v
if i != len(signParams) -1:
canonical_path += "&"
print(canonical_path)
payload = method + "\n"
payload += md5 + "\n"
payload += ctype + "\n"
payload += date + "\n"
payload += xamz
payload += canonical_path
print("payload: %s" %payload)
#byte_sk = bytearray(self.secret_key, "utf-8")
#hmac_sha = hmac.new(byte_sk, digestmod=hashlib.sha1)
hmac_sha = hmac.new(self.secret_key.encode("utf8"), digestmod=hashlib.sha1)
hmac_sha.update(payload.encode("utf8"))
signature = base64.b64encode(hmac_sha.digest())
print("signature: %s" % signature)
if expires:
parameters["Signature"] = signature
#xqIFY7lyjhiWs0rJ+gx6/Amjo6c=
else:
str = "AWS " + self.access_key + ":" + signature.decode()
http_headers["Authorization"] = [str] #["AWS " + self.access_key + ":" + signature]
#AWS B0KTM947XL7ZG0X36RQ2:TjXdqQa8LykH0uq4KmJXXx8ywPg=
# -*- coding:utf-8 -*-
import s3_client
import hashlib
import datetime
import base64
import urllib
def testSign():
UPLOAD_SERVER = "web.com:443"
access_key = "xxxxx"
secret_key = "xxxx"
client = s3_client.s3client(UPLOAD_SERVER, access_key, secret_key)
# 1) method
method = "GET"
# 2) canonical_path
canonical_path = "/bucket/md5/filename"
# 3) parameters
parameters = {}
#uploadId = "2~QymFaxK5DMs8XkzBSoIEhW - 7 - 1 - TBRm"
#parameters["uploadId"] = "2~QymFaxK5DMs8XkzBSoIEhW-7-1-TBRm"
#parameters["partNumber"] = "1"
parameters["AWSAccessKeyId"] = "96X9UA862J8CHV28Y9UU"
parameters["Expires"] = "1551668789"
#parameters["Signature"] = "1551668789"
str = "hello,world"
# 4) http_headers
http_headers = {}
http_headers["Content-Type"] = ["application/octet-stream"]
#http_headers["Content-Length"] = [len(str)]
hash_md5 = hashlib.md5(str.encode("utf8"))
digest = hash_md5.digest()
#digest = "46ff8464449b5f71b0b6496ceb0b8bf0"
#md5b64 = base64.standard_b64encode(digest)
#print(md5b64)
md5b64 = "NDQ5YjVmNzFiMGI2NDk2Yw=="
#http_headers["Content-MD5"] = [md5b64]
GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
#http_headers["x-amz-date"] = [datetime.datetime.utcnow().strftime(GMT_FORMAT)]
#http_headers["x-amz-date"] = ["Mon, 04 Mar 2019 02:36:27 GMT"]
#http_headers["X-Amz-User-Agent"] = ["aws-sdk-js/2.3.19"]
#print("x-amz-date: %s" % http_headers["x-amz-date"])
#http_headers["x-amz-meta-reviewdby"] = ["jjjj@johnsmith.net", "jjjj@johnsmith.net"]
client.sign(method, canonical_path, parameters, http_headers)
print("canonical_path: %s" % canonical_path)
print("parameters: %s" % parameters)
print("http_headers", http_headers)
testSign(