Conversation
plugins/sinks/gcs/client.go
Outdated
| "google.golang.org/api/option" | ||
| ) | ||
|
|
||
| type GCSClient interface { |
plugins/sinks/gcs/README.md
Outdated
| @@ -0,0 +1,41 @@ | |||
| # GCS | |||
|
|
|||
| Sinks json data to a file in `ndjson` format in a Google Cloud Storage bucket | |||
There was a problem hiding this comment.
Maybe we can rephrase this as:
Sinks json data to a file as ndjson format in Google Cloud Storage bucket
plugins/sinks/gcs/client.go
Outdated
| writer := client.Bucket(bucketname).Object(filepath).NewWriter(ctx) | ||
|
|
||
| return &gcsClient{ | ||
| client: client, |
There was a problem hiding this comment.
I don't see client getting used anywhere, should we get rid of it?
There was a problem hiding this comment.
Yes, it's only used to create the writer; I will update it.
plugins/sinks/gcs/client.go
Outdated
| writer *storage.Writer | ||
| } | ||
|
|
||
| func newGCSClient(ctx context.Context, serviceAccountJSON []byte, bucketname string, filepath string) (GCSClient, error) { |
There was a problem hiding this comment.
You should return *gcsClient instead of the interface.
Remember: always return structs and accept interfaces in args.
plugins/sinks/gcs/client.go
Outdated
|
|
||
| func (c *gcsClient) WriteData(jsonBytes []byte) error { | ||
| if _, err := c.writer.Write(jsonBytes); err != nil { | ||
| return errors.Wrap(err, "error in writing json data to an object") |
There was a problem hiding this comment.
Why should the client know whether the data is JSON or something else? It's just raw bytes to it, so this error message won't be applicable if you decide to write simple text.
plugins/sinks/gcs/client.go
Outdated
| func (c *gcsClient) Close() error { | ||
| if err := c.writer.Close(); err != nil { | ||
| return errors.Wrap(err, "error closing the writer") | ||
| } | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
Maybe simplify this as
func (c *gcsClient) Close() error {
return c.writer.Close()
}
plugins/sinks/gcs/gcs.go
Outdated
| func (s *Sink) resolveBucketandObjectNames() (string, string) { | ||
| dirs := strings.Split(s.config.Path, "/") | ||
| bucketname := dirs[0] | ||
| timestamp := time.Now().Format("2006.01.02 15:04:05") |
There was a problem hiding this comment.
I would suggest using a pre-defined RFC time format, especially one without spaces.
There was a problem hiding this comment.
Will RFC3339 do? The output format is "2006-01-02T15:04:05Z07:00".
plugins/sinks/gcs/gcs.go
Outdated
| if s.config.ObjectPrefix != "" { | ||
| s.config.ObjectPrefix = s.config.ObjectPrefix + "-" | ||
| } |
There was a problem hiding this comment.
What if the user has provided the prefix as hello-? Will you add double hyphens?
There was a problem hiding this comment.
It should be a single hyphen only. I will add a check for that
plugins/sinks/gcs/gcs.go
Outdated
| s.config.ObjectPrefix = s.config.ObjectPrefix + "-" | ||
| } | ||
|
|
||
| objectname := s.config.ObjectPrefix + timestamp + ".ndjson" |
There was a problem hiding this comment.
use fmt.Sprintf to build strings.
plugins/sinks/gcs/gcs.go
Outdated
|
|
||
| objectname := s.config.ObjectPrefix + timestamp + ".ndjson" | ||
| if len(dirs) > 1 { | ||
| objectname = dirs[len(dirs)-1] + "/" + s.config.ObjectPrefix + timestamp + ".ndjson" |
There was a problem hiding this comment.
use fmt.Sprintf to build strings.
plugins/sinks/gcs/gcs.go
Outdated
| } | ||
|
|
||
| func (s *Sink) validateServiceAccountKey() error { | ||
|
|
plugins/sinks/gcs/gcs.go
Outdated
| if err := s.client.Close(); err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } |
plugins/sinks/gcs/gcs.go
Outdated
| return nil | ||
| } | ||
|
|
||
| func (s *Sink) resolveBucketandObjectNames() (string, string) { |
There was a problem hiding this comment.
A better name could be resolveBucketPath
plugins/sinks/gcs/gcs.go
Outdated
| dirs := strings.Split(s.config.Path, "/") | ||
| bucketname := dirs[0] |
There was a problem hiding this comment.
A notation pretty common in object storage is gcs://bucketname or s3://bucketname; I guess this line would fail for such input. Can we use url.Parse(...)?
plugins/sinks/gcs/gcs.go
Outdated
| objectname := fmt.Sprintf("%s%s.ndjson", objectprefix, timestamp) | ||
|
|
||
| if len(dirs) > 1 { | ||
| objectname = fmt.Sprintf("%s/%s%s.ndjson", dirs[len(dirs)-1], objectprefix, timestamp) |
There was a problem hiding this comment.
How will dirs[len(dirs)-1] work for a path like gcs://bucketname/path1/path2/
There was a problem hiding this comment.
The default path format I assumed was the one I got from GCS bucket/folder copy path options. If that's the required case, will handle the gcs prefix
plugins/sinks/gcs/gcs.go
Outdated
| return nil | ||
| } | ||
|
|
||
| func (s *Sink) Close() (err error) { |
There was a problem hiding this comment.
(err error) could be just error
|
@kushsharma done with the changes, taking the input as a URL in the sink config, now URL format it accepts in config is |
|
LGTM. Nice work. |
feat: add GCS sink (raystack#469)
No description provided.