Multiple tables support (destination) #120
base: main
Conversation
65d878e to 39c14e9
Some notes about the changes.
pr, pw := io.Pipe()
w := gzip.NewWriter(pw)
Compression now happens in memory while uploading the file.
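For reference, here is a minimal sketch of the pattern, with a hypothetical upload callback standing in for the connector's actual file upload:

package main

import (
	"compress/gzip"
	"context"
	"io"
	"strings"
)

// uploadCompressed compresses src in memory while streaming the result to the
// (hypothetical) upload callback, so the compressed file is never fully buffered.
func uploadCompressed(ctx context.Context, src io.Reader, upload func(context.Context, io.Reader) error) error {
	pr, pw := io.Pipe()
	w := gzip.NewWriter(pw)

	go func() {
		_, err := io.Copy(w, src) // compress while reading the source
		if cerr := w.Close(); err == nil {
			err = cerr // don't lose the error from flushing the gzip footer
		}
		pw.CloseWithError(err) // propagate any error to the reading side
	}()

	return upload(ctx, pr) // the uploader sees compressed bytes as they are produced
}

func main() {
	_ = uploadCompressed(context.Background(), strings.NewReader("a,b,c\n1,2,3\n"),
		func(_ context.Context, r io.Reader) error {
			_, err := io.Copy(io.Discard, r) // stand-in for the real file upload
			return err
		})
}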
)

// DataType represents a Snowflake data type.
type DataType interface {
DataType was added as a way to represent the Snowflake data type.
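A hypothetical sketch of what such an abstraction could look like (the interface and implementations in the PR may well differ):

package main

import "fmt"

// DataType represents a Snowflake data type (sketch).
type DataType interface {
	// SQLType returns the SQL type name used in DDL, e.g. "NUMBER(38,0)".
	SQLType() string
}

// fixedType models Snowflake's FIXED (i.e. NUMBER) type.
type fixedType struct {
	precision, scale int
}

func (t fixedType) SQLType() string {
	return fmt.Sprintf("NUMBER(%d,%d)", t.precision, t.scale)
}

func main() {
	var dt DataType = fixedType{precision: 38, scale: 0}
	fmt.Println(dt.SQLType()) // NUMBER(38,0)
}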
"context" | ||
"strings" | ||
// Table represents a Snowflake table. | ||
type Table struct { |
Table is now used everywhere we need the schema of a table. It collects the table name and its schema (columns, primary keys) and exposes a way to get the "connector columns" (operation, created at, updated at, deleted at).
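Roughly this shape, as a sketch; the field and method names below are illustrative, not the PR's exact definitions:

package main

import "fmt"

// Column describes a single column of a Snowflake table (sketch).
type Column struct {
	Name       string
	DataType   string
	PrimaryKey bool
}

// Table collects the table name and its schema (sketch).
type Table struct {
	Name    string
	Columns []Column
}

// ConnectorColumns returns the names of the metadata columns maintained by
// the connector; the names here are illustrative.
func (t Table) ConnectorColumns() []string {
	return []string{"operation", "created_at", "updated_at", "deleted_at"}
}

func main() {
	t := Table{
		Name: "users",
		Columns: []Column{
			{Name: "id", DataType: "NUMBER(38,0)", PrimaryKey: true},
			{Name: "name", DataType: "VARCHAR(16777216)"},
		},
	}
	fmt.Println(t.Name, t.ConnectorColumns())
}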
UpdateBatch
)

type Batch struct {
This struct was introduced to collect everything needed for a batch of records (i.e. a single CSV file). It can be created either using NewInsertBatch or NewUpdateBatch, and depending on the function used it will produce a different merge query and a different filename.
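A sketch of the two-constructor idea with made-up names and fields, just to show how the filename (and likewise the merge query) can depend on how the batch was created:

package main

import "fmt"

type batchType int

const (
	insertBatch batchType = iota
	updateBatch
)

// Batch collects everything needed for one CSV file worth of records (sketch).
type Batch struct {
	typ   batchType
	table string
}

func NewInsertBatch(table string) *Batch { return &Batch{typ: insertBatch, table: table} }
func NewUpdateBatch(table string) *Batch { return &Batch{typ: updateBatch, table: table} }

// Filename depends on how the batch was created; the merge query would be
// chosen the same way.
func (b *Batch) Filename() string {
	if b.typ == insertBatch {
		return fmt.Sprintf("%s_insert.csv", b.table)
	}
	return fmt.Sprintf("%s_update.csv", b.table)
}

func main() {
	fmt.Println(NewInsertBatch("users").Filename()) // users_insert.csv
	fmt.Println(NewUpdateBatch("users").Filename()) // users_update.csv
}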
return bytes.NewBuffer(nil)
},
},
tableCache: make(map[string]snowflake.Table),
Snowflake tables and their schemas are now cached in the writer. We only retrieve a table's schema the first time it is needed. The assumption is that the table schema won't be modified externally, only through the connector.
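A minimal sketch of that lazy lookup, with a stubbed-out schema fetch standing in for the real Snowflake query:

package main

import (
	"context"
	"fmt"
)

// Table stands in for snowflake.Table in this sketch.
type Table struct{ Name string }

// Writer caches table schemas so Snowflake is queried only once per table.
type Writer struct {
	tableCache map[string]Table
	fetchTable func(context.Context, string) (Table, error) // stand-in for the real schema lookup
}

func (w *Writer) getTable(ctx context.Context, name string) (Table, error) {
	if t, ok := w.tableCache[name]; ok {
		return t, nil // schema already fetched once, reuse it
	}
	t, err := w.fetchTable(ctx, name) // hits Snowflake only on first use
	if err != nil {
		return Table{}, err
	}
	w.tableCache[name] = t
	return t, nil
}

func main() {
	calls := 0
	w := &Writer{
		tableCache: make(map[string]Table),
		fetchTable: func(_ context.Context, name string) (Table, error) {
			calls++
			return Table{Name: name}, nil
		},
	}
	ctx := context.Background()
	w.getTable(ctx, "users")
	w.getTable(ctx, "users")
	fmt.Println(calls) // 1, the second call is served from the cache
}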
col2 := cols2[k]
if !strings.EqualFold(col1.Name, col2.Name) {
	return fmt.Errorf("column %d doesn't match (%s:%T != %s:%T)", k, col1.Name, col1.DataType, col2.Name, col2.DataType)
}
// TODO check data type? what if the source record has nil values and we don't know types?
This is a bit shady - we compare the schema (columns) extracted from the first record with the Snowflake table schema. This is a problem if some fields are nullable and/or the record is partially populated, because we expect the exact same number of columns and the same order of columns.
I'm pretty sure the ordering is not guaranteed right now, as a schema fetched from the record will have connector columns in front (operation, created_at etc.), while the columns in a Snowflake table schema will simply be ordered alphabetically. Some work is needed to make sure we have consistent ordering.
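One possible way to get that consistent ordering (just a sketch, not what the PR currently does) is to sort both column lists case-insensitively by name before comparing them position by position:

package main

import (
	"fmt"
	"sort"
	"strings"
)

type Column struct {
	Name     string
	DataType string
}

// sortColumns puts columns into a deterministic, case-insensitive order so
// that two schemas can be compared position by position.
func sortColumns(cols []Column) {
	sort.Slice(cols, func(i, j int) bool {
		return strings.ToLower(cols[i].Name) < strings.ToLower(cols[j].Name)
	})
}

func main() {
	cols := []Column{{Name: "operation"}, {Name: "ID"}, {Name: "created_at"}}
	sortColumns(cols)
	fmt.Println(cols) // deterministic order: created_at, ID, operation
}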
}

var insertBatchTemplate = template.Must(
	template.New("insertBatch").
I've rewritten these merge queries to use Go templates; IMO it's easier to read, since you can see what gets inserted in which place.
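For illustration, a stripped-down version of the approach with a much simpler query than the real templates:

package main

import (
	"os"
	"text/template"
)

// A deliberately simplified merge query; this only shows the mechanics of
// filling placeholders with text/template.
var insertQueryTemplate = template.Must(
	template.New("insertQuery").Parse(
		`MERGE INTO {{.Table}} AS dst
USING {{.Source}} AS src
ON dst.{{.Key}} = src.{{.Key}}
WHEN NOT MATCHED THEN INSERT ({{.Key}}) VALUES (src.{{.Key}})`,
	),
)

func main() {
	_ = insertQueryTemplate.Execute(os.Stdout, map[string]string{
		"Table":  "users",
		"Source": "staged_records",
		"Key":    "id",
	})
}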
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate mockgen -typed -destination=mock/iterator.go -package=mock . Iterator
I created go:generate statements and removed make mockgen.
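With these directives in place, running go generate ./... regenerates all mocks, so a separate Makefile target isn't needed anymore.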
have string
want DataType
}{{
	have: `{"type":"FIXED","precision":38,"scale":0,"nullable":true}`,
The test cases are taken from the Snowflake documentation page, except the last couple of cases, which are not described in the docs. For those I created a test table in Snowflake and checked the output of SHOW COLUMNS.
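(SHOW COLUMNS reports each column's type as a JSON object in its data_type column, which is the format the have strings above follow.)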
sdk "github.com/conduitio/conduit-connector-sdk" | ||
"github.com/go-errors/errors" |
In the files I've touched I removed github.com/go-errors/errors in favor of errors. Now that the standard library's errors package provides Join, Is and As, I don't see a good reason to use an external dependency for this.
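A quick illustration of the standard library covering those cases (errors.Join requires Go 1.20 or newer):

package main

import (
	"errors"
	"fmt"
)

type validationError struct{ field string }

func (e *validationError) Error() string { return "invalid field: " + e.field }

var errConnection = errors.New("connection failed")

func main() {
	// Join combines multiple errors into one.
	err := errors.Join(errConnection, &validationError{field: "table"})

	// Is and As both see through the joined error.
	fmt.Println(errors.Is(err, errConnection)) // true

	var ve *validationError
	fmt.Println(errors.As(err, &ve), ve.field) // true table
}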
"encoding/json" | ||
"fmt" | ||
|
||
"github.com/lovromazgon/jsonpoly" |
Note that I extracted the code that handled polymorphic JSON types into a library, as I thought it could be generally useful (github.com/lovromazgon/jsonpoly). In commit 2c8ff92 I replaced the code in the connector in favor of using the lib.
Description
TBD
Quick checks: